Dive into MongoDB Change Streams

October 14, 2018

And don't bonk your head 🤕

mongodb logo on a stream background

How do you take an event kicked off by one user and then notify any interested viewers about the result? For my team at work, the answer was Node, MongoDB change streams, and server-sent events. In the first post of this two-part series, I'm going to explain the problem we set out to solve, introduce change streams, build a local Mongo Replica Set with Docker Compose, and watch changes happen in realtime. Part II will cover connecting the change stream to an Express server and fanning events out to any subscribed clients. If you want to reference a complete example at any time while reading, the full source code for this tutorial is available, here. Let's get into it!

The problem

I work on a team automating application failover during infrastructure outages. When another engineer needs to failover, we want her to click one button to restart somewhere else instead of having to manually change the many components of the application (think things like databases, load balancing rules, app servers, etc.) We also want other application stakeholders to be able to view the changes occurring because of that button click as they happen.

In a world where that engineer calls to our application running on one server, that server would do the work and report back to the original client. Our other stakeholders could also call the server and subscribe to messages over websockets or server-sent events as things happened in that process.

But we deploy our applications to multiple installations of Cloud Foundry that have no knowledge of each other and no guarantees about the number of application instances running at any given time. Our original engineer could kick off the failover and have her calls routed to one application instance. Then a stakeholder who wants to see how the failover is progressing could make a call that gets routed to an entirely separate instance and be left scratching his head about why nothing is happening, unless we regularly polled the database, which was not a solution we wanted to pursue for several reasons.

architecture

Stakeholder left scratching his head

To solve this problem, we could have used a messaging queue like RabbitMQ or Kafka to fanout messages to any subscribed application instances. Redis also has pub/sub functionality. Unfortunately, we could not use these tools for various reasons.

But then, just as we were about to give up hope...

Introducing change streams

In its 3.6 release in February of 2018, MongoDB introduced Change Streams. This feature allows applications to monitor realtime changes to a collection. Change streams are built on top of the oplog (operation log), a collection that records recent writes to the database. The oplog is part of MongoDB's replication infrastructure that allows primary and secondary database servers to communicate. Because of this, change streams only work when MongoDB is running as a replica set.

Setting up a replica set

We want change streams, so we're going to need that replica set. Let's get one set up locally. Instead of going through it myself, I'll point you to this fantastic blog post about setting up a replica set with Docker. If you want the fastest and easiest path to a replica set, just keep going to the next section.

Make that easier with Docker Compose

The time to create that replica set would add up if you were doing it every day. Thankfully, Docker Compose exists to run multi-container applications with ease and comes along with Docker for Mac and Docker for Windows (Linux takes a bit more work). Here is another excellent blog post about using docker-compose to get your replica set running with one command. If you want a tl;dr, here's what we created after reading:

version: "3"
services:
  mongo1:
    image: mongo:3.6
    volumes:
    - mongo1data:/data/db
    ports:
    - "30001:27017"
    command: ["mongod", "--replSet", "jump-set"]

  mongo2:
    image: mongo:3.6
    volumes:
    - mongo2data:/data/db
    ports:
    - "30002:27017"
    command: ["mongod", "--replSet", "jump-set"]

  mongo3:
    image: mongo:3.6
    volumes:
    - mongo3data:/data/db
    ports:
    - "30003:27017"
    command: ["mongod", "--replSet", "jump-set"]

  mongosetup:
    image: mongo:3.6
    volumes:
      - ./scripts:/scripts
    entrypoint: ["bash", "/scripts/mongoSetup.sh"]

volumes:
  mongo1data:
  mongo2data:
  mongo3data:

Essentially, we spin up three MongoDB containers and expose them on a series of ports. We launch them with the mongod command and run them in a replica set that we named jump-set. That's a totally arbitrary value, so show your ✨ personality. You'll notice one other container called mongosetup that is also based on the mongo:3.6 image. We clone a scripts directory from our host filesystem to an identically named directory inside of the container. That scripts directory contains a file that we run on startup of mongosetup to create the replica set. So create a scripts directory in your project and add the following code in a mongoSetup.sh file that lives in scripts:

#!/bin/bash
echo "Waiting for startup.."
until curl http://mongo1:28017/serverStatus\?text\=1 2>&1 | grep uptime | head -1; do
  printf '.'
  sleep 1
done

echo curl http://mongo1:28017/serverStatus\?text\=1 2>&1 | grep uptime | head -1
echo "Started.."

