Git Product home page Git Product logo

janus's Introduction

janus

Chat on Gitter

Mixed sync-async queue, supposed to be used for communicating between classic synchronous (threaded) code and asynchronous (in terms of asyncio) one.

Like Janus god the queue object from the library has two faces: synchronous and asynchronous interface.

Synchronous is fully compatible with standard queue, asynchronous one follows asyncio queue design.

Usage example (Python 3.7+)

import asyncio
import janus


def threaded(sync_q: janus.SyncQueue[int]) -> None:
    for i in range(100):
        sync_q.put(i)
    sync_q.join()


async def async_coro(async_q: janus.AsyncQueue[int]) -> None:
    for i in range(100):
        val = await async_q.get()
        assert val == i
        async_q.task_done()


async def main() -> None:
    queue: janus.Queue[int] = janus.Queue()
    loop = asyncio.get_running_loop()
    fut = loop.run_in_executor(None, threaded, queue.sync_q)
    await async_coro(queue.async_q)
    await fut
    queue.close()
    await queue.wait_closed()


asyncio.run(main())

Usage example (Python 3.5 and 3.6)

N.B. For python 3.6 and below you must use janus < 1.0.0

import asyncio
import janus

loop = asyncio.get_event_loop()


def threaded(sync_q):
    for i in range(100):
        sync_q.put(i)
    sync_q.join()


async def async_coro(async_q):
    for i in range(100):
        val = await async_q.get()
        assert val == i
        async_q.task_done()


async def main():
    queue = janus.Queue()
    fut = loop.run_in_executor(None, threaded, queue.sync_q)
    await async_coro(queue.async_q)
    await fut
    queue.close()
    await queue.wait_closed()

try:
    loop.run_until_complete(main())
finally:
    loop.close()

Communication channels

GitHub Discussions: https://github.com/aio-libs/janus/discussions

Feel free to post your questions and ideas here.

gitter chat https://gitter.im/aio-libs/Lobby

License

janus library is offered under Apache 2 license.

Thanks

