Get Pushy with Server-Sent Events

November 29, 2018


This post follows up a previous blog about MongoDB Change Streams, but the first section about SSE can stand alone.

It's a common problem: something changed on the server, and you want to proactively notify a user about what happened. In the past, you might have done this by polling: every few seconds or minutes, the client sends a new request to the server asking what changed. Unfortunately, this means that updates don't happen in real time and that your server potentially handles a steady stream of useless requests. Though conceptually simple, this method feels especially antiquated in a world where we receive constant push notifications on our devices. Servers have a couple of different methods to push updates to browsers without the client having to ask. WebSockets allow for two-way communication, and then there's our topic today...

Introducing Server-Sent Events

Server-sent events (SSE) enable one-way pushes from the server to the client. They're conceptually very simple compared to WebSockets and can be implemented easily without adding a dependency. On the server side, you send an initial response with the Content-Type header set to text/event-stream. After that, messages are sent in the format data: YOUR MESSAGE HERE\n\n, where the two trailing newline characters mark the end of a single message. On the frontend, you use the browser's EventSource API to receive and parse the incoming events.
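To make the framing concrete, here's a tiny helper that serializes a payload into that wire format (a sketch; the function name is my own, not part of any SSE spec). Each line of a multi-line payload gets its own data: prefix, so the blank line stays reserved as the message terminator:

```javascript
// Serialize a payload into the SSE wire format. Each line of a multi-line
// payload gets its own "data: " prefix so the blank line remains the
// message terminator.
const formatSseMessage = (payload) =>
  payload
    .split('\n')
    .map((line) => `data: ${line}`)
    .join('\n') + '\n\n'

const single = formatSseMessage('hello')      // 'data: hello\n\n'
const multi = formatSseMessage('two\nlines')  // 'data: two\ndata: lines\n\n'
```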

SSE With Express

Let's start by creating a barebones SSE example. If at any point you want a reference, here's the full source code for this section.

First, we'll need a project directory. Go ahead and make one and then run npm init. After that, we'll need to install Express, so npm install express. Finally, make a server.js file and add the following code:

const path = require('path')
const express = require('express')
const app = express()

app.get('/', (req, res) => {
  res.sendFile(path.join(__dirname, './index.html'))
})

app.get('/stream', (req, res) => {
  res.writeHead(200, {
    'Content-Type': 'text/event-stream',
    'Cache-Control': 'no-cache',
    'Connection': 'keep-alive',

    'Access-Control-Allow-Origin': '*',
    'Access-Control-Allow-Headers': 'Origin, X-Requested-With, Content-Type, Accept',
  })

  setTimeout(() => {
    res.write('data: Sending 1\n\n')
  }, 1000)

  setTimeout(() => {
    res.write('data: Sending 2\n\n')
  }, 2000)

  setTimeout(() => {
    res.write('data: Sending 3\n\n')
  }, 3000)
})

app.listen(3333) 

We've made two routes, a GET to / and a GET to /stream. We'll talk about /stream first. When this route receives a request, it sets headers that let the client know that this is a text/event-stream, not to cache the response, and to keep the connection alive (there are also CORS headers, but they are not directly related to SSE). It then writes three messages at 1, 2, and 3 seconds, respectively.

Now we need a client to request and receive these messages. In the root route, we're sending an index.html file, so let's create that now with the following code.

<!DOCTYPE html>
<html>
  <head>
    <meta charset="UTF-8">
    <title>title</title>
  </head>
  <body>
    <ul></ul>
    <script>
      const eventSource = new EventSource('http://localhost:3333/stream')
      const eventList = document.querySelector('ul')

      eventSource.onmessage = (e) => {
        const newElement = document.createElement('li')
        newElement.textContent = `message: ${e.data}`
        eventList.appendChild(newElement)
      }
    </script>
  </body>
</html>

This file contains HTML boilerplate with one unordered list. Inside the script tag, we use the previously mentioned EventSource API to connect to our /stream route and register a function that appends each received message as an <li> to our <ul>.
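Under the hood, EventSource buffers the incoming stream, splits it on blank lines, and strips the data: prefixes before firing onmessage. Here's a rough sketch of that parsing logic (simplified; the real algorithm also handles event:, id:, and retry: fields):

```javascript
// Simplified sketch of how an SSE stream is parsed: split on blank lines,
// keep only "data:" fields, and join multi-line data with newlines.
const parseSseStream = (raw) =>
  raw
    .split('\n\n')
    .filter((chunk) => chunk.length > 0)
    .map((chunk) =>
      chunk
        .split('\n')
        .filter((line) => line.startsWith('data:'))
        .map((line) => line.replace(/^data:\s?/, ''))
        .join('\n')
    )

// parseSseStream('data: Sending 1\n\ndata: Sending 2\n\n')
// -> ['Sending 1', 'Sending 2']
```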

With that set up, if you run node server.js and navigate to localhost:3333 in your browser, you should see three messages appear in order!

Tying it all together

Now it's time to tie our /stream endpoint together with an event source. For this example, we're going to connect it to the MongoDB change stream we made in the previous post. But you could use SSE in conjunction with any service that generates events!

First things first, let's bring in our code from the previous post. (Things are going to get a little hectic, so as in the previous section, here is a complete example to reference!) You should be able to make a change to a collection and see the event logged out by the server.

After that, we'll move the code from index.js where we were connecting to the database into our server.js file. (If moving the code around gets confusing, you can check out a full repo of where we're headed!) We're also going to make some modifications to the stream endpoint. After we're done, server.js should look like this:

const path = require('path')
const express = require('express')
const mongo = require('./database')
const EventBroker = require('./EventBroker')

const app = express()
const broker = new EventBroker()
const connectionString = 'mongodb://localhost:30001'

const generateSendSseCallback = res => update => {
  res.write(`data: ${JSON.stringify(update)}\n\n`)
}

app.get('/', (req, res) => {
  res.sendFile(path.join(__dirname, './index.html'))
})

app.get('/stream', (req, res) => {
  res.writeHead(200, {
    'Content-Type': 'text/event-stream',
    'Cache-Control': 'no-cache',
    'Connection': 'keep-alive',

    'Access-Control-Allow-Origin': '*',
    'Access-Control-Allow-Headers': 'Origin, X-Requested-With, Content-Type, Accept',
  })

  try {
    const sendSse = generateSendSseCallback(res)
    broker.emitter.on('creatureChange', sendSse)
    req.on('close', () => {
      broker.emitter.removeListener('creatureChange', sendSse)
    })
  } catch (err) {
    // The headers have already been sent, so just log and end the stream
    console.log(`[SERVER] an error occurred on /stream: ${err}`)
    res.end()
  }
})

mongo.connect(connectionString)
    .then((conn) => {
        console.log('[SERVER] - Received back a conn!')
        const db = conn.db('babblingBrook')
        broker.init(db, 'creatures')
        app.listen(3333) 
    })
    .catch((err) => {
        console.log(`[SERVER] - Error connecting to db: ${err}`)
    })

The headers look familiar, but the action really starts inside the try/catch block. We're using a new function called generateSendSseCallback, defined at the top of the file, to create a closure over the client's response object. When the request closes, we remove that listener so disconnected clients don't leave dangling listeners behind. We then pass the function that generateSendSseCallback returns, with the response preserved inside it, into an event broker, which you should add now in an EventBroker.js file:
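To see why the closure matters, here's generateSendSseCallback exercised against a stand-in for Express's res object (the mock is mine, purely for illustration). Each call to the generator captures one client's response, so each returned function writes to exactly one connection:

```javascript
// The curried function captures one client's res; every later call to the
// returned callback writes to that same connection.
const generateSendSseCallback = (res) => (update) => {
  res.write(`data: ${JSON.stringify(update)}\n\n`)
}

// Stand-in for an Express response that just records what was written.
const mockRes = { written: [], write(chunk) { this.written.push(chunk) } }

const sendSse = generateSendSseCallback(mockRes)
sendSse({ type: 'crawdad' })
// mockRes.written is now ['data: {"type":"crawdad"}\n\n']
```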

const { EventEmitter } = require('events')
const mongo = require('./database')

class EventBroker {
  constructor() {
    this.emitter = new EventEmitter()
    this.emitWrapper = this.emitWrapper.bind(this)
  }

  emitWrapper(event, ...args) { this.emitter.emit(event, ...args) }

  init(db, collection) {
    mongo.watch(db, collection, this.emitWrapper)
    console.log('[EVENT BROKER] - Started Mongo change stream and connected to EventEmitter')
  }
}

module.exports = EventBroker

Hi, future Ryan here writing a couple of years after the original post. Fair warning that using an EventEmitter here isn't a very good idea for anything other than this toy example. Any real traffic immediately blows through the default max listener value and starts creating warnings of a potential memory leak. A better implementation might be to create an array of the currently active SSE connections and iterate through it to send updates whenever the document changes. Hat tip to abhishek-butola for opening an issue and chatting about it.
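The array-of-connections approach that note describes might look something like this sketch (the names are mine, and error handling is omitted):

```javascript
// Sketch of tracking active SSE connections directly instead of using an
// EventEmitter. Each /stream request registers its res; a change event
// broadcasts to every registered connection.
const clients = []

const addClient = (res) => {
  clients.push(res)
}

const removeClient = (res) => {
  const i = clients.indexOf(res)
  if (i !== -1) clients.splice(i, 1)
}

const broadcast = (update) => {
  const message = `data: ${JSON.stringify(update)}\n\n`
  clients.forEach((res) => res.write(message))
}

// In the route handler, you would call addClient(res) after writing the
// headers and removeClient(res) inside req.on('close', ...).
```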

There's a lot going on here, and we're going to be flipping back and forth between this file and server.js to explain what's happening. The gist is that we're taking the returned anonymous function that closed over the user's response object and passing it to an event emitter. (If events in Node are new to you, the Node documentation on the events module is a good reference.) We're telling that event emitter to invoke the function any time it picks up a change from the MongoDB change stream.

Make sure to add the line for broker.init inside the .then callback of mongo.connect in the server file. At that line, we're telling the EventBroker to notify us whenever something happens in the creatures collection.

There's one other small fix to make to get up and running. We need to go into our database.js file and pass an event type when we invoke the change function that was passed in:

changeStream.on('change', (change) => {
    onChangeFunction('creatureChange', change.fullDocument)
})

Because our EventBroker can emit events of multiple types, the creatureChange we emit here corresponds to the creatureChange event we listen for in the /stream handler in the server file.
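As an aside, the creatureChange name lives entirely on the server, inside the EventEmitter; the browser still receives everything through onmessage. SSE also supports named events on the wire via an event: field, which a client listens for with addEventListener instead of onmessage. Here's a sketch of what the server would write (not part of this tutorial's code; the helper name is mine):

```javascript
// Writing a named SSE event: the browser dispatches it to
// eventSource.addEventListener('creatureChange', ...) rather than onmessage.
const formatNamedEvent = (name, payload) =>
  `event: ${name}\ndata: ${JSON.stringify(payload)}\n\n`

const frame = formatNamedEvent('creatureChange', { type: 'crawdad' })
// frame === 'event: creatureChange\ndata: {"type":"crawdad"}\n\n'
```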

With everything set up, it's game time. Make sure the Mongo replica set is running, and then start the server fresh. A few logs should show up:

[MONGO - CONNECT] - Successfully connected to database server
[SERVER] - Received back a conn!
[EVENT BROKER] - Started Mongo change stream and connected to EventEmitter

Make sure the browser is open to http://localhost:3333, then, using the Mongo CLI, insert into the creatures collection just like in the previous tutorial. Hit return.... click... and boom! Something akin to message: {"_id":"5bfef740419aeed958bbfa1b","type":"crawdad"} should have shown up on the screen. Now fire off more events to your ❤️'s content!

Wrapping up

Now that you know more about working with server-sent events, maybe it's time to hook up another event source?