Beyond EventSource: Streaming fetch with ReadableStream
I’ve been using EventSource for some time now for streaming GraphQL subscriptions, and it’s fantastic, but there are two reasons why it sucks.
- It uses an arcane text-based protocol,
- The only way to communicate the initial request is with the URL.
The second has started to cause me problems. When my GraphQL subscription queries become too large I get unexpected errors which are hard to debug.
Under the hood we know that the EventSource is simply a streaming fetch using the GET method. If only there were a way to do a streaming fetch as a POST we could get all the benefits of the EventSource with control over the protocol, and the ability to send the query in the body of the request. Fortunately there is!
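To make the second problem concrete, here is a minimal sketch of how a subscription gets squeezed into an EventSource URL. The endpoint, the `query` and `variables` parameter names, and the helpers `subscriptionUrl` and `subscribe` are assumptions for illustration, not any particular server’s API.

```javascript
// Hypothetical helper: the whole GraphQL request must be encoded into the URL.
function subscriptionUrl(baseUrl, query, variables) {
  const params = new URLSearchParams({
    query,
    variables: JSON.stringify(variables)
  })
  return `${baseUrl}?${params}`
}

const query = 'subscription { ticker(symbol: "ABC") { price } }'
const url = subscriptionUrl('http://example.com/events', query, {})

// URL encoding makes the request longer than the query itself.
console.log(url.length, query.length)

// Browser only: the URL is the only place to put the request.
function subscribe(url, onData) {
  const eventSource = new EventSource(url)
  eventSource.onmessage = event => onData(event.data)
  return eventSource
}
```

Servers and proxies commonly cap the length of a request line, so a large query can fail before it ever reaches the GraphQL layer; the exact limit depends on the deployment.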
Readable Streams
Along with fetch(<url>).then(response => response.text()) and fetch(<url>).then(response => response.json()) there is fetch(<url>).then(response => response.body).
The body property exposes a ReadableStream which can be used to process the fetch response as the data is received. We can use this to generate events that have the same functionality as EventSource. Here’s the function which we’ll be creating.
function FetchEventTarget(input, init) {
  const eventTarget = new EventTarget()
  const jsonDecoder = makeJsonDecoder()
  const eventStream = makeWriteableEventStream(eventTarget)
  fetch(input, init)
    .then(response => {
      // Return the pipe promise so that stream errors reach the catch below.
      return response.body
        .pipeThrough(new TextDecoderStream())
        .pipeThrough(jsonDecoder)
        .pipeTo(eventStream)
    })
    .catch(error => {
      eventTarget.dispatchEvent(
        new CustomEvent('error', { detail: error }))
    })
  return eventTarget
}
At the heart of this is a fetch call from which we get the body. Next we use the pipeThrough method with a TextDecoderStream to turn the incoming bytes into text. Then we need to parse the text as JSON and finally generate events. For those two steps we need to do some actual work.
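Why decode as a stream rather than chunk by chunk? Network chunks can end in the middle of a multi-byte character. The sketch below uses TextDecoder with the `stream` option, which as far as I know is the non-stream counterpart of what TextDecoderStream does for us; the helper name `decodeChunks` is made up for the example.

```javascript
// Decode a sequence of byte chunks, carrying partial characters
// over from one chunk to the next.
function decodeChunks(chunks) {
  const decoder = new TextDecoder()
  let text = ''
  for (const chunk of chunks) {
    text += decoder.decode(chunk, { stream: true })
  }
  // A final call without the stream option flushes anything left over.
  return text + decoder.decode()
}

// The euro sign is three bytes in UTF-8; split it across two chunks.
const bytes = new TextEncoder().encode('{"price":"€5"}\n')
const splitAt = bytes.indexOf(0xe2) + 1
const text = decodeChunks([bytes.slice(0, splitAt), bytes.slice(splitAt)])
console.log(text)
```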
The following function assumes the incoming JSON messages are line delimited. It splits the incoming text into lines, then parses each line as JSON.
function makeJsonDecoder() {
  return new TransformStream({
    start(controller) {
      // Text received so far that has not yet formed a complete line.
      controller.buf = ''
      // Index of the next character to check for a newline.
      controller.pos = 0
    },
    transform(chunk, controller) {
      controller.buf += chunk
      while (controller.pos < controller.buf.length) {
        if (controller.buf[controller.pos] === '\n') {
          // A complete line: parse it and pass it down the pipeline.
          const line = controller.buf.substring(0, controller.pos)
          controller.enqueue(JSON.parse(line))
          controller.buf = controller.buf.substring(controller.pos + 1)
          controller.pos = 0
        } else {
          ++controller.pos
        }
      }
    }
  })
}
I won’t go through the algorithm for splitting the text into lines. The key thing to note is the TransformStream class, which is constructed with an object providing start and transform methods. We use the start method to set the initial state; the transform method splits the text into lines, parses each line as JSON, and calls controller.enqueue to pass the result to the next stage of the pipeline.
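For the curious, the line splitting can also be sketched as a plain function, which makes the chunk-boundary behaviour easier to see. This is an alternative formulation using indexOf, not the decoder shown above, and `splitJsonLines` is a hypothetical name.

```javascript
// Append a chunk to the leftover text and parse every complete line.
// Returns the parsed messages and the unfinished remainder.
function splitJsonLines(rest, chunk) {
  const messages = []
  let buf = rest + chunk
  let newline
  while ((newline = buf.indexOf('\n')) !== -1) {
    messages.push(JSON.parse(buf.slice(0, newline)))
    buf = buf.slice(newline + 1)
  }
  return { messages, rest: buf }
}

// A message split across two chunks is only emitted once its newline arrives.
const first = splitJsonLines('', '{"type":"a","data":1}\n{"type":"b",')
const second = splitJsonLines(first.rest, '"data":2}\n')
console.log(first.messages, second.messages)
```

The remainder returned by one call is fed into the next, which is the same role the buffer plays in the transform.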
The next step is to turn the JSON stream into events.
function makeWriteableEventStream(eventTarget) {
  return new WritableStream({
    start(controller) {
      // The stream is ready to receive messages.
      eventTarget.dispatchEvent(new Event('start'))
    },
    write(message, controller) {
      // Each message names its own event type and carries its payload.
      eventTarget.dispatchEvent(
        new MessageEvent(
          message.type,
          { data: message.data }
        )
      )
    },
    close(controller) {
      // The response body ended normally.
      eventTarget.dispatchEvent(new CloseEvent('close'))
    },
    abort(reason) {
      // The stream was aborted, for example by an error upstream.
      eventTarget.dispatchEvent(new CloseEvent('abort', { reason }))
    }
  })
}
We use the WritableStream class, providing an object whose methods dispatch events over the stream’s lifetime: start, close, abort, and the data messages.
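The sink reads message.type and message.data, so each line on the wire is presumably a JSON object with those two properties. Here is a sketch of what a sender would write, with `encodeMessage` being a hypothetical helper rather than part of the demo server.

```javascript
function encodeMessage(type, data) {
  // JSON.stringify escapes newlines inside strings, so the only raw
  // newline is the delimiter at the end.
  return JSON.stringify({ type, data }) + '\n'
}

const wire = encodeMessage('an-event-name', { text: 'line one\nline two' })

// Decoding the line gives back the type and data the sink expects.
const decoded = JSON.parse(wire.trimEnd())
console.log(decoded.type, decoded.data)
```

Because the payload can never contain a raw newline, splitting the stream on newlines is safe.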
Now we have implemented all the parts required by our initial function. We can call it in the following way:
// Lets the caller cancel the request later with abortController.abort().
const abortController = new AbortController()

const eventTarget = new FetchEventTarget(
  'http://example.com/events', {
    method: 'POST',
    headers: new Headers({
      'accept': 'application/json',
      'content-type': 'application/json'
    }),
    mode: 'same-origin',
    signal: abortController.signal,
    body: JSON.stringify({ query: 'Some query' })
  })

eventTarget.addEventListener('an-event-name', event => {
  console.log(event.data)
})
Now we can see the power of the fully operational streaming fetch. We can specify the method, headers, control CORS, and pass a body. With the event target support we have an interaction which feels very similar to the EventSource approach.
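Because the request was given an AbortSignal, the stream can be cancelled from the client, much like calling close on an EventSource. A minimal sketch follows; how the abort surfaces on the event target (as an abort or an error event) is something I would check in the browser rather than assume.

```javascript
const controller = new AbortController()
let aborted = false

// The signal is itself an event target, so we can observe the abort.
controller.signal.addEventListener('abort', () => {
  aborted = true
})

// Pass controller.signal as the signal option of the fetch, then later:
controller.abort()
console.log(controller.signal.aborted, aborted)
```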
If you would like to take this for a spin there is a Python demo.
$ mkdir demo
$ cd demo
demo $ python3.7 -m venv .venv
demo $ source .venv/bin/activate
(.venv) demo $ pip install bareasgi
Successfully installed bareasgi-3.2.0 baretypes-3.0.5 bareutils-3.1.0
(.venv) demo $ pip install uvicorn
Successfully installed click-7.0 h11-0.8.1 httptools-0.0.13 uvicorn-0.9.0 uvloop-0.13.0 websockets-8.0.2
(.venv) demo $ wget https://raw.githubusercontent.com/rob-blackbourn/bareasgi/master/examples/streaming_fetch.html
(.venv) demo $ wget https://raw.githubusercontent.com/rob-blackbourn/bareasgi/master/examples/streaming_fetch.py
(.venv) demo $ python streaming_fetch.py
INFO:uvicorn:Uvicorn running on http://127.0.0.1:9009 (Press CTRL+C to quit)
Navigate to http://127.0.0.1:9009 and give it a try.
Standards
The good thing about standards is there are so many to choose from. At the time of writing most modern browsers support readable streams, but only Chrome supports the pipe methods.
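Where the pipe methods are missing, the same decoding can be sketched with a reader loop, since getReader is part of the basic readable stream support. This is an alternative to the pipeline above, not the code from the demo, and `readJsonLines` is a hypothetical name.

```javascript
// Read a line-delimited JSON body without pipeThrough or pipeTo.
// Calls onMessage with each parsed line as it arrives.
async function readJsonLines(body, onMessage) {
  const reader = body.getReader()
  const decoder = new TextDecoder()
  let buf = ''
  while (true) {
    const { done, value } = await reader.read()
    if (done) break
    // The stream option keeps partial multi-byte characters for the next chunk.
    buf += decoder.decode(value, { stream: true })
    let newline
    while ((newline = buf.indexOf('\n')) !== -1) {
      onMessage(JSON.parse(buf.slice(0, newline)))
      buf = buf.slice(newline + 1)
    }
  }
}
```

With this in place, the fetch handler could call readJsonLines(response.body, message => eventTarget.dispatchEvent(new MessageEvent(message.type, { data: message.data }))) instead of building the pipeline.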