The library development is sponsored by DataRobot (https://datarobot.com)

janus's People

Contributors

achimnol avatar alefteris avatar apatrushev avatar asvetlov avatar bmwant avatar dependabot-preview[bot] avatar dependabot-support avatar dependabot[bot] avatar hauntsaninja avatar jettify avatar kxepal avatar linw1995 avatar pyup-bot avatar rbuffat avatar richardbaronpenman avatar simonw avatar telendt avatar vlanse avatar willstott101 avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

janus's Issues

Unexpected stdout output

I run this

#!/usr/bin/env python3
import time
from threading import Thread
from itertools import count
import asyncio
import janus


class Timer():
    def __init__(self):
        self._start = None

    def start(self):
        self._start = time.time()

    def stop(self):
        return time.time() - self._start


def consumer_thread(taskq, result):
    while True:
        task = taskq.get()
        if task is None:
            return
        result[0] = task


async def producer_thread(taskq):
    for x in range(10000 + 1):
        await taskq.put(x)
    await taskq.put(None)


def bench_sync_async():
    taskq = janus.Queue(maxsize=100)

    loop = asyncio.get_event_loop()

    result = [None]
    consumer = Thread(target=consumer_thread, args=[taskq.sync_q, result])
    consumer.start()

    loop.run_until_complete(producer_thread(taskq.async_q))
    consumer.join()

    assert result[0] == 10000


def main(**kwargs):
    timer = Timer()
    timer.start()
    bench_sync_async()
    elapsed = timer.stop()
    print('Sync + async: %.2f sec.' % elapsed)


if __name__ == '__main__':
    main()

and get

$ ./bench_queue.py 
Sync + async: 1.07 sec.
Task was destroyed but it is pending!
task: <Task pending coro=<Queue._notify_async_not_full.<locals>.f() running at /usr/local/lib/python3.5/dist-packages/janus/__init__.py:151> cb=[set.discard()]>
Task was destroyed but it is pending!
task: <Task pending coro=<Queue._notify_async_not_full.<locals>.f() running at /usr/local/lib/python3.5/dist-packages/janus/__init__.py:151> cb=[set.discard()]>
Task was destroyed but it is pending!
task: <Task pending coro=<Queue._notify_async_not_full.<locals>.f() running at /usr/local/lib/python3.5/dist-packages/janus/__init__.py:151> cb=[set.discard()]>
Task was destroyed but it is pending!
task: <Task pending coro=<Queue._notify_async_not_full.<locals>.f() running at /usr/local/lib/python3.5/dist-packages/janus/__init__.py:151> cb=[set.discard()]>
Task was destroyed but it is pending!
task: <Task pending coro=<Queue._notify_async_not_full.<locals>.f() running at /usr/local/lib/python3.5/dist-packages/janus/__init__.py:151> cb=[set.discard()]>
Task was destroyed but it is pending!
task: <Task pending coro=<Queue._notify_async_not_full.<locals>.f() running at /usr/local/lib/python3.5/dist-packages/janus/__init__.py:151> cb=[set.discard()]>
Task was destroyed but it is pending!
task: <Task pending coro=<Queue._notify_async_not_full.<locals>.f() running at /usr/local/lib/python3.5/dist-packages/janus/__init__.py:151> cb=[set.discard()]>
Task was destroyed but it is pending!
task: <Task pending coro=<Queue._notify_async_not_full.<locals>.f() running at /usr/local/lib/python3.5/dist-packages/janus/__init__.py:151> cb=[set.discard()]>
Task was destroyed but it is pending!
task: <Task pending coro=<Queue._notify_async_not_full.<locals>.f() running at /usr/local/lib/python3.5/dist-packages/janus/__init__.py:151> cb=[set.discard()]>
Task was destroyed but it is pending!
task: <Task pending coro=<Queue._notify_async_not_full.<locals>.f() running at /usr/local/lib/python3.5/dist-packages/janus/__init__.py:151> cb=[set.discard()]>
Task was destroyed but it is pending!
task: <Task pending coro=<Queue._notify_async_not_full.<locals>.f() running at /usr/local/lib/python3.5/dist-packages/janus/__init__.py:151> cb=[set.discard()]>
Task was destroyed but it is pending!
task: <Task pending coro=<Queue._notify_async_not_full.<locals>.f() running at /usr/local/lib/python3.5/dist-packages/janus/__init__.py:151> cb=[set.discard()]>
Task was destroyed but it is pending!
task: <Task pending coro=<Queue._notify_async_not_full.<locals>.f() running at /usr/local/lib/python3.5/dist-packages/janus/__init__.py:151> cb=[set.discard()]>
Task was destroyed but it is pending!
task: <Task pending coro=<Queue._notify_async_not_full.<locals>.f() running at /usr/local/lib/python3.5/dist-packages/janus/__init__.py:151> cb=[set.discard()]>
Task was destroyed but it is pending!
task: <Task pending coro=<Queue._notify_async_not_full.<locals>.f() running at /usr/local/lib/python3.5/dist-packages/janus/__init__.py:151> cb=[set.discard()]>
sys:1: RuntimeWarning: coroutine 'Queue._notify_async_not_full.<locals>.f' was never awaited

Do I something wrong? I do not want to see all these messages.

No running event loop

I am having trouble creating the janus Queue outside of asyncio. It requires me to make dummy background tasks to just create the object, since not running loop exists I am wonder if this could be lazy or allow the loop to be passed in the constructor? It makes sync to async code a tad harder.

Does Janus support multiprocessing.Queue?

I make heavy use of Janus Queue in one of my projects to asynchronize thread communication. Now I would like to do the same with communication with subprocesses. Does Janus Queue provide the same level of compatibility for multiprocessing.Queue as it does for queue.Queue?

Cheers

`RuntimeError: no running event loop` after upgrading to 0.5.0

Unfortunately, changes in #246 broke the code that relies on creating an instance of janus.Queue outside of the scope of a running event loop, for no obvious reason.

Also, the behavior now contradicts with the behavior of asyncio.Queue.

>>> import asyncio
>>> asyncio.Queue()
<Queue at 0x7f76be085730 maxsize=0>
>>> import janus
>>> janus.Queue()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/user/projects/project/.venv/lib64/python3.8/site-packages/janus/__init__.py", line 29, in __init__
    self._loop = current_loop()
RuntimeError: no running event loop

Is there any good reason to not just use asyncio.get_event_loop? From performance perspective it shouldn't be significant.

Issue Passing Element Into Queue Right Before Co-Routine Completes

I have two coroutines running. When one of the co-routines is ready to finish, it puts an element into the Janus queue, which is queried by the second co-routine indicating that it's time to terminate.

I've noticed that if I put an element into the queue right before the first co-routine terminates, the element is never received in the second co-routine. If I put the element in sooner, it is received properly.

I'm wondering, is this possibly a bug, or am I missing something obvious?

# outbound_queue is a global variable

        async def handle_input_from_raven_desktop():

		try:
			async for msg in ws:
				logger.info('Message received.')	
		except asyncio.CancelledError:
			logger.info('Websocket cancelled error.')
		finally:
			logger.info('Tearing down websocket connection.')
			await ws.close()
			await outbound_queue.async_q.put(None)

	async def handle_output_to_raven_desktop():
	
		while True:
			command = await outbound_queue.async_q.get()
			if command is None:
				logger.info('Terminating handler for transmitting output commands.')
				break
			

Support an atomic `.clear()` operation.

Hi - I have the following use case:

We have an in-memory PubSub class for an SSE streaming web server. Our streams either send patches or full updates to the web client.

To ensure all messages are recieved (important for patch-type messages) we use a small queue per client. However, if this fills up, we can only drop messages. What we do is drop all items in the queue and send a full update.

However, there is no way to atomically clear the queue. Currently we have to use a lock around calling .get_nowait() in a loop. This is not ideal as janus already has a lock internally.

Would it be possible to add an atomic .clear() operation? I understand this is not present in the other implementations that janus matches, so if API similarity is a concern then I understand.

Thanks!

Performance benefits?

So I've been testing the performance a bit, varying e.g.

  • 1:1, 16:1, 64:1 producer : consumer ratio
  • normal/lifo/priority queue etc
  • asyncio/uvloop event loop
  • async -> async, async -> sync, sync -> sync
  • (probably more)

I found that, janus queues are ~5x slower in sync->sync, ~9x slower in sync->async, and ~15x slower in async->async. This is pretty much consistent across all parameter sets.

This confirmed my suspicion that the performance gain of parallel computation is often less than the cost of using e.g. threading.Lock a lot (the GIL certainly doesn't help either).

Right now, I can imagine that many users have incorrect expectations of janus. To avoid this, you could add an example that shows how janus can outperform single-threaded asyncio, by employing multiple threads. Additionally, a caveat about janus' performance would be helpful.

Error with Python 3.10: "ValueError: loop argument must agree with lock"

I ran into this in my own project, see simonw/datasette#1481 - then I tried using a fork of this project to run the unit tests against Python 3.10 and got the same error: https://github.com/simonw/janus/runs/3842463703?check_suite_focus=true

 ============================= test session starts ==============================
 platform linux -- Python 3.10.0, pytest-6.2.4, py-1.10.0, pluggy-0.13.1
 rootdir: /home/runner/work/janus/janus
 plugins: cov-2.12.1, asyncio-0.15.1
 collected 72 items
 
 tests/test_async.py FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF                      [ 43%]
 tests/test_mixed.py .FFFFFFFFFFFFFFFFF                                   [ 68%]
 tests/test_sync.py FFFFFFFFFFFFFFFFFFFFFFF                               [100%]
 
 =================================== FAILURES ===================================
 __________________________ TestQueueBasic.test_empty ___________________________
 
 self = <test_async.TestQueueBasic object at 0x7fb0f561e4d0>
 
     @pytest.mark.asyncio
     async def test_empty(self):
 >       _q = janus.Queue()
 
 tests/test_async.py:65: 
 _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
 janus/__init__.py:39: in __init__
     self._async_not_empty = asyncio.Condition(self._async_mutex)
 _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
 
 self = <[AttributeError("'Condition' object has no attribute 'locked'") raised in repr()] Condition object at 0x7fb0f569a620>
 lock = <asyncio.locks.Lock object at 0x7fb0f569ada0 [unlocked]>
 
     def __init__(self, lock=None, *, loop=mixins._marker):
         super().__init__(loop=loop)
         if lock is None:
             lock = Lock()
         elif lock._loop is not self._get_loop():
 >           raise ValueError("loop argument must agree with lock")
 E           ValueError: loop argument must agree with lock

formatting tests fail

============================= test session starts ==============================
platform linux -- Python 3.7.7, pytest-5.4.2, py-1.8.1, pluggy-0.13.1
rootdir: /build/janus-0.5.0
plugins: asyncio-0.12.0
collected 71 items                                                             

tests/test_async.py ......................F...F...F...                   [ 47%]
tests/test_mixed.py ..............                                       [ 67%]
tests/test_sync.py .......................                               [100%]

=================================== FAILURES ===================================
__________________________ QueueJoinTests.test_format __________________________
Unexpected success
________________________ LifoQueueJoinTests.test_format ________________________
Unexpected success
______________________ PriorityQueueJoinTests.test_format ______________________
Unexpected success
=============================== warnings summary ===============================
tests/test_async.py::QueueBasicTests::test_empty
  /nix/store/q732h09azy7lf0j30bnnhdl15p4rxpdy-python3-3.7.7/lib/python3.7/unittest/case.py:628: RuntimeWarning: coroutine 'QueueBasicTests.test_empty' was never awaited
    testMethod()

tests/test_async.py::QueueBasicTests::test_full
  /nix/store/q732h09azy7lf0j30bnnhdl15p4rxpdy-python3-3.7.7/lib/python3.7/unittest/case.py:628: RuntimeWarning: coroutine 'QueueBasicTests.test_full' was never awaited
    testMethod()

tests/test_async.py::QueueBasicTests::test_maxsize
  /nix/store/q732h09azy7lf0j30bnnhdl15p4rxpdy-python3-3.7.7/lib/python3.7/unittest/case.py:628: RuntimeWarning: coroutine 'QueueBasicTests.test_maxsize' was never awaited
    testMethod()

tests/test_async.py::QueueBasicTests::test_order
  /nix/store/q732h09azy7lf0j30bnnhdl15p4rxpdy-python3-3.7.7/lib/python3.7/unittest/case.py:628: RuntimeWarning: coroutine 'QueueBasicTests.test_order' was never awaited
    testMethod()

tests/test_async.py::QueueGetTests::test_blocking_get
  /nix/store/q732h09azy7lf0j30bnnhdl15p4rxpdy-python3-3.7.7/lib/python3.7/unittest/case.py:628: RuntimeWarning: coroutine 'QueueGetTests.test_blocking_get' was never awaited
    testMethod()

tests/test_async.py::QueueGetTests::test_blocking_get_wait
  /nix/store/q732h09azy7lf0j30bnnhdl15p4rxpdy-python3-3.7.7/lib/python3.7/unittest/case.py:628: RuntimeWarning: coroutine 'QueueGetTests.test_blocking_get_wait' was never awaited
    testMethod()

tests/test_async.py::QueueGetTests::test_get_cancelled
  /nix/store/q732h09azy7lf0j30bnnhdl15p4rxpdy-python3-3.7.7/lib/python3.7/unittest/case.py:628: RuntimeWarning: coroutine 'QueueGetTests.test_get_cancelled' was never awaited
    testMethod()

tests/test_async.py::QueueGetTests::test_get_cancelled_race
  /nix/store/q732h09azy7lf0j30bnnhdl15p4rxpdy-python3-3.7.7/lib/python3.7/unittest/case.py:628: RuntimeWarning: coroutine 'QueueGetTests.test_get_cancelled_race' was never awaited
    testMethod()

tests/test_async.py::QueueGetTests::test_get_with_putters
  /nix/store/q732h09azy7lf0j30bnnhdl15p4rxpdy-python3-3.7.7/lib/python3.7/unittest/case.py:628: RuntimeWarning: coroutine 'QueueGetTests.test_get_with_putters' was never awaited
    testMethod()

tests/test_async.py::QueueGetTests::test_get_with_waiting_putters
  /nix/store/q732h09azy7lf0j30bnnhdl15p4rxpdy-python3-3.7.7/lib/python3.7/unittest/case.py:628: RuntimeWarning: coroutine 'QueueGetTests.test_get_with_waiting_putters' was never awaited
    testMethod()

tests/test_async.py::QueueGetTests::test_nonblocking_get
  /nix/store/q732h09azy7lf0j30bnnhdl15p4rxpdy-python3-3.7.7/lib/python3.7/unittest/case.py:628: RuntimeWarning: coroutine 'QueueGetTests.test_nonblocking_get' was never awaited
    testMethod()

tests/test_async.py::QueueGetTests::test_nonblocking_get_exception
  /nix/store/q732h09azy7lf0j30bnnhdl15p4rxpdy-python3-3.7.7/lib/python3.7/unittest/case.py:628: RuntimeWarning: coroutine 'QueueGetTests.test_nonblocking_get_exception' was never awaited
    testMethod()

tests/test_async.py::QueuePutTests::test_blocking_put
  /nix/store/q732h09azy7lf0j30bnnhdl15p4rxpdy-python3-3.7.7/lib/python3.7/unittest/case.py:628: RuntimeWarning: coroutine 'QueuePutTests.test_blocking_put' was never awaited
    testMethod()

tests/test_async.py::QueuePutTests::test_blocking_put_wait
  /nix/store/q732h09azy7lf0j30bnnhdl15p4rxpdy-python3-3.7.7/lib/python3.7/unittest/case.py:628: RuntimeWarning: coroutine 'QueuePutTests.test_blocking_put_wait' was never awaited
    testMethod()

tests/test_async.py::QueuePutTests::test_float_maxsize
  /nix/store/q732h09azy7lf0j30bnnhdl15p4rxpdy-python3-3.7.7/lib/python3.7/unittest/case.py:628: RuntimeWarning: coroutine 'QueuePutTests.test_float_maxsize' was never awaited
    testMethod()

tests/test_async.py::QueuePutTests::test_nonblocking_put
  /nix/store/q732h09azy7lf0j30bnnhdl15p4rxpdy-python3-3.7.7/lib/python3.7/unittest/case.py:628: RuntimeWarning: coroutine 'QueuePutTests.test_nonblocking_put' was never awaited
    testMethod()

tests/test_async.py::QueuePutTests::test_nonblocking_put_exception
  /nix/store/q732h09azy7lf0j30bnnhdl15p4rxpdy-python3-3.7.7/lib/python3.7/unittest/case.py:628: RuntimeWarning: coroutine 'QueuePutTests.test_nonblocking_put_exception' was never awaited
    testMethod()

tests/test_async.py::QueuePutTests::test_put_cancelled
  /nix/store/q732h09azy7lf0j30bnnhdl15p4rxpdy-python3-3.7.7/lib/python3.7/unittest/case.py:628: RuntimeWarning: coroutine 'QueuePutTests.test_put_cancelled' was never awaited
    testMethod()

tests/test_async.py::QueuePutTests::test_put_cancelled_race
  /nix/store/q732h09azy7lf0j30bnnhdl15p4rxpdy-python3-3.7.7/lib/python3.7/unittest/case.py:628: RuntimeWarning: coroutine 'QueuePutTests.test_put_cancelled_race' was never awaited
    testMethod()

tests/test_async.py::QueuePutTests::test_put_with_waiting_getters
  /nix/store/q732h09azy7lf0j30bnnhdl15p4rxpdy-python3-3.7.7/lib/python3.7/unittest/case.py:628: RuntimeWarning: coroutine 'QueuePutTests.test_put_with_waiting_getters' was never awaited
    testMethod()

tests/test_async.py::LifoQueueTests::test_order
  /nix/store/q732h09azy7lf0j30bnnhdl15p4rxpdy-python3-3.7.7/lib/python3.7/unittest/case.py:628: RuntimeWarning: coroutine 'LifoQueueTests.test_order' was never awaited
    testMethod()

tests/test_async.py::PriorityQueueTests::test_order
  /nix/store/q732h09azy7lf0j30bnnhdl15p4rxpdy-python3-3.7.7/lib/python3.7/unittest/case.py:628: RuntimeWarning: coroutine 'PriorityQueueTests.test_order' was never awaited
    testMethod()

tests/test_async.py::QueueJoinTests::test_format
tests/test_async.py::LifoQueueJoinTests::test_format
tests/test_async.py::PriorityQueueJoinTests::test_format
  /nix/store/q732h09azy7lf0j30bnnhdl15p4rxpdy-python3-3.7.7/lib/python3.7/unittest/case.py:628: RuntimeWarning: coroutine '_QueueJoinTestMixin.test_format' was never awaited
    testMethod()

tests/test_async.py::QueueJoinTests::test_join_empty_queue
tests/test_async.py::LifoQueueJoinTests::test_join_empty_queue
tests/test_async.py::PriorityQueueJoinTests::test_join_empty_queue
  /nix/store/q732h09azy7lf0j30bnnhdl15p4rxpdy-python3-3.7.7/lib/python3.7/unittest/case.py:628: RuntimeWarning: coroutine '_QueueJoinTestMixin.test_join_empty_queue' was never awaited
    testMethod()

tests/test_async.py::QueueJoinTests::test_task_done
tests/test_async.py::LifoQueueJoinTests::test_task_done
tests/test_async.py::PriorityQueueJoinTests::test_task_done
  /nix/store/q732h09azy7lf0j30bnnhdl15p4rxpdy-python3-3.7.7/lib/python3.7/unittest/case.py:628: RuntimeWarning: coroutine '_QueueJoinTestMixin.test_task_done' was never awaited
    testMethod()

tests/test_async.py::QueueJoinTests::test_task_done_underflow
tests/test_async.py::LifoQueueJoinTests::test_task_done_underflow
tests/test_async.py::PriorityQueueJoinTests::test_task_done_underflow
  /nix/store/q732h09azy7lf0j30bnnhdl15p4rxpdy-python3-3.7.7/lib/python3.7/unittest/case.py:628: RuntimeWarning: coroutine '_QueueJoinTestMixin.test_task_done_underflow' was never awaited
    testMethod()

tests/test_mixed.py::TestMixedMode::test_async_join_async_done
  /nix/store/q732h09azy7lf0j30bnnhdl15p4rxpdy-python3-3.7.7/lib/python3.7/unittest/case.py:628: RuntimeWarning: coroutine 'TestMixedMode.test_async_join_async_done' was never awaited
    testMethod()

tests/test_mixed.py::TestMixedMode::test_async_put_sync_get
  /nix/store/q732h09azy7lf0j30bnnhdl15p4rxpdy-python3-3.7.7/lib/python3.7/unittest/case.py:628: RuntimeWarning: coroutine 'TestMixedMode.test_async_put_sync_get' was never awaited
    testMethod()

tests/test_mixed.py::TestMixedMode::test_closed
  /nix/store/q732h09azy7lf0j30bnnhdl15p4rxpdy-python3-3.7.7/lib/python3.7/unittest/case.py:628: RuntimeWarning: coroutine 'TestMixedMode.test_closed' was never awaited
    testMethod()

tests/test_mixed.py::TestMixedMode::test_double_closing
  /nix/store/q732h09azy7lf0j30bnnhdl15p4rxpdy-python3-3.7.7/lib/python3.7/unittest/case.py:628: RuntimeWarning: coroutine 'TestMixedMode.test_double_closing' was never awaited
    testMethod()

tests/test_mixed.py::TestMixedMode::test_maxsize
  /nix/store/q732h09azy7lf0j30bnnhdl15p4rxpdy-python3-3.7.7/lib/python3.7/unittest/case.py:628: RuntimeWarning: coroutine 'TestMixedMode.test_maxsize' was never awaited
    testMethod()

tests/test_mixed.py::TestMixedMode::test_maxsize_default
  /nix/store/q732h09azy7lf0j30bnnhdl15p4rxpdy-python3-3.7.7/lib/python3.7/unittest/case.py:628: RuntimeWarning: coroutine 'TestMixedMode.test_maxsize_default' was never awaited
    testMethod()

tests/test_mixed.py::TestMixedMode::test_maxsize_named_param
  /nix/store/q732h09azy7lf0j30bnnhdl15p4rxpdy-python3-3.7.7/lib/python3.7/unittest/case.py:628: RuntimeWarning: coroutine 'TestMixedMode.test_maxsize_named_param' was never awaited
    testMethod()

tests/test_mixed.py::TestMixedMode::test_modifying_forbidden_after_closing
  /nix/store/q732h09azy7lf0j30bnnhdl15p4rxpdy-python3-3.7.7/lib/python3.7/unittest/case.py:628: RuntimeWarning: coroutine 'TestMixedMode.test_modifying_forbidden_after_closing' was never awaited
    testMethod()

tests/test_mixed.py::TestMixedMode::test_sync_join_async_done
  /nix/store/q732h09azy7lf0j30bnnhdl15p4rxpdy-python3-3.7.7/lib/python3.7/unittest/case.py:628: RuntimeWarning: coroutine 'TestMixedMode.test_sync_join_async_done' was never awaited
    testMethod()

tests/test_mixed.py::TestMixedMode::test_sync_put_async_get
  /nix/store/q732h09azy7lf0j30bnnhdl15p4rxpdy-python3-3.7.7/lib/python3.7/unittest/case.py:628: RuntimeWarning: coroutine 'TestMixedMode.test_sync_put_async_get' was never awaited
    testMethod()

tests/test_mixed.py::TestMixedMode::test_sync_put_async_join
  /nix/store/q732h09azy7lf0j30bnnhdl15p4rxpdy-python3-3.7.7/lib/python3.7/unittest/case.py:628: RuntimeWarning: coroutine 'TestMixedMode.test_sync_put_async_join' was never awaited
    testMethod()

tests/test_mixed.py::TestMixedMode::test_unfinished
  /nix/store/q732h09azy7lf0j30bnnhdl15p4rxpdy-python3-3.7.7/lib/python3.7/unittest/case.py:628: RuntimeWarning: coroutine 'TestMixedMode.test_unfinished' was never awaited
    testMethod()

tests/test_mixed.py::TestMixedMode::test_wait_without_closing
  /nix/store/q732h09azy7lf0j30bnnhdl15p4rxpdy-python3-3.7.7/lib/python3.7/unittest/case.py:628: RuntimeWarning: coroutine 'TestMixedMode.test_wait_without_closing' was never awaited
    testMethod()

tests/test_sync.py::QueueTest::test_maxsize
tests/test_sync.py::LifoQueueTest::test_maxsize
tests/test_sync.py::PriorityQueueTest::test_maxsize
  /nix/store/q732h09azy7lf0j30bnnhdl15p4rxpdy-python3-3.7.7/lib/python3.7/unittest/case.py:628: RuntimeWarning: coroutine 'BaseQueueTestMixin.test_maxsize' was never awaited
    testMethod()

tests/test_sync.py::QueueTest::test_negative_timeout_raises_exception
tests/test_sync.py::LifoQueueTest::test_negative_timeout_raises_exception
tests/test_sync.py::PriorityQueueTest::test_negative_timeout_raises_exception
  /nix/store/q732h09azy7lf0j30bnnhdl15p4rxpdy-python3-3.7.7/lib/python3.7/unittest/case.py:628: RuntimeWarning: coroutine 'BaseQueueTestMixin.test_negative_timeout_raises_exception' was never awaited
    testMethod()

tests/test_sync.py::QueueTest::test_nowait
tests/test_sync.py::LifoQueueTest::test_nowait
tests/test_sync.py::PriorityQueueTest::test_nowait
  /nix/store/q732h09azy7lf0j30bnnhdl15p4rxpdy-python3-3.7.7/lib/python3.7/unittest/case.py:628: RuntimeWarning: coroutine 'BaseQueueTestMixin.test_nowait' was never awaited
    testMethod()

tests/test_sync.py::QueueTest::test_queue_join
tests/test_sync.py::LifoQueueTest::test_queue_join
tests/test_sync.py::PriorityQueueTest::test_queue_join
  /nix/store/q732h09azy7lf0j30bnnhdl15p4rxpdy-python3-3.7.7/lib/python3.7/unittest/case.py:628: RuntimeWarning: coroutine 'BaseQueueTestMixin.test_queue_join' was never awaited
    testMethod()

tests/test_sync.py::QueueTest::test_queue_task_done
tests/test_sync.py::LifoQueueTest::test_queue_task_done
tests/test_sync.py::PriorityQueueTest::test_queue_task_done
  /nix/store/q732h09azy7lf0j30bnnhdl15p4rxpdy-python3-3.7.7/lib/python3.7/unittest/case.py:628: RuntimeWarning: coroutine 'BaseQueueTestMixin.test_queue_task_done' was never awaited
    testMethod()

tests/test_sync.py::QueueTest::test_shrinking_queue
tests/test_sync.py::LifoQueueTest::test_shrinking_queue
tests/test_sync.py::PriorityQueueTest::test_shrinking_queue
  /nix/store/q732h09azy7lf0j30bnnhdl15p4rxpdy-python3-3.7.7/lib/python3.7/unittest/case.py:628: RuntimeWarning: coroutine 'BaseQueueTestMixin.test_shrinking_queue' was never awaited
    testMethod()

tests/test_sync.py::QueueTest::test_simple_queue
tests/test_sync.py::LifoQueueTest::test_simple_queue
tests/test_sync.py::PriorityQueueTest::test_simple_queue
  /nix/store/q732h09azy7lf0j30bnnhdl15p4rxpdy-python3-3.7.7/lib/python3.7/unittest/case.py:628: RuntimeWarning: coroutine 'BaseQueueTestMixin.test_simple_queue' was never awaited
    testMethod()

tests/test_sync.py::FailingQueueTest::test_closed_loop_non_failing
  /nix/store/q732h09azy7lf0j30bnnhdl15p4rxpdy-python3-3.7.7/lib/python3.7/unittest/case.py:628: RuntimeWarning: coroutine 'FailingQueueTest.test_closed_loop_non_failing' was never awaited
    testMethod()

tests/test_sync.py::FailingQueueTest::test_failing_queue
  /nix/store/q732h09azy7lf0j30bnnhdl15p4rxpdy-python3-3.7.7/lib/python3.7/unittest/case.py:628: RuntimeWarning: coroutine 'FailingQueueTest.test_failing_queue' was never awaited
    testMethod()

-- Docs: https://docs.pytest.org/en/latest/warnings.html
=========================== short test summary info ============================
FAILED tests/test_async.py::QueueJoinTests::test_format
FAILED tests/test_async.py::LifoQueueJoinTests::test_format
FAILED tests/test_async.py::PriorityQueueJoinTests::test_format
================== 3 failed, 68 passed, 70 warnings in 0.86s ===================
builder for '/nix/store/amxgdvbpnkqz9fj8120axrz0cmr4pzh3-python3.7-janus-0.5.0.drv' failed with exit code 1

PriorityQueue; TypeError: '<' not supported between instances of 'dict' and 'dict'

I'm trying to use the PriorityQueue in my program.

I'm trying to store objects in the queue like this:

p = janus.PriorityQueue(loop=asyncio.get_event_loop(), maxsize=5)

p.sync_q.put((0, {"name": "test1"}))
p.sync_q.put((0, {"name": "test2"}))

And it results in

Traceback (most recent call last):
  File "/home/user/anaconda3/envs/hybrit_py3_2/lib/python3.7/site-packages/IPython/core/interactiveshell.py", line 3326, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "<ipython-input-27-41389a2f4052>", line 1, in <module>
    p.sync_q.put((0, {"name": "test2"}))
  File "/home/user/anaconda3/envs/hybrit_py3_2/lib/python3.7/site-packages/janus/__init__.py", line 287, in put
    self._parent._put_internal(item)
  File "/home/user/anaconda3/envs/hybrit_py3_2/lib/python3.7/site-packages/janus/__init__.py", line 115, in _put_internal
    self._put(item)
  File "/home/user/anaconda3/envs/hybrit_py3_2/lib/python3.7/site-packages/janus/__init__.py", line 531, in _put
    heappush(self._queue, item)
TypeError: '<' not supported between instances of 'dict' and 'dict'

It seems like heapq wants to compare the items in the case that the priority value is the same, and fails because there's no such thing as dict < dict.

How do I store complex data types in a your PriorityQueue?

Double-ended Queue Support

Hey!
Recently I found your library and it's really helping me with the communication between sub thread coroutines and main thread functions.
One of my use cases needs the ability to clear all the already exiting messages in a queue,
or to insert the new message to the end of the queue.
That ability is not possible when using a normal queue,
so I wanted to know if there is any planning for a double-ended queue support any time soon.
Thanks!

Could async_q.get() support a timeout?

Python's queue.Queue.get() takes an optional timeout, and will block for up to that amount of time.

Python's asyncio.Queue.get() doesn't support this option.

I know Janus is attempting to closely initiate these APIs, but would it be feasible (and a good idea) for Janus to provide a timeout option even when reading from the async queue?

I would love to be able to do this:

try:
    item = await queue.async_q.get(timeout=0.5)
except Empty:
    # ...

the perfomance is very bad

import janus
import asyncio
import time
import threading
import nb_log
queue = janus.Queue(maxsize=6000)

async def consume():
    while 1:
        #await  asyncio.sleep(1)
        val = await queue.async_q.get()
        print(val)

def push():
    for i in range(50000):
        # time.sleep(0.2)
        # print(i)
        queue.sync_q.put(i)


if __name__ == '__main__':
    threading.Thread(target=push).start()
    loop = asyncio.get_event_loop()
    loop.create_task(consume())
    loop.run_forever()

the queue can only run 3000 times per second,conusme function is very simple it has no io and compute。
When i use queue.Queue and multithreding,can run more than 10000 times per second。could you prove the project performance。

Syntax error with Python 3.7

Hi there, while testing with the newly released Python 3.7 importing Janus produces the following exception:

  File "/home/martino/github/pystm/pystm/log/execution.py", line 2, in <module>
    import janus
  File "/home/martino/github/pystm/venv2/lib/python3.7/site-packages/janus/__init__.py", line 19
    ensure_future = asyncio.async
                                ^
SyntaxError: invalid syntax

Python 3.7 made async a reserved keyword, thus it cannot be used

Separate head and pop (allow peeking)

Hello,
I am using janus, very happily, to mix async and threaded code, and it works great.

I have just a little suggestion about the API: I think you should provide a head() or peek() method, that allows to get the first element in the queue without removing it. Although this is not really necessary (also, you provide a PriorityQueue), I think it's a great addition which could make user code easier.

The current get() method could be implemented using head() + removal, so code should stay reasonably similar to the current.

This would allow to take decisions earlier, without messing with queue order.

join() calls should raise RuntimeError while the queue is closing or closed.

Recently discover that it can call q.join() on a closed queue. And I notice the q.join() will block forever even after the queue being closed.

It is better that it should check parent queue is closing or not, while q.join() calling or to be called.

The q.join() calls should raise RuntimeError, when q.join() calls on a closed queue or when closing a queue while q.join() is calling.

import janus


async def join_after_closing():
    q = jauns.Queue()
    await q.async_q.put("boo")
    q.close()
    await q.async_q.join()  # blocks forever


async def close_after_join():
    q = jauns.Queue()
    await q.async_q.put("boo")
    task = janus.ensure_future(q.async_q.join())
    await asyncio.sleep(1) # ensure the task is blocking
    q.close()
    await task  # blocks forever too

One out of multiple thread consumers hogs queue if consumers count is > 10

Hi and first of all thanks for the awesome Janus library!

I am trying to implement what seems to be a standard single producer -- multiple consumers pattern. I work with aiohttp so my producer is async but a library I need to use for consuming is sync so I run consumers in a thread executor. The minimal example is:

import asyncio
from time import sleep
from janus import Queue


async def test_concurrency():
    # Database init from scratch

    iterations = 10

    # Feed and write
    async def produce_data(q):
        nonlocal iterations
        print('starting producing')
        for raw_data in range(iterations):
            await q.put(raw_data)
        print('finishing feeding data')
        return None

    def consume_data(q, i):
        print(f"{i} init")
        print(f"{i} start")
        try:
            for datapoint in iter(q.get, None):
                print(f"{i} cycle start")
                # Do some work
                sleep(1)
                q.task_done()
                print(f"{i} consumed")
                print(f"{i} cycle end")
        except Exception as e:
            print('error is:')
            print(e)
            print('consumer exiting on error')
            raise
        print(f"{i} producer exit")

    loop = asyncio.get_running_loop()
    q = Queue(30)
    producer = asyncio.create_task(produce_data(q.async_q))
    consumers = [loop.run_in_executor(None, consume_data, q.sync_q, x) for x in range(30)]

    await asyncio.wait({producer})
    print('---- done producing')
    for _ in consumers:
        await q.async_q.put(None)
    await asyncio.wait({*consumers})
    for c in consumers:
        print('canceling')
        c.cancel()

    print('---- done consuming')


def main():
    asyncio.get_event_loop().run_until_complete(test_concurrency())


if __name__ == '__main__':
    main()

As long as the range(..) parameter in the consumers creation stays below 11 everything works as expected:

0 init
0 start
1 init
...
starting producing
8 init
8 start
finishing feeding data
---- done producing
8 cycle start
0 cycle start
...
1 consumed
1 cycle end
1 cycle start
2 consumed
2 cycle end
...
canceling
canceling
...
---- done consuming

As soon as I use more than 10 consumers (like 30 in the example code), there are no errors thrown but the execution seems to persistenly happen exclusively in the i=9 consumer:

0 start
1 init
1 start
...
starting producing
finishing feeding data
---- done producing
9 init
9 start
9 cycle start
9 consumed
9 cycle end
9 cycle start
9 consumed
9 cycle end
...
29 producer exit
...
2 producer exit
canceling
...
---- done consuming

Is there something wrong with my use of Janus?

Thanks in advance.

python 3.8 support without warnings

Hi, I'm using janus 0.4.0 (from pypi) in python 3.8 and it generates this warning:

venv\lib\site-packages\janus\__init__.py:39: DeprecationWarning: The loop argument is deprecated since Python 3.8, and scheduled for removal in Python 3.10.

It's related to these lines:

janus/janus/__init__.py

Lines 40 to 45 in 32d16df

self._async_mutex = asyncio.Lock(loop=self._loop)
self._async_not_empty = asyncio.Condition(
self._async_mutex, loop=self._loop)
self._async_not_full = asyncio.Condition(
self._async_mutex, loop=self._loop)
self._finished = asyncio.Event(loop=self._loop)

Task was destroyed but it is pending

I get flooded with these when using janus

2023-10-25 17:32:45,185 - asyncio - ERROR - Task was destroyed but it is pending!
task: <Task pending name='Task-8498' coro=<Queue._notify_async_not_empty..f() running at /home/ubuntu/ai-worker/venv/lib/python3.11/site-packages/janus/init.py:238> cb=[set.discard()]>

async_q.get() can block the event loop under high load

When adding a lot of things to the queue async get will block for some time. Adding a sleep(0.001) helps, but will decrease throughput substantially. Especially with multiple threads adding to the queue.

import asyncio
import time
import janus


async def get(q):
    while True:
        await q.get()


def add(q):
    while True:
        time.sleep(0.5)
        for i in range(0, 500):
            q.put(i)


loop = asyncio.get_event_loop()
queue = janus.Queue(maxsize=100, loop=loop)
loop.set_debug(True)
loop.run_in_executor(None, add, queue.sync_q)
loop.run_until_complete(get(queue.async_q))
Executing <Handle <TaskWakeupMethWrapper object at 0x7f9f377b3078>(<Future finis...events.py:275>) created at /usr/lib64/python3.6/asyncio/locks.py:383> took 0.250 seconds
Executing <Handle <TaskWakeupMethWrapper object at 0x7f9f343d0498>(<Future finis...events.py:275>) created at /usr/lib64/python3.6/asyncio/locks.py:383> took 0.364 seconds
Executing <Handle <TaskWakeupMethWrapper object at 0x7f9f3454ef78>(<Future finis...events.py:275>) created at /usr/lib64/python3.6/asyncio/locks.py:383> took 0.341 seconds

Queue closing does not affect sync .put() calls in waiting state

Hi! I found some potentially unexpected behaviour of queue closing. If thread producer blocks on attempt to sync put to queue and we close queue in another control thread, thread producer will wait forever. I suppose that expected behaviour should be a RuntimeError in sync put() method on queue closing. What do you think about it?

Here is a code to reproduce this situation:

import asyncio
import logging
from concurrent.futures.thread import ThreadPoolExecutor
from queue import Queue

import janus

logging.basicConfig(format='%(threadName)-12s: %(message)s', level=logging.DEBUG)


async def main(tpe):
    hybrid_q = janus.Queue(maxsize=1)

    def some_long_job(q: Queue):
        logging.info("Job is running")
        for i in range(int(1e6)):
            try:
                logging.info("Putting to q: %s", i)
                q.put(f"item_{i}")
                logging.info("Putting to q done: %s", i)
            except Exception as ex:
                logging.exception(ex)
                raise

    job = asyncio.ensure_future(asyncio.get_event_loop().run_in_executor(tpe, some_long_job, hybrid_q.sync_q))
    await asyncio.sleep(0.5)

    job.cancel()
    logging.info("Closing queue")
    hybrid_q.close()

    logging.info("Waiting q to be closed")
    await hybrid_q.wait_closed()
    logging.info("Queue was closed")


if __name__ == '__main__':
    tpe = ThreadPoolExecutor(thread_name_prefix="TPE_")
    asyncio.run(main(tpe))
    logging.info("Shutting down TPE")
    tpe.shutdown(wait=True)
    logging.info("TPE was shut down")

using deque

Hi

I see that janus using collections.deque but the maxlen is not use ( maxsize not pass to maxlen )

def _init(self, maxsize: int) -> None:
    self._queue = deque()  # type: Deque[T]

Is there any reason for this. Does deque not thread safe ?

I've use case that it use one write , many read approach.
So when it write to the queue the function does not need to check if it is full ( maxsize = 1 ) and overwrite directly

If the reader using get / get_nowait the other reader cannot access it.
Currently I access manually using

q.async_q._parent._queue[0]

each reader will have their own check and exception handling

Use AnyIO (optionally)

Currently ThreadPoolExecutor is in use. It would be nice to have an option to use AnyIO.

Why:

  • If a project is using AnyIO, janus uses ThreadPoolExecutor and there are a lot of worker threads for janus only hanging around.

update asyncio.async() to use asyncio.ensure_future()

  File "/usr/lib/python3.5/site-packages/janus-0.2.1-py3.5.egg/janus/__init__.py", line 147, in task_maker
    task = asyncio.async(f(), loop=self._loop)
  File "/usr/lib/python3.5/asyncio/tasks.py", line 530, in async
    DeprecationWarning)
  File "/var/bluelabs/python/wampcams.py", line 24, in warn_with_traceback
    traceback.print_stack()
/usr/lib/python3.5/asyncio/tasks.py:530: DeprecationWarning: asyncio.async() function is deprecated, use ensure_future()
  DeprecationWarning)

