ASGI Event Loop Gotcha

Rob Blackbourn
3 min readAug 9, 2019

--

I’ve shed many tears with the following exception:

got Future <Future pending> attached to a different loop

Here’s a simple example which sets and clears an asyncio.Event(), via a web page served by the Uvicorn ASGI server, using the bareASGI framework.

import asyncio
import urllib.parse
from bareasgi import (
Application,
text_reader,
text_writer
)
import uvicorn
async def get_index(scope, info, matches, content):
"""Change an event"""
# !!! Do something asynchronous !!!
try:
await asyncio.wait_for(info['event'].wait(), timeout=0.1)
except asyncio.TimeoutError:
pass
text = """
<html>
<body>
<button onclick="changeEvent('{state}')">{message}</button>
<script>
function changeEvent(state) {{
fetch('/change_event', {{ method: 'POST', body: state }})
.then(response => location.reload())
}}
</script>
</body>
</html>
""".format(
message='Clear' if info['event'].is_set() else 'Set',
state='False' if info['event'].is_set() else 'True'
)
return 200, [(b'content-type', b'text/html')], text_writer(text)
async def change_event(scope, info, matches, content):
if await text_reader(content) == 'True':
info['event'].set()
else:
info['event'].clear()
return 204
# !!! Create the event !!!
event = asyncio.Event()
app = Application(info={'event': event})
app.http_router.add({'GET'}, '/', get_index)
app.http_router.add({'POST'}, '/change_event', change_event)
uvicorn.run(app, port=9009, loop='auto')

The exception is raised at the line after the comment # !!! Do something asynchronous !!! and is cause by the seemingly innocuous line below the comment # !!! Create the event !!!.

As is often the case the error tells you everything you need to know. The asyncio.Event() has been created on a different loop to the one being used by the asyncio.wait() call.

When the ASGI server starts it creates an event loop. This might be a standard asyncioloop, a uvloop, or a variety of other possibilities. Even if the server is using the default asyncio loop it has likely created a new one. We could attempt to force the server to use the same event loop, but then we would be tied to a particular implementation, and loose any advantages that might be gained through an optimised event loop available on a particular operating system, or for a particular environment.

Fortunately the ASGI servers support lifespan methods. These are called at the start and end of the life of the server, and are run in the context of the ASGI server’s event loop. The following code uses a startup handler to create the event and stores it in the shared info object.

async def create_event(scope, info, request):
"""Set up the event with the server's event loop"""
info['event'] = asyncio.Event()
app = Application(info={}, startup_handlers=[create_event])

We can use the event in the request handler through the shared info object.

async def get_index(scope, info, matches, content):
"""Change an event"""
# Do something asynchronous
try:
await asyncio.wait_for(info['event'].wait(), timeout=0.1)
except asyncio.TimeoutError:
pass

Since the event was created in the context of the ASGI server’s event loop all is well.

We can use a similar technique for long running services. If we have the following mock service.

class SomeService:    def __init__(self):
self.cancellation_event = asyncio.Event()
def shutdown(self):
self.cancellation_event.set()
async def startup(self):
print('Service started')
cancellation_task = asyncio.create_task(
self.cancellation_event.wait()
)
while not self.cancellation_event.is_set():
done, pending = await asyncio.wait(
[cancellation_task, asyncio.sleep(1)],
return_when=asyncio.FIRST_COMPLETED
)
if cancellation_task in done:
for task in pending:
task.cancel()
else:
print('Service did something')
print('Service stopped')

We can user the following startup and shutdown functions:

async def start_service(
scope: Scope,
info: Info,
request: Message
) -> None:
"""Set up the event with the server's event loop"""
service = SomeService()
service_task = asyncio.create_task(service.startup())
info['service'] = service
info['service_task'] = service_task
async def stop_service(
scope: Scope,
info: Info,
request: Message
) -> None:
"""Set up the event with the server's event loop"""
print('Stopping service')
service = info['service']
service.shutdown()
service_task = info['service_task']
await service_task

And register them with the app:

app = Application(
info={},
startup_handlers=[start_service],
shutdown_handlers=[stop_service]
)

Now we have an asyncio service running in the background that gets gracefully shutdown with the server.

--

--