Beyond EventSource: Streaming fetch with ReadableStream

I’ve been using EventSource for some time now for streaming GraphQL subscriptions, and it’s fantastic, but there are two reasons why it sucks.

  1. It uses an arcane text-based protocol.
  2. The only way to communicate the initial request is with the URL.

The second has started to cause me problems. When my GraphQL subscription queries become too large, I get unexpected errors that are hard to debug.

Under the hood we know that the EventSource is simply a streaming fetch using the GET method. If only there were a way to do a streaming fetch as a POST we could get all the benefits of the EventSource with control over the protocol, and the ability to send the query in the body of the request. Fortunately there is!
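To make the second limitation concrete, here is a minimal sketch contrasting the two ways of carrying a query. The helper names buildEventSourceUrl and buildPostInit and the sample query are made up for illustration. The only point is that the URL grows with the query in the first case, while in the second the query moves into the body and the URL stays as it was.

```javascript
// Hypothetical helpers, for illustration only.

// With EventSource, the query has to be packed into the URL.
function buildEventSourceUrl(endpoint, query) {
  const url = new URL(endpoint)
  url.searchParams.set('query', query)
  return url.toString()
}

// With fetch, the query can travel in the request body instead.
function buildPostInit(query) {
  return {
    method: 'POST',
    headers: { 'content-type': 'application/json' },
    body: JSON.stringify({ query })
  }
}

const query = 'subscription { ticks { symbol price } }'
const url = buildEventSourceUrl('http://example.com/events', query)
const init = buildPostInit(query)

console.log(url)        // the encoded query is part of the URL
console.log(init.body)  // the query is in the body, the URL is untouched
```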

Readable Streams

The body property of the fetch response exposes a ReadableStream, which can be used to process the response as the data is received. We can use this to generate events that have the same functionality as EventSource. Here’s the function which we’ll be creating.

function FetchEventTarget(input, init) {
  const eventTarget = new EventTarget()
  const jsonDecoder = makeJsonDecoder()
  const eventStream = makeWriteableEventStream(eventTarget)
  fetch(input, init)
    .then(response => {
      // Return the pipe's promise so that stream errors reach the catch below.
      return response.body
        .pipeThrough(new TextDecoderStream())
        .pipeThrough(jsonDecoder)
        .pipeTo(eventStream)
    })
    .catch(error => {
      eventTarget.dispatchEvent(
        new CustomEvent('error', { detail: error }))
    })
  return eventTarget
}

At the heart of this is a fetch call from which we get the body. Next we use the pipeThrough method with a TextDecoderStream to turn the incoming bytes into text. Then we need to parse the text as JSON and finally generate events. We need to do some actual work.
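Why a decoder stream rather than decoding each chunk by hand? The network is free to split the response anywhere, including in the middle of a multi-byte character. The sketch below uses the plain TextDecoder with its stream option to show the difference; as far as I understand it, TextDecoderStream gives the same carry-over behaviour inside a pipeline. The byte values are just a hand-made example.

```javascript
// The euro sign is three bytes in UTF-8: 0xE2 0x82 0xAC.
// Here it arrives split across two chunks.
const chunks = [
  new Uint8Array([0x7b, 0xe2, 0x82]), // '{' plus the first two bytes of the euro sign
  new Uint8Array([0xac, 0x7d])        // the last byte of the euro sign plus '}'
]

// Decoding each chunk on its own mangles the split character.
const naive = chunks
  .map(chunk => new TextDecoder().decode(chunk))
  .join('')

// A single decoder in streaming mode carries the partial bytes over.
const decoder = new TextDecoder()
const streamed = chunks
  .map(chunk => decoder.decode(chunk, { stream: true }))
  .join('')

console.log(naive === '{€}')
console.log(streamed === '{€}')
```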

The following function assumes the incoming JSON messages are line delimited, with each message terminated by a newline. It splits the incoming text into lines, then parses each line as JSON.

function makeJsonDecoder() {
  return new TransformStream({
    start(controller) {
      // The unprocessed text and the scan position within it.
      controller.buf = ''
      controller.pos = 0
    },
    transform(chunk, controller) {
      controller.buf += chunk
      while (controller.pos < controller.buf.length) {
        if (controller.buf[controller.pos] === '\n') {
          // A complete line: parse it and pass it down the pipeline.
          const line = controller.buf.substring(0, controller.pos)
          controller.enqueue(JSON.parse(line))
          // Keep only the text after the newline and rescan from the start.
          controller.buf = controller.buf.substring(controller.pos + 1)
          controller.pos = 0
        } else {
          ++controller.pos
        }
      }
    }
  })
}

I won’t go through the algorithm for splitting the text into lines. The key things to note are the use of the TransformStream class, and the object passed to it, which has start and transform methods. We use the start method to set the initial state, then the transform method splits the text into lines, parses each one as JSON, and calls controller.enqueue to pass the result to the next stage in the pipeline.
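For anyone who does want to see the line splitting on its own, here is a minimal sketch of the same idea outside of a stream. The makeLineParser helper is hypothetical and not part of the code above. It also skips blank lines, which the decoder above does not do.

```javascript
// A hypothetical helper: returns a function that accepts a text chunk
// and returns the messages completed by that chunk.
function makeLineParser() {
  let buf = ''
  return function push(chunk) {
    buf += chunk
    const messages = []
    let end
    // Consume every complete line currently in the buffer.
    while ((end = buf.indexOf('\n')) !== -1) {
      const line = buf.slice(0, end)
      buf = buf.slice(end + 1)
      if (line.trim() !== '') {
        messages.push(JSON.parse(line))
      }
    }
    // Whatever is left in buf is an incomplete line, kept for the next chunk.
    return messages
  }
}

const push = makeLineParser()

// The first message is split across two chunks.
const first = push('{"type":"tick","da')
const second = push('ta":1}\n{"type":"tick","data":2}\n')

console.log(first)  // no complete line yet
console.log(second) // both messages
```

The buffer is the important part: a chunk boundary can fall anywhere, so a line is only parsed once its newline has arrived.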

The next step is to turn the JSON stream into events.

function makeWriteableEventStream(eventTarget) {
  return new WritableStream({
    start(controller) {
      eventTarget.dispatchEvent(new Event('start'))
    },
    write(message, controller) {
      eventTarget.dispatchEvent(
        new MessageEvent(
          message.type,
          { data: message.data }
        )
      )
    },
    close(controller) {
      eventTarget.dispatchEvent(new CloseEvent('close'))
    },
    abort(reason) {
      eventTarget.dispatchEvent(new CloseEvent('abort', { reason }))
    }
  })
}

We use the WritableStream class, providing an object whose methods dispatch events for the stream’s lifecycle (start, close, and abort) and for each data message. Note that the event name of a data message comes from the type field of the JSON message.
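Because the event name is taken from the message, listeners only see the message types they subscribed to. Here is a small sketch of that dispatching on a bare EventTarget, with no stream involved. The message shapes and the 'price' and 'heartbeat' names are invented for the example.

```javascript
const target = new EventTarget()
const received = []

// Only listen for 'price' messages.
target.addEventListener('price', event => {
  received.push(event.data)
})

// Hypothetical messages, shaped like the output of the JSON decoder.
const messages = [
  { type: 'price', data: { symbol: 'ABC', value: 1 } },
  { type: 'heartbeat', data: null },
  { type: 'price', data: { symbol: 'ABC', value: 2 } }
]

// This is what the write method does for each message.
for (const message of messages) {
  target.dispatchEvent(new MessageEvent(message.type, { data: message.data }))
}

console.log(received) // the two 'price' payloads, the heartbeat is ignored
```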

Now we have implemented all the parts required by our initial function. We can call it in the following way:

// The controller lets us cancel the request later with abortController.abort().
const abortController = new AbortController()
const eventTarget = new FetchEventTarget(
  'http://example.com/events', {
    method: 'POST',
    headers: new Headers({
      'accept': 'application/json',
      'content-type': 'application/json'
    }),
    mode: 'same-origin',
    signal: abortController.signal,
    body: JSON.stringify({ query: 'Some query' })
  })
eventTarget.addEventListener('an-event-name', event => {
  console.log(event.data)
})

Now we can see the power of the fully operational streaming fetch. We can specify the method, headers, control CORS, and pass a body. With the event target support we have an interaction which feels very similar to the EventSource approach.
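One thing EventSource gives us is a close method. With fetch the equivalent is the signal passed in the init object. The sketch below shows only the AbortController side, with no network call: the init object is a cut-down version of the one above, and the log array is just there to make the order of events visible. When the same signal is handed to fetch, aborting should reject the request or tear down the response stream, which I would expect to surface through the 'error' or 'abort' listeners depending on how far the request has got.

```javascript
const abortController = new AbortController()
const log = []

// Anything holding the signal can find out when the request is cancelled.
abortController.signal.addEventListener('abort', () => {
  log.push('aborted')
})

// A cut-down init object; the signal is what fetch would watch.
const init = {
  method: 'POST',
  signal: abortController.signal,
  body: JSON.stringify({ query: 'Some query' })
}

log.push('before')
abortController.abort()
log.push('after')

console.log(init.signal.aborted)
console.log(log)
```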

If you would like to take this for a spin there is a Python demo.

$ mkdir demo
$ cd demo
demo $ python3.7 -m venv .venv
demo $ source .venv/bin/activate
(.venv) demo $ pip install bareasgi
Successfully installed bareasgi-3.2.0 baretypes-3.0.5 bareutils-3.1.0
(.venv) demo $ pip install uvicorn
Successfully installed click-7.0 h11-0.8.1 httptools-0.0.13 uvicorn-0.9.0 uvloop-0.13.0 websockets-8.0.2
(.venv) demo $ wget https://raw.githubusercontent.com/rob-blackbourn/bareasgi/master/examples/streaming_fetch.html
(.venv) demo $ wget https://raw.githubusercontent.com/rob-blackbourn/bareasgi/master/examples/streaming_fetch.py
(.venv) demo $ python streaming_fetch.py
INFO:uvicorn:Uvicorn running on http://127.0.0.1:9009 (Press CTRL+C to quit)

Navigate to http://127.0.0.1:9009 and give it a try.

Standards