Janus queues should be creatable in synchronous code as well as in asynchronous code

Hi,
The janus.Queue constructor makes a call to get_event_loop. This will of course fail in contexts where an event loop is not active yet. Given that janus.Queue is designed to bridge asynchronous and synchronous contexts, this seems to be a a severe limitation.

Would you consider deferring the call to get_event_loop until Queue.async_q is called or provide some other expedient. Thanks.

What is the correct way to type annotate a janus.Queue?

When I try to add type hints to the example, pylance creates reportPrivateUsage warnings.

import asyncio
import janus


def threaded(sync_q: janus._SyncQueueProxy[int]):
    for i in range(100):
        sync_q.put(i)
    sync_q.join()


async def async_coro(async_q: janus._AsyncQueueProxy[int]):
    for i in range(100):
        val = await async_q.get()
        assert val == i
        async_q.task_done()


async def main():
    queue: janus.Queue[int] = janus.Queue()
    loop = asyncio.get_running_loop()
    fut = loop.run_in_executor(None, threaded, queue.sync_q)
    await async_coro(queue.async_q)
    await fut
    queue.close()
    await queue.wait_closed()


asyncio.run(main())

Warnings:

"_SyncQueueProxy" is private and used outside of the module in which it is declared

"_AsyncQueueProxy" is private and used outside of the module in which it is declared

What is the preferred way to add type hints to janus Queues?

Exception Handling Question

Hi guys, I'm testing janus queue to communicate an async process with a sync one, the problem is that at some point the sync process raises exceptions. how to handled them?

import asyncio
import janus


def threaded(sync_q):
    for i in range(100):
        raise Exception('Some custom Exception')
        sync_q.put(i)
    sync_q.join()


async def async_coro(async_q):
    for i in range(100):
        #should it be handeled here ? 
        val = await async_q.get()
        assert val == i
        async_q.task_done()


async def main():
    queue = janus.Queue()
    loop = asyncio.get_running_loop()
    fut = loop.run_in_executor(None, threaded, queue.sync_q)
    await async_coro(queue.async_q)
    # try except block should be here ?
    await fut
    queue.close()
    await queue.wait_closed()


asyncio.run(main())

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.