Concurrency Three Ways

January 30, 2019

[Illustration: three arrows starting at different places]

"Concurrency is not parallelism."

I'd heard it a few times before, but I never thought about it too hard until I watched a talk of the same title by Rob Pike, one of the creators of the Go programming language (and of Plan 9 and UTF-8; he's really cool!). The concepts have similar dictionary definitions but different meanings in computer science. Concurrency is when multiple tasks can execute in overlapping time periods; they won't necessarily be running at the same instant. Parallelism is when multiple tasks run at the same instant, which requires multiple hardware cores.
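
To make the distinction concrete, here's a toy Go sketch of my own (an illustration, not something from the talk). Pinned to a single core with GOMAXPROCS(1), the two goroutines below are concurrent, because they interleave, but never parallel:

package main

import (
	"fmt"
	"runtime"
	"sync"
)

func main() {
	runtime.GOMAXPROCS(1) // one core only: concurrency without parallelism

	var wg sync.WaitGroup
	wg.Add(2)
	for _, name := range []string{"A", "B"} {
		go func(name string) {
			defer wg.Done()
			for i := 0; i < 3; i++ {
				fmt.Println(name, i)
				runtime.Gosched() // yield so the other goroutine gets a turn
			}
		}(name)
	}
	wg.Wait()
}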

It got me thinking about how those concepts apply in my day-to-day work. As part of an automation team, we tie together other systems, which means we have plenty of network I/O. For performance, it's important for us to be able to run a lot of those network requests concurrently. So that's what this post is going to focus on!

I've seen a few different ways to write concurrent code:

  1. An event loop
  2. Channels
  3. Semaphores and mutexes

I'm going to demonstrate each below with some code examples where I call the Star Wars API to ask about planets (e.g. Tatooine) and people (e.g. Luke Skywalker). In each example, I use one of these concurrency models to ask for information about several planets and people and then combine that data. Let's jump in!

1. An event loop

To demonstrate an event loop, I'm going to use Node.js. Node runs our JavaScript in a single thread and uses an event loop behind the scenes. Our code makes a series of calls to getPlanet and getPerson, each of which fires off a network request and immediately returns a Promise that will complete ("resolve" in Promise parlance) at some point in the future. Instead of waiting around for the response, Node hands the request off to the operating system and moves on. When the response arrives, the callback that resolves the Promise is placed on a queue, and the event loop runs it once the call stack is free.

In the following example, we use a call to Promise.all combined with the await keyword (more on async/await here) so that we don't progress past that point in the code until all of our network calls have resolved with their values. But while those calls are pending, Node is free to do other work in the background!

async function main() {
  // getPlanet and getPerson (defined in the full example) each kick off a
  // network request and immediately return a Promise
  const planetCalls = []
  const personCalls = []

  // start all six requests without awaiting any of them
  for (let i = 1; i < 4; i++) {
    planetCalls.push(getPlanet(i))
    personCalls.push(getPerson(i))
  }

  // don't move past these lines until every request has resolved
  const planets = await Promise.all(planetCalls)
  const people = await Promise.all(personCalls)

  planets.forEach((planet, idx) => { printIncomingTransmissions(planet, people[idx]) })
}

(Click here to see the full example code)

If you want an expert's take on the event loop, I really love this talk by Google Developer Advocate Jake Archibald, which explains it in depth.

2. Channels

I've been writing a lot of Go lately and have been making use of its channels and goroutines. A goroutine is a function that can run concurrently with other functions; you start one with the go keyword. Channels give goroutines a way to communicate and synchronize their execution.
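
Before the full example, here's a minimal sketch (mine, not part of the post's example code) of those mechanics: the receive on the last line blocks main until the goroutine sends a value into the channel.

package main

import "fmt"

func main() {
	messages := make(chan string) // unbuffered: a send waits for a matching receive

	go func() {
		messages <- "transmission received" // runs concurrently with main
	}()

	fmt.Println(<-messages) // blocks here until the goroutine sends
}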

Go's channels are based on communicating sequential processes. (I can merely provide this link. My understanding ends here.)

In the following example, the result of each API call is sent into a channel once the call completes. You can see this in action in the sendPlanet function, which takes the planet value that came back from the call to GetPlanet and sends it into the planets channel that was passed in as an argument. That same channel is also passed to printIncomingTransmissions, which blocks at the line planet := <-planets until a value comes out of the planets channel to be assigned.

Unlike the event loop example, which preserved the original order, values come out of the channels in whatever order the goroutines happen to finish, which means that we get varying combinations of planets and people!

func printIncomingTransmissions(planets chan Planet, people chan Person) {
	planet := <-planets // blocks until a value arrives on the planets channel
	person := <-people  // blocks until a value arrives on the people channel
	fmt.Printf("Incoming transmission from the %s surface of %s! It's from %s\n", planet.Climate, planet.Name, person.Name)
}

func sendPlanet(id int, planets chan Planet) {
	planet, err := GetPlanet(id)
	if err != nil {
		log.Fatalf("Error retrieving planet: %v\n", err)
	}
	planets <- planet
}

func sendPerson(id int, people chan Person) {
	person, err := GetPerson(id)
	if err != nil {
		log.Fatalf("Error retrieving person: %v\n", err)
	}
	people <- person
}

func main() {
	planets := make(chan Planet)
	people := make(chan Person)

	// launch six goroutines; each sends its result into a channel when its call completes
	for i := 1; i < 4; i++ {
		go sendPlanet(i, planets)
		go sendPerson(i, people)
	}

	// receive three planet/person pairs, in whatever order they arrive
	printIncomingTransmissions(planets, people)
	printIncomingTransmissions(planets, people)
	printIncomingTransmissions(planets, people)
}

(Click here to see the full example code)

3. Semaphores and mutexes

The previous two examples coordinated work by passing values around, but what if we need concurrent functions to mutate the same piece of memory?

Consider the example below:

func main() {
	var wg sync.WaitGroup
	planetList := make(map[string]string)

	for i := 1; i < 4; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			planet, err := GetPlanet(id)
			if err != nil {
				log.Fatalf("Err getting planet: %v\n", err)
			}

			planetList[planet.Name] = planet.Climate
		}(i)
	}

	wg.Wait()
	fmt.Printf("Planets: %v\n", planetList)
}

(Click here to see the full example code)

As in the previous section, the code loops a few times and fetches planets inside separate goroutines. The main difference is that instead of channels, it uses a sync.WaitGroup to signal when the goroutines are done.

Before launching each goroutine, the code calls wg.Add(1), which very sensibly increments the wait group's internal counter. Each goroutine defers a call to wg.Done(), which decrements that same counter when the anonymous function returns. After issuing all of the calls, main calls wg.Wait(), which blocks until the counter has reached zero.

In effect, the WaitGroup's counter behaves like a counting semaphore.
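
If you need a semaphore that actually limits how many goroutines run at once, the idiomatic Go tool is a buffered channel. Here's a sketch of mine (the capacity of 2 and the Sleep standing in for a network call like GetPlanet are my own stand-ins, not part of the post's example code):

package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	var wg sync.WaitGroup
	sem := make(chan struct{}, 2) // counting semaphore with two slots

	for i := 1; i < 4; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			sem <- struct{}{}        // acquire a slot; blocks while both are taken
			defer func() { <-sem }() // release the slot on the way out

			time.Sleep(100 * time.Millisecond) // stand-in for a real call like GetPlanet(id)
			fmt.Printf("call %d finished\n", id)
		}(i)
	}

	wg.Wait()
}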

The WaitGroup example will build (and probably run) just fine, but it still has a problem. Each of the goroutines spun up by the loop writes to the shared planetList map. Maps are a reference type, so all of those goroutines are touching the same underlying memory, and Go maps are not safe for concurrent writes. If two goroutines write at the same time... well... 💥 explosion.

Don't believe me? Build the example binary with the -race flag (go build -race). When you run that binary, the race detector will helpfully point out the flaw.

To fix the issue, the next example adds a pointer to a sync.Mutex. Mutexes protect shared resources, in this case the map, by locking and unlocking around access: while one goroutine holds the lock, the others have to wait their turn.

func main() {
	var wg sync.WaitGroup
	planetList := make(map[string]string)
	mutex := &sync.Mutex{}

	for i := 1; i < 4; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			planet, err := GetPlanet(id)
			if err != nil {
				log.Fatalf("Err getting planet: %v\n", err)
			}

			mutex.Lock() // only one goroutine at a time may write to planetList
			planetList[planet.Name] = planet.Climate
			mutex.Unlock()
		}(i)
	}

	wg.Wait()
	fmt.Printf("Planets: %v\n", planetList)
}

(Click here to see the full example code)

Build this updated example with go build -race and run it again: this time the race detector stays quiet, so you're totally in the clear.

Fin

I'm just scratching the surface as I build my understanding of concurrency. I hope these examples of what I'm currently thinking about, and the links to more expert resources, are interesting to you, too! Want to talk about it? You can reach me on Twitter at @rtravitz!