3

In Python, what's the idiomatic way to establish one-way communication between two threading.Thread instances, call them thread a and thread b?

a is the producer: it continuously generates values for b to consume.

b is the consumer: it reads one value generated by a, processes the value with a coroutine, then reads the next value, and so on.

Illustration:

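import asyncio
import threading
import time

# very_magic_queue is a placeholder for the queue type I'm looking for
q = very_magic_queue.Queue()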


def worker_of_a(q):
    while True:
        q.put(1)
        time.sleep(1)

a = threading.Thread(target=worker_of_a, args=(q,))
a.start()


async def loop(q):
    while True:
        # values must be processed in the same order as they are produced
        v = await q.get()
        print(v)

async def foo():
    pass

async def b_main(q):
    loop_fut = asyncio.ensure_future(loop(q))
    foo_fut = asyncio.ensure_future(foo())
    _ = await asyncio.wait([loop_fut, foo_fut], ...)
    # blah blah blah

def worker_of_b(q):
    asyncio.set_event_loop(asyncio.new_event_loop())
    asyncio.get_event_loop().run_until_complete(b_main(q))

b = threading.Thread(target=worker_of_b, args=(q,))
b.start()

Of course the above code doesn't work, because queue.Queue.get cannot be awaited, and asyncio.Queue is not thread-safe, so it cannot be used from another thread.

I also need a communication channel from b to a.

It would be great if the solution could also work with gevent.

Thanks :)

Incömplete
  • A couple of related questions I asked some time ago: [How can I synchronize asyncio with other OS threads?](https://stackoverflow.com/q/53158101), [How can you wait for completion of a callback submitted from another thread?](https://stackoverflow.com/q/53107032). – jdehesa Apr 24 '19 at 12:19

3 Answers

2

I had a similar problem: passing data between a thread and asyncio code. The solution I used is to subclass the synchronous queue.Queue and add async get and put methods that poll the queue, calling asyncio.sleep between attempts so the event loop is not blocked. Here is my queue class:


import asyncio
import queue


# Queue usable from both sync code (get/put) and async code (aget/aput)
class queMorph(queue.Queue):
    def __init__(self, qSize, qNM):
        super().__init__(qSize)
        self.timeout = 0.018  # polling interval in seconds
        self.me = f'queMorph-{qNM}'

    # Awaitable variants: poll the queue and yield to the event loop between tries
    async def aget(self):
        while True:
            try:
                return self.get_nowait()
            except queue.Empty:
                await asyncio.sleep(self.timeout)

    async def aput(self, data):
        while True:
            try:
                return self.put_nowait(data)
            except queue.Full:
                print(f'{self.me} Queue full on put..')
                await asyncio.sleep(self.timeout)

To put or get items from the thread (the synchronous side), use the normal blocking q.get() and q.put(). In the async loop, use q.aget() and q.aput(), which do not block the event loop.
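As a rough usage sketch of the polling approach, the snippet below runs a producer thread against an async consumer. PollingQueue is a condensed stand-in for the class above (only the async get is included), and SENTINEL, producer, consumer and run_demo are hypothetical names introduced just for this illustration.

```python
import asyncio
import queue
import threading


class PollingQueue(queue.Queue):
    """Condensed stand-in for the class above: only the async get is shown."""

    async def aget(self, poll_interval=0.018):
        while True:
            try:
                return self.get_nowait()
            except queue.Empty:
                # Yield to the event loop instead of blocking it
                await asyncio.sleep(poll_interval)


SENTINEL = None  # hypothetical end-of-stream marker, not part of the original class


def producer(q, n):
    # Runs in a plain thread and uses the normal blocking put()
    for i in range(n):
        q.put(i)
    q.put(SENTINEL)


async def consumer(q):
    # Runs in the event loop and uses the polling aget()
    received = []
    while True:
        v = await q.aget()
        if v is SENTINEL:
            return received
        received.append(v)


def run_demo(n=5):
    q = PollingQueue(maxsize=2)  # small bound so the producer has to wait at times
    t = threading.Thread(target=producer, args=(q, n))
    t.start()
    result = asyncio.run(consumer(q))
    t.join()
    return result


if __name__ == "__main__":
    print(run_demo())
```

Values come out in the order they were put, since queue.Queue is FIFO. The cost of this design is latency: a value can wait up to one polling interval before the consumer sees it.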

giwyni
1

You can use a synchronized queue from the queue module and defer the wait to a ThreadPoolExecutor:

import asyncio
from concurrent.futures import ThreadPoolExecutor


async def loop(q):
    event_loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=1) as executor:
        while True:
            # values must be processed in the same order as they are produced;
            # q.get blocks a worker thread here, not the event loop
            v = await event_loop.run_in_executor(executor, q.get)
            print(v)
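Since the question also asks for a channel from b back to a, here is a minimal sketch of how the executor approach might be extended with a second queue.Queue for replies. The names a_to_b, b_to_a, STOP and run_demo are assumptions made for this example, and doubling the value merely stands in for whatever the coroutine really does.

```python
import asyncio
import queue
import threading
from concurrent.futures import ThreadPoolExecutor

STOP = object()  # hypothetical sentinel telling b that a is finished


def worker_of_a(a_to_b, b_to_a, n, replies):
    # Thread a: send a value, then block until b has answered it
    for i in range(n):
        a_to_b.put(i)
        replies.append(b_to_a.get())
    a_to_b.put(STOP)


async def b_main(a_to_b, b_to_a):
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=1) as executor:
        while True:
            # The blocking get runs in the executor thread, not in the event loop
            v = await loop.run_in_executor(executor, a_to_b.get)
            if v is STOP:
                break
            await asyncio.sleep(0)  # placeholder for real coroutine work
            # b_to_a is unbounded, so put_nowait cannot raise queue.Full
            b_to_a.put_nowait(v * 2)


def run_demo(n=3):
    a_to_b, b_to_a = queue.Queue(), queue.Queue()
    replies = []
    a = threading.Thread(target=worker_of_a, args=(a_to_b, b_to_a, n, replies))
    a.start()
    asyncio.run(b_main(a_to_b, b_to_a))
    a.join()
    return replies


if __name__ == "__main__":
    print(run_demo())
```

The sentinel matters here: without it, the pending q.get would keep the executor thread blocked and the with block could not exit cleanly.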
jdehesa
1

I've used Janus to solve this problem. It's a Python library that provides a thread-safe queue for communicating between asyncio code and a thread.

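import asyncio
import janus


def threaded(sync_q):
    for i in range(100):
        sync_q.put(i)
    sync_q.join()  # wait until every item has been marked done


async def async_code(async_q):
    for i in range(100):
        val = await async_q.get()
        assert val == i
        async_q.task_done()


async def main():
    # create the queue inside the running event loop
    queue = janus.Queue()
    loop = asyncio.get_running_loop()
    fut = loop.run_in_executor(None, threaded, queue.sync_q)
    await async_code(queue.async_q)
    await fut
    queue.close()
    await queue.wait_closed()


asyncio.run(main())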
Simon Willison