A Python asyncio cancellation pattern

My use case was to handle the graceful termination of a RabbitMQ consumer written in Python 3.7 using asyncio (with aio_pika). I didn’t want the task to be simply interrupted, since if it was in the middle of handling a new message from the queue I wouldn’t know if the item had been successfully processed.

The pattern I’m using is to wait on both the task I’m interested in and an asyncio.Event object. This looks as follows:

import asyncio

cancellation_event = asyncio.Event()
done, pending = await asyncio.wait(
[cancellation_event.wait(), asyncio.sleep(30)],
return_when=asyncio.FIRST_COMPLETED
)

If the cancellation event is set before the sleep returns, the cancellation task will be in the done set and the sleep will be in the pending set.

A real world example of this requires a little scaffolding. The example below registers a signal handler which sets the cancellation event when the program is interrupted.

import asyncio
from asyncio import Event, AbstractEventLoop
from datetime import datetime
import signal
from typing import Optional


def make_cancellation_event(
    loop: Optional[AbstractEventLoop] = None
) -> Event:
    """Create an event that gets set when the program is interrupted.

    Registers a handler for SIGINT and for SIGTERM on ``loop``. When either
    signal arrives the handler prints which one it was and sets the event.

    Args:
        loop: Event loop to register the signal handlers on. When omitted,
            ``asyncio.get_event_loop()`` is used.

    Returns:
        The event that the signal handlers set.
    """
    cancellation_event = Event()

    if loop is None:
        loop = asyncio.get_event_loop()

    def cancel(name: str) -> None:
        # Both signals are reported the same way, so no per-signal branch
        # is needed.
        print(f'Received signal {name}')
        cancellation_event.set()

    for signame in ('SIGINT', 'SIGTERM'):
        loop.add_signal_handler(getattr(signal, signame), cancel, signame)

    return cancellation_event


async def async_time(delay: float) -> datetime:
    """Sleep for ``delay`` seconds, then return the current time.

    Args:
        delay: Number of seconds to sleep before reading the clock.

    Returns:
        ``datetime.now()`` sampled after the sleep has finished.
    """
    await asyncio.sleep(delay)
    return datetime.now()


async def main_async():
    """Print the time after five seconds unless interrupted first.

    Races a five-second ``async_time`` call against the cancellation event
    and prints either ``Cancelled`` or the timestamp that was produced.
    """
    cancellation_event = make_cancellation_event()
    # asyncio.wait() expects Tasks/Futures; bare coroutines are deprecated
    # since Python 3.8 and rejected from 3.11. Keeping the tasks by name
    # also lets the result be read directly instead of fishing it out of
    # the `done` set.
    cancellation_task = asyncio.create_task(cancellation_event.wait())
    time_task = asyncio.create_task(async_time(5))
    done, pending = await asyncio.wait(
        [cancellation_task, time_task],
        return_when=asyncio.FIRST_COMPLETED,
    )
    # The task that lost the race is no longer needed; cancel it here
    # rather than leaving it pending until the loop shuts down.
    for leftover in pending:
        leftover.cancel()
    if cancellation_event.is_set():
        print('Cancelled')
    else:
        now: datetime = time_task.result()
        print(f'Done: {now.isoformat()}')


# Run main_async() to completion on a new event loop.
asyncio.run(main_async())

The program will run until interrupted. Upon interruption it will wait for the pending tasks to finish, then exit. The output looks something like this:

now: 2019-07-31T11:41:18.118833
now: 2019-07-31T11:41:19.121330
now: 2019-07-31T11:41:20.123760
Received signal SIGINT
Done

This works well, but my actual use case (a RabbitMQ consumer) doesn’t give me an awaitable task, but an async iterator. We can mock this with the following function.

async def async_time_ticker(
    delay: float
) -> AsyncIterator[datetime]:
    """Yield the current time every ``delay`` seconds, forever.

    Args:
        delay: Number of seconds to sleep before each yielded timestamp.

    Yields:
        ``datetime.now()`` sampled after each sleep.
    """
    while True:
        await asyncio.sleep(delay)
        yield datetime.now()

We typically use this in the following fashion:

ticker = async_time_ticker(1)
async for now in ticker:
print(now)

We can adapt the previous solution to handle this, but it looks pretty hideous.

import asyncio
from asyncio import Event, AbstractEventLoop
from datetime import datetime
import signal
from typing import Optional, AsyncIterator


