A Python asyncio cancellation pattern
My use case was to handle the graceful termination of a RabbitMQ consumer written in Python 3.7 using asyncio (with aio_pika). I didn’t want the task to be simply interrupted, since if it was in the middle of handling a new message from the queue I wouldn’t know if the item had been successfully processed.
The pattern I’m using is to wait on both the task I’m interested in and an asyncio.Event
object. This looks as follows:
import asyncio
cancellation_event = asyncio.Event()
done, pending = await asyncio.wait(
[cancellation_event.wait(), asyncio.sleep(30)],
return_when=asyncio.FIRST_COMPLETED
)
If the cancellation event is set before the sleep returns, the cancellation task will be in the done
set and the sleep will be in the pending
set.
A real world example of this requires a little scaffolding. The example below registers a signal handler which sets the cancellation event when the program is interrupted.
import asyncio
from asyncio import Event, AbstractEventLoop
from datetime import datetime
import signal
from typing import Optional
def make_cancellation_event(
        loop: Optional[AbstractEventLoop] = None
) -> Event:
    """Create an event that gets set when the program is interrupted.

    Handlers for SIGINT and SIGTERM are registered on ``loop`` through
    ``loop.add_signal_handler``; each handler prints which signal arrived
    and sets the returned event.

    Args:
        loop: The event loop to register the handlers on. When ``None``,
            the loop returned by ``asyncio.get_event_loop()`` is used.

    Returns:
        The event that is set once either signal has been received.
    """
    cancellation_event = Event()
    if loop is None:
        loop = asyncio.get_event_loop()

    def cancel(name):
        # Both signals are reported the same way, so no per-signal branch.
        print(f'Received signal {name}')
        cancellation_event.set()

    for signame in ('SIGINT', 'SIGTERM'):
        signum = getattr(signal, signame)
        loop.add_signal_handler(signum, cancel, signame)
    return cancellation_event
async def async_time(delay: float) -> datetime:
    """Sleep for ``delay`` seconds, then return the current local time."""
    await asyncio.sleep(delay)
    finished_at = datetime.now()
    return finished_at
async def main_async():
    """Race a 5-second timer against program interruption and report the winner.

    Prints ``Cancelled`` when the cancellation event was set, otherwise
    prints the timestamp produced by ``async_time``.
    """
    cancellation_event = make_cancellation_event()
    # asyncio.wait() needs tasks, not bare coroutines (deprecated in 3.8,
    # rejected from 3.11). Keeping a handle on each task also lets us read
    # the timer result directly instead of digging through the ``done`` set.
    cancellation_task = asyncio.create_task(cancellation_event.wait())
    time_task = asyncio.create_task(async_time(5))
    _, pending = await asyncio.wait(
        [cancellation_task, time_task],
        return_when=asyncio.FIRST_COMPLETED,
    )
    # Whichever awaitable lost the race is no longer needed.
    for task in pending:
        task.cancel()
    if cancellation_event.is_set():
        print('Cancelled')
    else:
        now: datetime = time_task.result()
        print(f'Done: {now.isoformat()}')


asyncio.run(main_async())
The program will run until interrupted. Upon interruption it will wait for the pending tasks to finish, then exit. The output looks something like this:
now: 2019-07-31T11:41:18.118833
now: 2019-07-31T11:41:19.121330
now: 2019-07-31T11:41:20.123760
Received signal SIGINT
Done
This works well, but my actual use case (a RabbitMQ consumer) doesn’t give me an awaitable task, but an async iterator. We can mock this with the following function.
async def async_time_ticker(delay: float) -> AsyncIterator[datetime]:
    """Yield the current local time every ``delay`` seconds, forever."""
    while True:
        await asyncio.sleep(delay)
        tick = datetime.now()
        yield tick
