Python Countdown Event
While writing some server-side code I needed to block until all tasks for a particular component had completed, like a semaphore in reverse. After some googling failed to turn up a ready-made solution, I rolled my own.
from asyncio import Event


class CountdownEvent:
    """An asyncio event that is set only while its count is zero.

    The count starts at zero with the underlying event set, so ``wait``
    returns immediately. Each ``increment`` clears the event; the event is
    set again once ``decrement`` brings the count back down to zero.
    """

    def __init__(self) -> None:
        """Create the event with a count of zero, initially set."""
        self._event = Event()
        self._count = 0
        # Nothing is outstanding yet, so waiters must not block.
        self._event.set()

    def increment(self) -> int:
        """Add one to the count and clear the event.

        Returns:
            The count after incrementing.
        """
        self._count += 1
        self._event.clear()
        return self._count

    def decrement(self) -> int:
        """Subtract one from the count, setting the event when it hits zero.

        Returns:
            The count after decrementing.

        Raises:
            AssertionError: If the count is already zero.
        """
        # Raised explicitly rather than via ``assert`` so the guard is not
        # stripped under ``python -O``; otherwise the count could go negative
        # and the ``== 0`` check below would no longer fire when expected.
        if self._count <= 0:
            raise AssertionError("Count cannot go below zero")
        self._count -= 1
        if self._count == 0:
            self._event.set()
        return self._count

    async def wait(self) -> None:
        """Block until the underlying event is set (i.e. the count is zero)."""
        await self._event.wait()

    @property
    def count(self) -> int:
        """The current count."""
        return self._count
The event is initially set (so it won’t block), but an `increment` will clear the event (making it block) until subsequent `decrement` calls reduce the count to zero, at which point it will be set again (so it won’t block).
We can try this by mocking some long running tasks which use the countdown event. Then we stop the tasks after a few seconds, and wait on the countdown event for all tasks to finish.
import asyncio
from countdown_event import CountdownEvent


async def long_running_task(countdown_event, cancellation_event):
    """Simulate a task that runs until the cancellation event is set.

    The countdown event is incremented on entry and decremented in a
    ``finally`` block, so the decrement happens however the wait ends.
    """
    count = countdown_event.increment()
    print(f'incremented count to {count}')
    try:
        print('Waiting for cancellation event')
        await cancellation_event.wait()
    finally:
        count = countdown_event.decrement()
        print(f'decremented count to {count}')


async def stop_tasks(secs, countdown_event, cancellation_event):
    """Set the cancellation event after ``secs`` seconds, then wait on the
    countdown event until the tasks have finished."""
    print(f'waiting {secs} seconds before setting the cancellation event')
    await asyncio.sleep(secs)
    print('setting the cancellation event')
    cancellation_event.set()
    print('waiting for tasks to finish')
    await countdown_event.wait()
    print('countdown event cleared')


async def main_async():
    """Run three mock tasks alongside the coroutine that stops them."""
    cancellation_event = asyncio.Event()
    countdown_event = CountdownEvent()

    tasks = [
        long_running_task(countdown_event, cancellation_event),
        long_running_task(countdown_event, cancellation_event),
        long_running_task(countdown_event, cancellation_event),
        stop_tasks(5, countdown_event, cancellation_event),
    ]

    # asyncio.wait() no longer accepts bare coroutine objects (deprecated in
    # Python 3.8, an error from 3.11); gather() schedules them itself and also
    # propagates any exception raised by a task.
    await asyncio.gather(*tasks)
    assert countdown_event.count == 0
    print("done")


if __name__ == "__main__":
    asyncio.run(main_async())
Running this gives the following output.
incremented count to 1
Waiting for cancellation event
incremented count to 2
Waiting for cancellation event
waiting 5 seconds before setting the cancellation event
incremented count to 3
Waiting for cancellation event
setting the cancellation event
waiting for tasks to finish
decremented count to 2
decremented count to 1
decremented count to 0
countdown event cleared
done
We now have fine grained control over the shutdown behaviour.
I have posted the source code on GitHub - https://github.com/rob-blackbourn/countdown-event and uploaded the package to PyPI.