ASGI Event Loop Gotcha
I’ve shed many tears over the following exception:
got Future <Future pending> attached to a different loop
Here’s a simple example which sets and clears an asyncio.Event() via a web page served by the Uvicorn ASGI server, using the bareASGI framework.
import asyncio
import urllib.parse

from bareasgi import (
    Application,
    text_reader,
    text_writer
)
import uvicorn


async def get_index(scope, info, matches, content):
    """Change an event"""

    # !!! Do something asynchronous !!!
    try:
        await asyncio.wait_for(info['event'].wait(), timeout=0.1)
    except asyncio.TimeoutError:
        pass

    text = """
<html>
  <body>
    <button onclick="changeEvent('{state}')">{message}</button>
    <script>
      function changeEvent(state) {{
        fetch('/change_event', {{ method: 'POST', body: state }})
          .then(response => location.reload())
      }}
    </script>
  </body>
</html>
""".format(
        message='Clear' if info['event'].is_set() else 'Set',
        state='False' if info['event'].is_set() else 'True'
    )
    return 200, [(b'content-type', b'text/html')], text_writer(text)


async def change_event(scope, info, matches, content):
    if await text_reader(content) == 'True':
        info['event'].set()
    else:
        info['event'].clear()
    return 204


# !!! Create the event !!!
event = asyncio.Event()
app = Application(info={'event': event})
app.http_router.add({'GET'}, '/', get_index)
app.http_router.add({'POST'}, '/change_event', change_event)

uvicorn.run(app, port=9009, loop='auto')
The exception is raised at the line after the comment # !!! Do something asynchronous !!! and is caused by the seemingly innocuous line below the comment # !!! Create the event !!!.
As is often the case, the error tells you everything you need to know. The asyncio.Event() was created on a different loop from the one being used by the asyncio.wait_for() call.
When the ASGI server starts it creates an event loop. This might be a standard asyncio loop, a uvloop loop, or one of a variety of other possibilities. Even if the server is using the default asyncio loop, it has likely created a new one. We could attempt to force the server to use the same event loop, but then we would be tied to a particular implementation, and lose any advantages that might be gained through an optimised event loop available on a particular operating system, or for a particular environment.
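The point about the server owning its own loop can be seen without a web server at all. The following is a minimal sketch using only the standard library, where asyncio.run stands in for the ASGI server (an assumption made for illustration; Uvicorn's actual startup is more involved). The names import_time_loop and current_loop are hypothetical. On Python versions before 3.10, asyncio.Event() bound itself to a loop at construction, which is why a module-level event can end up attached to a loop the server never uses.

```python
import asyncio

# A loop that exists before the "server" starts, like one picked up
# by objects created at module level. (Hypothetical name.)
import_time_loop = asyncio.new_event_loop()


async def current_loop():
    """Return the loop this coroutine is actually running on."""
    return asyncio.get_running_loop()


# asyncio.run stands in for the server: it always creates a fresh loop,
# runs the coroutine on it, and closes it afterwards.
server_loop = asyncio.run(current_loop())
import_time_loop.close()

# The loop the coroutine ran on is not the one that existed beforehand.
same_loop = server_loop is import_time_loop
print(same_loop)
```

Anything tied to import_time_loop would be unusable from coroutines running on server_loop, which is the situation the exception describes.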
Fortunately, ASGI servers support lifespan events. These are sent at the start and end of the life of the server, and are handled in the context of the ASGI server’s event loop. The following code uses a startup handler to create the event and store it in the shared info object.
async def create_event(scope, info, request):
    """Set up the event with the server's event loop"""
    info['event'] = asyncio.Event()


app = Application(info={}, startup_handlers=[create_event])
We can use the event in the request handler through the shared info object.
async def get_index(scope, info, matches, content):
    """Change an event"""

    # Do something asynchronous
    try:
        await asyncio.wait_for(info['event'].wait(), timeout=0.1)
    except asyncio.TimeoutError:
        pass
Since the event was created in the context of the ASGI server’s event loop, all is well.
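For readers curious what a startup handler rests on, here is a rough sketch of the ASGI lifespan exchange written as a raw ASGI application, without bareASGI. The message types follow the ASGI lifespan specification; the state dictionary and the drive_lifespan function are hypothetical stand-ins (the latter plays the role of the server so that the example runs on its own). It is not how bareASGI is implemented internally.

```python
import asyncio

# Shared state, playing the role of bareASGI's info object (hypothetical).
state = {}


async def app(scope, receive, send):
    """A raw ASGI app that only handles the lifespan scope."""
    if scope['type'] == 'lifespan':
        while True:
            message = await receive()
            if message['type'] == 'lifespan.startup':
                # Runs on the server's loop, so the event belongs to it.
                state['event'] = asyncio.Event()
                await send({'type': 'lifespan.startup.complete'})
            elif message['type'] == 'lifespan.shutdown':
                await send({'type': 'lifespan.shutdown.complete'})
                return


async def drive_lifespan():
    """Stand-in for the server: feed lifespan messages to the app."""
    inbox = asyncio.Queue()
    sent = []

    async def send(message):
        sent.append(message['type'])

    await inbox.put({'type': 'lifespan.startup'})
    await inbox.put({'type': 'lifespan.shutdown'})
    await app({'type': 'lifespan'}, inbox.get, send)
    return sent


sent = asyncio.run(drive_lifespan())
print(sent)
```

The key detail is that the event is constructed inside a coroutine the server awaits, so it is created while the server's loop is the running loop.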
We can use a similar technique for long-running services. Suppose we have the following mock service:
class SomeService:

    def __init__(self):
        self.cancellation_event = asyncio.Event()

    def shutdown(self):
        self.cancellation_event.set()

    async def startup(self):
        print('Service started')
        cancellation_task = asyncio.create_task(
            self.cancellation_event.wait()
        )
        while not self.cancellation_event.is_set():
            # asyncio.wait needs tasks; bare coroutines are rejected
            # from Python 3.11 onwards.
            sleep_task = asyncio.create_task(asyncio.sleep(1))
            done, pending = await asyncio.wait(
                [cancellation_task, sleep_task],
                return_when=asyncio.FIRST_COMPLETED
            )
            if cancellation_task in done:
                for task in pending:
                    task.cancel()
            else:
                print('Service did something')
        print('Service stopped')
We can use the following startup and shutdown functions:
async def start_service(
        scope: Scope,
        info: Info,
        request: Message
) -> None:
    """Start the service on the server's event loop"""
    service = SomeService()
    service_task = asyncio.create_task(service.startup())
    info['service'] = service
    info['service_task'] = service_task


async def stop_service(
        scope: Scope,
        info: Info,
        request: Message
) -> None:
    """Signal the service to stop and wait for it to finish"""
    print('Stopping service')
    service = info['service']
    service.shutdown()
    service_task = info['service_task']
    await service_task
And register them with the app:
app = Application(
    info={},
    startup_handlers=[start_service],
    shutdown_handlers=[stop_service]
)
Now we have an asyncio service running in the background that is shut down gracefully with the server.
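The start, run, stop sequence can be tried out without a server. The sketch below is an illustration under a few assumptions: TickingService is a hypothetical variant of the mock service that counts its iterations, it uses asyncio.wait_for with a timeout in place of the asyncio.wait loop above, and the lifecycle coroutine plays the part of the server calling the startup and shutdown handlers.

```python
import asyncio


class TickingService:
    """Hypothetical mock service that counts how often it did work."""

    def __init__(self, interval=0.01):
        self.interval = interval
        self.ticks = 0
        self.stopped = False
        self.cancellation_event = asyncio.Event()

    def shutdown(self):
        self.cancellation_event.set()

    async def startup(self):
        while not self.cancellation_event.is_set():
            try:
                # Wake early if shutdown is requested, else after interval.
                await asyncio.wait_for(
                    self.cancellation_event.wait(),
                    timeout=self.interval
                )
            except asyncio.TimeoutError:
                self.ticks += 1  # the "service did something" step
        self.stopped = True


async def lifecycle():
    """Stand-in for the server: start, let it run briefly, then stop."""
    info = {}

    # What the startup handler does.
    service = TickingService()
    info['service'] = service
    info['service_task'] = asyncio.create_task(service.startup())

    await asyncio.sleep(0.1)  # the server handling requests

    # What the shutdown handler does.
    info['service'].shutdown()
    await info['service_task']
    return service.ticks, service.stopped


ticks, stopped = asyncio.run(lifecycle())
print(ticks, stopped)
```

Awaiting the task in the shutdown step is what makes the stop graceful: the service finishes its current iteration and exits its loop before the server goes away.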