We typically use this in the following fashion:
ticker = async_time_ticker(1)
async for now in ticker:
print(now)
We can adapt the previous solution to handle this; but it looks pretty hideous.
import asyncio
from asyncio import Event, AbstractEventLoop
from datetime import datetime
import signal
from typing import Optional, AsyncIterator
def make_cancellation_event(loop: Optional[AbstractEventLoop] = None) -> Event:
    """Create an event that gets set when the program is interrupted.

    Handlers for SIGINT and SIGTERM are registered on ``loop`` through
    ``loop.add_signal_handler``; each handler prints which signal arrived
    and sets the returned event.

    Args:
        loop: The event loop to register the handlers on. When ``None``,
            the loop returned by ``asyncio.get_event_loop()`` is used.

    Returns:
        The event that is set once either signal has been received.
    """
    cancellation_event = Event()
    if loop is None:
        loop = asyncio.get_event_loop()

    def cancel(name):
        # Both signals are reported the same way, so no per-signal branch.
        print(f'Received signal {name}')
        cancellation_event.set()

    for signame in ('SIGINT', 'SIGTERM'):
        signum = getattr(signal, signame)
        loop.add_signal_handler(signum, cancel, signame)
    return cancellation_event
async def async_time_ticker(delay: float) -> AsyncIterator[datetime]:
    """Endlessly produce timestamps, pausing ``delay`` seconds before each one."""
    while True:
        await asyncio.sleep(delay)
        current_time = datetime.now()
        yield current_time
async def main_async():
    """Print a timestamp every second until the program is interrupted.

    Each iteration races the next tick against the cancellation event.
    A tick that has already completed is always printed; a tick that is
    still pending when the cancellation event fires is cancelled.
    """
    cancellation_event = make_cancellation_event()
    cancellation_task = asyncio.create_task(
        cancellation_event.wait()
    )
    ticker = async_time_ticker(1)
    ticker_iter = ticker.__aiter__()
    while not cancellation_event.is_set():
        # asyncio.wait() needs futures, not bare awaitables (deprecated in
        # 3.8, rejected from 3.11), so schedule the next tick explicitly.
        next_tick = asyncio.ensure_future(ticker_iter.__anext__())
        await asyncio.wait(
            [cancellation_task, next_tick],
            return_when=asyncio.FIRST_COMPLETED,
        )
        if next_tick.done():
            # Checking the tick first means a result that completed at the
            # same moment as the cancellation is never silently dropped.
            now: datetime = next_tick.result()
            print(f'now: {now.isoformat()}')
        else:
            # Only the cancellation task finished; the loop condition ends
            # the loop, so the in-flight tick is no longer wanted.
            next_tick.cancel()


asyncio.run(main_async())
The explicit calls to __aiter__
and __anext__
are both a little scary, and the code size is getting way too large. However, the mechanism is general, so we can factor out the iterator:
async def cancellable_aiter(
        async_iterator: AsyncIterator,
        cancellation_event: Event
) -> AsyncIterator:
    """Iterate ``async_iterator`` until it ends or ``cancellation_event`` is set.

    Each step races the next item against the cancellation event. Items
    that have completed are always yielded. If the event fires while a
    fetch is still in flight, that fetch is awaited to completion (never
    interrupted), its result is discarded, and iteration stops.

    Args:
        async_iterator: The source to draw items from.
        cancellation_event: Setting this event stops the iteration.

    Yields:
        The items produced by ``async_iterator``.
    """
    cancellation_task = asyncio.create_task(cancellation_event.wait())
    result_iter = async_iterator.__aiter__()
    try:
        while not cancellation_event.is_set():
            # asyncio.wait() needs futures, not bare awaitables (deprecated
            # in 3.8, rejected from 3.11), so schedule the fetch explicitly.
            next_task = asyncio.ensure_future(result_iter.__anext__())
            await asyncio.wait(
                [cancellation_task, next_task],
                return_when=asyncio.FIRST_COMPLETED,
            )
            if not next_task.done():
                # Cancellation won the race: let the in-flight fetch finish
                # instead of interrupting it, then stop.
                try:
                    await next_task
                except StopAsyncIteration:
                    pass
                break
            try:
                item = next_task.result()
            except StopAsyncIteration:
                # The source is exhausted. Letting this escape an async
                # generator would surface as RuntimeError, so end cleanly.
                break
            yield item
    finally:
        # Do not leave the event waiter pending once iteration is over.
        cancellation_task.cancel()
Now our code looks fantastic:
async def main_async():
    """Print one timestamp per second until interrupted, then print ``Done``."""
    stop_requested = make_cancellation_event()
    # Wrap the endless ticker so the loop below ends once the event is set.
    ticks = cancellable_aiter(async_time_ticker(1), stop_requested)
    async for now in ticks:
        print(f'now: {now.isoformat()}')
    print("Done")