
I am trying to properly understand and implement two concurrently running Task objects using Python 3's relatively new asyncio module.

In a nutshell, asyncio seems designed to handle asynchronous processes and concurrent Task execution over an event loop. It promotes the use of await (applied in async functions) as a callback-free way to wait for and use a result, without blocking the event loop. (Futures and callbacks are still a viable alternative.)

It also provides the asyncio.Task() class, a specialized subclass of Future designed to wrap coroutines, preferably created via the asyncio.ensure_future() function. The intended use of asyncio Tasks is to allow independently running tasks to run 'concurrently' with other tasks within the same event loop. My understanding is that Tasks are connected to the event loop, which then automatically keeps driving the coroutine between await statements.

I like the idea of being able to use concurrent Tasks without needing to use one of the Executor classes, but I haven't found much elaboration on implementation.

This is how I'm currently doing it:

import asyncio

print('running async test')

async def say_boo():
    i = 0
    while True:
        await asyncio.sleep(0)
        print('...boo {0}'.format(i))
        i += 1

async def say_baa():
    i = 0
    while True:
        await asyncio.sleep(0)
        print('...baa {0}'.format(i))
        i += 1

# wrap in Task object
# -> automatically attaches to event loop and executes
boo = asyncio.ensure_future(say_boo())
baa = asyncio.ensure_future(say_baa())

loop = asyncio.get_event_loop()
loop.run_forever()

In the case of trying to concurrently run two looping Tasks, I've noticed that unless the Task has an internal await expression, it will get stuck in the while loop, effectively blocking other tasks from running (much like a normal while loop). However, as soon as the Tasks have to (a)wait, they seem to run concurrently without an issue.

Thus, the await statements seem to provide the event loop with a foothold for switching back and forth between the tasks, giving the effect of concurrency.

Example output with internal await:

running async test
...boo 0
...baa 0
...boo 1
...baa 1
...boo 2
...baa 2

Example output without internal await:

...boo 0
...boo 1
...boo 2
...boo 3
...boo 4

Questions

Does this implementation pass for a 'proper' example of concurrent looping Tasks in asyncio?

Is it correct that the only way this works is for a Task to provide a blocking point (await expression) in order for the event loop to juggle multiple tasks?

songololo

2 Answers


Yes, any coroutine that's running inside your event loop will block other coroutines and tasks from running, unless it

  1. Calls another coroutine using yield from or await (if using Python 3.5+).
  2. Returns.

This is because asyncio is single-threaded; the only way for the event loop to run is for no other coroutine to be actively executing. Using yield from/await suspends the coroutine temporarily, giving the event loop a chance to work.

Your example code is fine, but in many cases you probably wouldn't want long-running code that isn't doing asynchronous I/O running inside the event loop to begin with. In those cases, it often makes more sense to use loop.run_in_executor to run the code in a background thread or process: ProcessPoolExecutor is the better choice if your task is CPU-bound, while ThreadPoolExecutor suits I/O that isn't asyncio-friendly.

Your two loops, for example, are completely CPU-bound and don't share any state, so the best performance would come from using ProcessPoolExecutor to run each loop in parallel across CPUs:

import asyncio
from concurrent.futures import ProcessPoolExecutor

print('running async test')

def say_boo():
    i = 0
    while True:
        print('...boo {0}'.format(i))
        i += 1


def say_baa():
    i = 0
    while True:
        print('...baa {0}'.format(i))
        i += 1

if __name__ == "__main__":
    executor = ProcessPoolExecutor(2)
    loop = asyncio.get_event_loop()
    boo = loop.run_in_executor(executor, say_boo)
    baa = loop.run_in_executor(executor, say_baa)

    loop.run_forever()
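For the ThreadPoolExecutor case mentioned above (blocking I/O that isn't asyncio-friendly), a minimal sketch could look like the following. This assumes Python 3.7+ for `asyncio.run`, and `blocking_io` is a hypothetical stand-in for any blocking call, such as a synchronous HTTP request:

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

def blocking_io(n):
    # Hypothetical stand-in for any blocking call (e.g. a synchronous HTTP request)
    time.sleep(0.1)
    return n * 2

async def main():
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=2) as executor:
        # Both blocking calls run in worker threads; the event loop stays free
        return await asyncio.gather(
            loop.run_in_executor(executor, blocking_io, 1),
            loop.run_in_executor(executor, blocking_io, 2),
        )

print(asyncio.run(main()))  # [2, 4]
```

Because `run_in_executor` returns a Future, the results can be awaited (or gathered) like any other awaitable, so blocking work composes naturally with the rest of an asyncio program.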
dano
  • Thanks. Great timing as I was just wondering about this very topic re: using executors. – songololo Mar 26 '15 at 14:13
  • Trying the above code and am finding that the boo Task is blocking baa from running unless I add the yield from asyncio.sleep(0) into each of the while True loops? – songololo Mar 26 '15 at 14:56
  • Also re-worked the run_in_executor bits as follows: loop.run_in_executor(executor, asyncio.Task(say_boo())) – songololo Mar 26 '15 at 15:05
  • @shongololo Sorry, fixed. `asyncio.async` should be used, instead of the `asyncio.Task` constructor directly. We don't want `say_boo` and `say_baa` to be coroutines, they should just be ordinary functions that run outside of the event loop, so you shouldn't add `yield from` calls to them or wrap them in an `asyncio.Task`. – dano Mar 26 '15 at 15:15
  • How can I add a callback when the task being run in the executor finishes? – luisgepeto Nov 07 '15 at 18:29
  • Looks like [asyncio.async](https://docs.python.org/3/library/asyncio-task.html#asyncio.async) is an alias to [ensure_future](https://docs.python.org/3/library/asyncio-task.html#asyncio.ensure_future) and is now deprecated – srobinson Dec 18 '15 at 17:27
  • @ScottRobinson I've updated my answer to use `ensure_future` now that `async` is deprecated. – dano Dec 18 '15 at 19:16
  • I've been wondering how to code up a single-core network daemon that's very async with several cores worth of cpu-burning tasks, and this answer is a roadmap that makes it easy. The Python docs don't explain this stuff very well at all. Thanks! – Greg Lindahl Sep 16 '16 at 05:07
  • How would I run two different coroutines using multiple threads? I can find examples like this one, which illustrate how to run a regular function in multiple threads, but I can't find an example the other way around. – MikeyE Mar 23 '20 at 04:40
  • @MikeyE That's because normally there's no reason to use multiple thread to run coroutines - they're designed to run concurrently in a single thread. To do what you're asking for, you'd need to start two separate asyncio event loops, each in their own thread, and run a coroutine on each. But you should make sure you actually need to do that, since it would be an unusual requirement. – dano Mar 23 '20 at 14:04
  • I copied the code and ran it using Python 3.9. It throws an error - RuntimeError: no running event loop; but continues execution. Furthermore, as mentioned above, boo is blocking further code execution. I think I get the idea - create tasks, which will be spawned in current loop, but executed on 'external' executor - voila, working in background. But it doesn't work. First task is being ran immediately and blocks the rest since it is on the same loop. I deleted asyncio.create_task() and it seems to work correctly - both functions start to run on the loop.run_forever() line. – bez imienny Apr 27 '21 at 11:34
  • @bezimienny thanks for pointing that out. When I initially wrote this answer, `asyncio.async(` worked fine, and it also still worked when I that was deprecated and I switched it to `asyncio.ensure_future`. However, you are correct that it does not work when using `asyncio.create_task`. The asyncio docs describe `create_task` as a replacement for `ensure_future`, but the semantics aren't exactly the same, as it turns out. As you suggested, I just removed the `create_task` call altogether. – dano Apr 27 '21 at 13:22

You don't necessarily need a yield from x to give control over to the event loop.

In your example, I think the proper way would be to do a yield None or equivalently a simple yield, rather than a yield from asyncio.sleep(0.001):

import asyncio

@asyncio.coroutine
def say_boo():
  i = 0
  while True:
    yield None
    print("...boo {0}".format(i))
    i += 1

@asyncio.coroutine
def say_baa():
  i = 0
  while True:
    yield
    print("...baa {0}".format(i))
    i += 1

boo_task = asyncio.async(say_boo())
baa_task = asyncio.async(say_baa())

loop = asyncio.get_event_loop()
loop.run_forever()

Coroutines are just plain old Python generators. Internally, the asyncio event loop keeps a record of these generators and calls gen.send() on each of them, one by one, in a never-ending loop. Whenever you yield, the call to gen.send() completes and the loop can move on. (I'm simplifying; take a look around https://hg.python.org/cpython/file/3.4/Lib/asyncio/tasks.py#l265 for the actual code.)
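To illustrate that simplification, here is a toy round-robin "scheduler" driving plain generators with send(); this is not asyncio's actual implementation, just a sketch of the idea:

```python
def say_boo():
    for i in range(3):
        yield  # hand control back to the "scheduler"
        print('...boo {0}'.format(i))

def say_baa():
    for i in range(3):
        yield
        print('...baa {0}'.format(i))

# Toy scheduler: call send() on each generator in turn until all are exhausted.
# Each send() runs the generator up to its next yield, interleaving the output.
tasks = [say_boo(), say_baa()]
while tasks:
    for gen in list(tasks):
        try:
            gen.send(None)  # resume the generator until it yields again
        except StopIteration:
            tasks.remove(gen)  # generator returned; drop it from the rotation
```

Running this prints `...boo` and `...baa` lines in strict alternation, which is essentially the cooperative switching the event loop provides.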

That said, I would still go the run_in_executor route if you need to do CPU intensive computation without sharing data.

Jashandeep Sohi
  • Works in Python 3.4 but doesn't seem to work in Python 3.5. Is there a similar approach for 3.5? (`None` seems to be more elegant than using `asyncio.sleep()` everywhere...) – songololo Jan 28 '16 at 07:10
  • Since Python 3.5, the *right* way to do this is with `asyncio.sleep(0)`. [See this discussion.](https://github.com/python/asyncio/issues/284) – Jashandeep Sohi Jan 28 '16 at 07:56
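For reference, a sketch of what that `asyncio.sleep(0)` approach looks like in modern async/await syntax (assuming Python 3.7+ for `asyncio.run`; the loops are bounded here so the example terminates):

```python
import asyncio

async def say_boo():
    for i in range(3):
        await asyncio.sleep(0)  # yield control back to the event loop
        print('...boo {0}'.format(i))

async def say_baa():
    for i in range(3):
        await asyncio.sleep(0)
        print('...baa {0}'.format(i))

async def main():
    # gather schedules both coroutines as Tasks on the same event loop
    await asyncio.gather(say_boo(), say_baa())

asyncio.run(main())
```

Each `await asyncio.sleep(0)` suspends the coroutine for one pass of the loop, so the two tasks interleave just as the `yield` versions above do.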