def make_cancellation_event(loop: Optional[AbstractEventLoop] = None) -> Event:
    """Create an event that gets set when the program is interrupted.

    A handler is installed on ``loop`` for SIGINT and for SIGTERM. On
    either signal the handler prints the signal's name and sets the event.

    Args:
        loop: Event loop on which to install the handlers. Falls back to
            ``asyncio.get_event_loop()`` when not given.

    Returns:
        The event set by the signal handlers.
    """
    cancellation_event = Event()

    if loop is None:
        loop = asyncio.get_event_loop()

    def cancel(name: str) -> None:
        # SIGINT and SIGTERM are handled identically: report, then flag.
        print(f'Received signal {name}')
        cancellation_event.set()

    for signame in ('SIGINT', 'SIGTERM'):
        signum = getattr(signal, signame)
        loop.add_signal_handler(signum, cancel, signame)

    return cancellation_event


async def async_time_ticker(
    delay: float
) -> AsyncIterator[datetime]:
    """Endlessly produce timestamps, one after every ``delay``-second sleep.

    Args:
        delay: Seconds to sleep before each timestamp is produced.

    Yields:
        The value of ``datetime.now()`` taken once each sleep completes.
    """
    while True:
        await asyncio.sleep(delay)
        yield datetime.now()


async def main_async():
    """Print a timestamp every second until the program is interrupted.

    Each iteration races the ticker's next value against the cancellation
    event. When the event wins, the in-flight tick is cancelled and the
    loop stops; otherwise the tick is printed.
    """
    cancellation_event = make_cancellation_event()
    cancellation_task = asyncio.create_task(cancellation_event.wait())
    ticker_iter = async_time_ticker(1).__aiter__()
    while not cancellation_event.is_set():
        # asyncio.wait() expects Tasks/Futures (bare awaitables are rejected
        # from Python 3.11); ensure_future() accepts any awaitable,
        # including the object returned by __anext__().
        tick_task = asyncio.ensure_future(ticker_iter.__anext__())
        await asyncio.wait(
            [cancellation_task, tick_task],
            return_when=asyncio.FIRST_COMPLETED,
        )
        # Check the cancellation task by name so the outcome does not
        # depend on set iteration order when both finish together.
        if cancellation_task.done():
            tick_task.cancel()  # no-op if the tick already completed
            break
        now: datetime = tick_task.result()
        print(f'now: {now.isoformat()}')


# Run main_async() to completion on a new event loop.
asyncio.run(main_async())

The __aiter__ and __anext__ calls are both a little scary, and the code size is getting way too large. However, the mechanism is general, so we can factor out the iterator:

async def cancellable_aiter(async_iterator: AsyncIterator, cancellation_event: Event) -> AsyncIterator:
    """Iterate ``async_iterator`` until it ends or the event is set.

    Before each item is requested the event is checked; while waiting for
    an item, the wait is raced against the event. If the event wins, the
    in-flight request is allowed to finish (its result is discarded) and
    iteration stops. If the wrapped iterator is exhausted, iteration ends
    normally.

    Args:
        async_iterator: The asynchronous iterator to wrap.
        cancellation_event: Event that signals iteration should stop.

    Yields:
        Items produced by ``async_iterator``.
    """
    cancellation_task = asyncio.ensure_future(cancellation_event.wait())
    result_iter = async_iterator.__aiter__()
    next_task = None
    try:
        while not cancellation_event.is_set():
            # asyncio.wait() expects Tasks/Futures (bare awaitables are
            # rejected from Python 3.11); ensure_future() accepts any
            # awaitable, including the object returned by __anext__().
            next_task = asyncio.ensure_future(result_iter.__anext__())
            await asyncio.wait(
                [cancellation_task, next_task],
                return_when=asyncio.FIRST_COMPLETED,
            )
            # Test the cancellation task by name so the outcome does not
            # depend on set iteration order when both finish together.
            if cancellation_task.done():
                # Let the in-flight request finish instead of interrupting
                # it; its result is intentionally dropped.
                try:
                    await next_task
                except StopAsyncIteration:
                    pass
                break
            try:
                item = next_task.result()
            except StopAsyncIteration:
                # Wrapped iterator is exhausted. Letting this escape an
                # async generator would surface as RuntimeError.
                break
            yield item
    finally:
        # Runs on normal exit, on early close by the consumer, and on
        # errors: make sure no helper task is left pending.
        for task in (cancellation_task, next_task):
            if task is not None and not task.done():
                task.cancel()

Now our code looks fantastic:

async def main_async():
    """Print ticks from a one-second ticker until interrupted, then print Done."""
    stop_requested = make_cancellation_event()
    ticks = cancellable_aiter(async_time_ticker(1), stop_requested)
    async for now in ticks:
        print(f'now: {now.isoformat()}')
    print("Done")

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store