sleep 10

echo SETUP.sh time now: `date +"%T" `
mongo --host mongo1:27017 <<EOF
   var cfg = {
        "_id": "jump-set",
        "version": 1,
        "members": [
            {
                "_id": 0,
                "host": "mongo1:27017",
                "priority": 2
            },
            {
                "_id": 1,
                "host": "mongo2:27017",
                "priority": 0
            },
            {
                "_id": 2,
                "host": "mongo3:27017",
                "priority": 0
            }
        ]
    };
    rs.initiate(cfg, { force: true });
    rs.reconfig(cfg, { force: true });
    db.getMongo().setReadPref('nearest');
EOF

tail -f /dev/null

The script connects to each of the three mongo containers that were started moments earlier by docker-compose. It then kicks off the replicaset. And now you have a running replica set again, so it's time for...

Connecting to the change stream

First thing's first, let's connect to the database from Node. In your project directory, set up a new project with npm init and then npm install mongodb, which contains the officially supported drivers. We'll then create a new file database.js and add the following code:

const { MongoClient } = require('mongodb')

async function connect(connectionString) {
  try {
    const conn = await MongoClient.connect(connectionString, { useNewUrlParser: true })
    console.log('[MONGO - CONNECT] - Successfully connected to database server')
    return conn
  } catch(err) {
    console.log(`[MONGO - CONNECT] - Failed to connect to MongoDB: ${err}`)
    throw err
  }
}

module.exports = { connect}

First we require the mongodb module we just intalled and destructure the MongoClient. Then we define an async function that takes a connection string (more on that in a second) and use the connection string as an argument for the MongoClient's connect method. We also catch any errors and export the function to call it somewhere else.

That "somewhere else" is a new index.js. Make that and add the following code:

const mongo = require('./database')

const connectionString = 'mongodb://localhost:30001'
mongo.connect(connectionString)
    .then((conn) => {
        console.log('[INDEX] - Received back a conn!')
    })
    .catch((err) => {
        console.log(`[INDEX] - Error connecting to db: ${err}`)
    })

There's that connection string we were talking about a second ago. In this one, mongodb is the protocol, localhost:30001 is our host. Now when we run node index.js, after a few seconds you should see:

[MONGO - CONNECT] - Successfully connected to database server
[INDEX] - Received back a conn!

Exciting, but not too useful. Now let's do something with that connection and monitor a collection's change stream! Add the following code to our previously created database.js. (Don't forget to export our newly created watch function!)

function watch(db, coll, onChangeFunction) {
    const collection = db.collection(coll)
    const changeStream = collection.watch()

    changeStream.on('change', (change) => {
        onChangeFunction(change.fullDocument)
    })
}

Now back in index.js we'll call watch.

const mongo = require('./database')

const connectionString = 'mongodb://localhost:30001'
mongo.connect(connectionString)
    .then((conn) => {
        console.log('[INDEX] - Received back a conn!')
        const db = conn.db('babblingBrook')
        mongo.watch(db, 'creatures', (change) => {
            console.log(`[INDEX] - Saw a change: ${JSON.stringify(change)}`)
        })
    })
    .catch((err) => {
        console.log(`[INDEX] - Error connecting to db: ${err}`)
    })

If you try to run index.js now, you'll see an error along the lines of:

MongoError: cannot open $changeStream for non-existent database: babblingBrook

While we were able to connect to the database server, we need to actually create a collection inside of our database (mine is called babblingBrook but you can name your own). To create a collection to monitor, we'll use the MongoDB CLI. Go ahead and brew install mongodb if you don't have it and are on a Mac. Linux and Windows installation instructions are here. With that done run the following command to connect to the babblingBrook database:

$ mongo mongodb://localhost:30001/babblingBrook

And then when prompted by the REPL:

jump-set:PRIMARY> db.creatures.insertOne({type: 'salmon'})

With our collection primed and ready to go, let's rerun index.js. In another tab, get back into the mongo shell and run one more insert:

jump-set:PRIMARY> db.creatures.insertOne({type: 'crawdad'})

Did you see it? Back in the node process tab, you should have received and logged data from the change stream!

[INDEX] - Saw a change: {"_id":"5bb76f8904b8377e363b6103","type":"crawdad"}

Wrapping up

So that's a quick introduction to MongoDB change streams. In the next post, we'll go over connecting this change stream to Server-sent events that can push this data out to many clients at the same time!

Any questions? You can reach me on Twitter @rtravitz.