While writing some server side code I needed to block until all tasks for a particular component had completed; like a semaphore in reverse. After some googling failed to provide a ready made solution I rolled my own.

from asyncio import Eventclass CountdownEvent:    def __init__(self)-> None:
self._event = Event()
self._count = 0
self._event.set()
def increment(self) -> int:
self._count += 1
self._event.clear()
return self._count
def decrement(self) -> int:
assert self._count > 0, "Count cannot go below zero"
self._count -= 1
if self._count == 0:
self._event.set()
return self._count
async def wait(self) -> None:
await self._event.wait()
@property
def count(self) -> int:
return self._count

The event is initially set (so it won’t block), but an increment will clear the event (making it block) until subsequent decrement‘s reduce the count to zero, at which point it will be set (so it won’t block).

We can try this by mocking some long running tasks which use the countdown event. Then we stop the tasks after a few seconds, and wait on the countdown event for all tasks to finish.

import asyncio
from countdown_event import CountdownEvent
async def long_running_task(countdown_event,cancellation_event):
count = countdown_event.increment()
print(f'incremented count to {count}')
try:
print('Waiting for cancellation event')
await cancellation_event.wait()
finally:
count = countdown_event.decrement()
print(f'decremented count to {count}')
async def stop_tasks(secs, countdown_event, cancellation_event):
print(f'waiting {secs} seconds before setting the cancellation event')
await asyncio.sleep(secs)
print('setting the cancellation event')
cancellation_event.set()
print('waiting for tasks to finish')
await countdown_event.wait()
print('countdown event cleared')
async def main_async(): cancellation_event = asyncio.Event()
countdown_event = CountdownEvent()
tasks = [
long_running_task(countdown_event, cancellation_event),
long_running_task(countdown_event, cancellation_event),
long_running_task(countdown_event, cancellation_event),
stop_tasks(5, countdown_event, cancellation_event)
]
await asyncio.wait(tasks)
assert countdown_event.count == 0
print("done")
if __name__ == "__main__":
asyncio.run(main_async())

Running this gives the following output.

incremented count to 1
Waiting for cancellation event
incremented count to 2
Waiting for cancellation event
waiting 5 seconds before setting the cancellation event
incremented count to 3
Waiting for cancellation event
setting the cancellation event
waiting for tasks to finish
decremented count to 2
decremented count to 1
decremented count to 0
countdown event cleared
done

We now have fine grained control over the shutdown behaviour.

I have posted the source code is on GitHub - https://github.com/rob-blackbourn/countdown-event and uploaded the package to pypi.

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store