
When Processes need to talk

Applications and services often need to be scalable. The user base might grow from 10 to 10,000, or the number of incoming requests might increase by several orders of magnitude. One approach to scaling is to use faster computers. Another is to use more computers and distribute the workload among them.

Another scenario: Sometimes you need to keep separate concerns completely separated. In other words, each distinct function should run as a separate OS process: a database, a Web server, a process that implements your business logic, and so forth.

Message queues help connect these processes.

Message Queues in a nutshell

Message queue systems (MQ systems for short) provide a means of passing arbitrary messages between processes. Usually they offer a set of topologies, or communication patterns, each of which fits a particular communication purpose.

Some MQ systems also provide a brokerage service, which acts as a well-known contact point for finding a particular service, as well as for routing messages from sender to receiver. This can be convenient especially for large distributed systems. Small systems may do well without a broker and thus benefit from higher throughput, as brokerless MQ systems can be much faster.

nanomsg – the minimalist MQ framework

To look into some examples of inter-process communication, we will use nanomsg, a lightweight messaging system that has a couple of benefits:

  • nanomsg does not need a broker, nor a server infrastructure. Lightweight as can be.
  • nanomsg is dead-easy to understand and to use. No bloated concepts and over-complicated implementations.
  • nanomsg provides a reasonable set of common communication topologies (also called “scalability protocols” in nanomsg terminology) out of the box. No need to reinvent the wheel over and over again.
  • A pure Go client is available (see below).
  • There are plenty of nanomsg implementations for other languages available, too. Want to connect your Go process with some other process written in C++, Java, Python, Rust, OCaml, Erlang,…? Here you go!

Mangos: nanomsg in pure Go

Mangos is a Go implementation of nanomsg. It features an easy and intuitive API, as we will discover later when going through a first example.

How does nanomsg define communication patterns?

It does so by providing so-called “Scalability Protocols”. Each Scalability Protocol defines a particular communication pattern.

These protocols are currently defined:

  • Pair

    Motto: Scale your application by breaking it in two pieces.

  • Request-Reply (ReqRep)

    Motto: Distribute workload among multiple stateless workers.

  • Publisher-Subscriber (PubSub)

    Motto: Broadcast messages to multiple destinations. Receivers can subscribe to specific topics.

  • Pipeline

    Motto: Collect output from multiple nodes of one processing step and distribute it among the nodes of the next processing step.

  • Survey

    Motto: Broadcast a survey and gather the responses. Wait for the replies for a certain time only.

  • Bus

    Motto: Broadcast messages from any node to all other nodes.
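To make one of these mottos more tangible, here is a minimal in-process sketch of the Publisher-Subscriber idea in plain Go. It does not use nanomsg or Mangos at all. The subscriber type and the broadcast function are made up for this illustration, and treating a topic as a prefix of the message text is an assumption of the sketch.

```go
package main

import (
	"fmt"
	"strings"
)

// subscriber is a hypothetical receiver that keeps only the messages
// matching its topic.
type subscriber struct {
	topic    string
	received []string
}

// broadcast offers every message to every subscriber. Each subscriber
// keeps a message only if the message text starts with its topic
// (an assumption made for this sketch).
func broadcast(msgs []string, subs []*subscriber) {
	for _, m := range msgs {
		for _, s := range subs {
			if strings.HasPrefix(m, s.topic) {
				s.received = append(s.received, m)
			}
		}
	}
}

func main() {
	subs := []*subscriber{{topic: "news"}, {topic: "sport"}}
	broadcast([]string{"news: rain ahead", "sport: final score", "news: sun again"}, subs)
	for _, s := range subs {
		fmt.Printf("%s subscriber got %d message(s): %q\n", s.topic, len(s.received), s.received)
	}
}
```

In a real messaging system, the sender and the receivers would of course live in different processes; the sketch only shows the filtering that a subscription implies.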

The basic building block of a Scalability Protocol is a nanomsg Socket.

What is a socket?

Sockets in general are messaging endpoints, usually defined by

  • a transport mechanism,
  • an IP address, and
  • a port number.

Example:

tcp://192.168.0.42:45890
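The three parts of such an address can be pulled apart with Go's standard library. The following is only a sketch: parseEndpoint and the endpoint type are hypothetical helpers for illustration, not part of nanomsg or Mangos.

```go
package main

import (
	"fmt"
	"net"
	"net/url"
)

// endpoint holds the three parts of a socket address.
type endpoint struct {
	Transport string // e.g. "tcp"
	Host      string // IP address or host name
	Port      string // port number as written in the address
}

// parseEndpoint splits an address such as "tcp://192.168.0.42:45890"
// into transport, host, and port. It is a hypothetical helper and
// returns an error if the address has no port.
func parseEndpoint(addr string) (endpoint, error) {
	u, err := url.Parse(addr)
	if err != nil {
		return endpoint{}, err
	}
	host, port, err := net.SplitHostPort(u.Host)
	if err != nil {
		return endpoint{}, err
	}
	return endpoint{Transport: u.Scheme, Host: host, Port: port}, nil
}

func main() {
	ep, err := parseEndpoint("tcp://192.168.0.42:45890")
	if err != nil {
		fmt.Println("cannot parse address:", err)
		return
	}
	fmt.Printf("transport=%s host=%s port=%s\n", ep.Transport, ep.Host, ep.Port)
}
```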


nanomsg provides several transport mechanisms:

  • In-process
  • Inter-process (but still on the same machine)
  • TCP
  • WebSockets

A process can provide a socket to other processes, as well as connect to a remote socket of another process.

When a process provides a socket to others, it “listens” on the socket (in Mangos lingo). When it connects to a remote socket, it “dials” this socket. (You can see this distinction later in our example.)
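The difference between the two roles can be sketched with plain TCP from Go's standard library. Note that this is a stand-in and not the Mangos API: listenOrDial and secondNodeRole are hypothetical helpers, and the sketch assumes that a second attempt to listen on an address that is already taken fails, so that the caller falls back to dialing.

```go
package main

import (
	"fmt"
	"io"
	"net"
)

// listenOrDial first tries to listen on addr. If that fails (typically
// because someone else already listens there), it dials addr instead.
// It returns the role it ended up with: "listen" or "dial".
func listenOrDial(addr string) (string, io.Closer, error) {
	l, err := net.Listen("tcp", addr)
	if err == nil {
		return "listen", l, nil
	}
	c, err := net.Dial("tcp", addr)
	if err != nil {
		return "", nil, err
	}
	return "dial", c, nil
}

// secondNodeRole opens a listener on a free local port, playing the
// node that was started first, and reports which role a second node
// gets when it calls listenOrDial with the same address.
func secondNodeRole() (string, error) {
	first, err := net.Listen("tcp", "127.0.0.1:0") // port 0: let the OS pick
	if err != nil {
		return "", err
	}
	defer first.Close()

	role, c, err := listenOrDial(first.Addr().String())
	if err != nil {
		return "", err
	}
	defer c.Close()
	return role, nil
}

func main() {
	role, err := secondNodeRole()
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("role of the second node:", role)
}
```

The appeal of this "try to listen, else dial" approach is that both nodes can run the same code, and whoever starts first becomes the listener.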

Sockets in nanomsg have another interesting feature: They implement a particular Scalability Protocol. That is, a nanomsg socket takes care of managing all internals of the protocol so that your code can more or less focus on sending and receiving messages.

A first example: PAIR

Let’s dive straight into our first example: A simple PAIR communication.

The PAIR protocol lets two processes send messages to each other. None of the two nodes has a particular role. Each one can send and receive messages to and from the other one.

Typical use case: To split up a large application into two smaller parts.

Outline

So what are we going to implement? In short, we want to have two processes running. One of them listens on a socket, the other one dials that socket. Once they are connected, they exchange a couple of messages.
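Before bringing in real sockets, the shape of this exchange can be sketched in a single process. The sketch below is not the code from the repository and does not use Mangos: the node type, newPair, and exchange are hypothetical names, and two buffered Go channels stand in for the connection between the two processes.

```go
package main

import "fmt"

// node is one end of a two-way link. Both ends are equal: each one
// sends on out and receives on in.
type node struct {
	id  int
	out chan<- string
	in  <-chan string
}

// newPair returns two connected nodes. The channels are buffered by
// one message so that both nodes can send before either one receives.
func newPair() (node, node) {
	zeroToOne := make(chan string, 1)
	oneToZero := make(chan string, 1)
	return node{id: 0, out: zeroToOne, in: oneToZero},
		node{id: 1, out: oneToZero, in: zeroToOne}
}

// run sends and receives n messages, strictly alternating, and
// returns the messages it received.
func (nd node) run(n int) []string {
	received := make([]string, 0, n)
	for i := 0; i < n; i++ {
		nd.out <- fmt.Sprintf("message %d from node %d", i, nd.id)
		received = append(received, <-nd.in)
	}
	return received
}

// exchange runs both nodes and returns what each of them received.
func exchange(n int) (atNode0, atNode1 []string) {
	n0, n1 := newPair()
	done := make(chan []string)
	go func() { done <- n1.run(n) }()
	atNode0 = n0.run(n)
	atNode1 = <-done
	return atNode0, atNode1
}

func main() {
	atNode0, atNode1 := exchange(3)
	for _, m := range atNode0 {
		fmt.Println("Node 0 received", m)
	}
	for _, m := range atNode1 {
		fmt.Println("Node 1 received", m)
	}
}
```

Neither node has a special role here; the only asymmetry in the real example is who listens and who dials.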


You can get the full source code on GitHub.

Use go get -d to ensure that the binary does not get installed into your $GOPATH/bin directory. Rather, use go build to generate a local binary that you then can run as ./messaging.

Installing Mangos

(Note: If you go get the messaging code, Mangos is already included via the vendor directory. In this case you do not need to install Mangos separately.)

Installing Mangos is as easy as entering

go get -u github.com/go-mangos/mangos

on the command line. To ensure everything has been installed correctly, you might want to run the tests. For this, enter:

go test $GOPATH/src/github.com/go-mangos/mangos/test

If everything is ok, we can move forward to creating a sample PAIR implementation.

Implementing a PAIR example

To run this example, get the code from GitHub:

go get -d github.com/appliedgo/messaging

(The -d flag prevents Go from installing the binary in your $GOPATH/bin directory.) Then cd to the pair directory and run:

go build

Then open a second terminal and cd to the same directory. In the first terminal, enter

$ ./messaging 0 "tcp://localhost:54545"

and in the other one, type

$ ./messaging 1 "tcp://localhost:54545"

(Note that you can pick an arbitrary port number from the “Dynamic” range from 49,152 to 65,535; it only needs to be the same for both processes.)

If you started node 0 first, your output should look like this (the sample run shown here used port 45454):

$ ./messaging 0 "tcp://localhost:45454"
2016/02/04 11:44:55 Node 0 sends message 0 from node 0.
2016/02/04 11:44:58 Node 0 received message 0 from node 1.
2016/02/04 11:44:58 Node 0 sends message 1 from node 0.
2016/02/04 11:44:58 Node 0 received message 1 from node 1.
2016/02/04 11:44:58 Node 0 sends message 2 from node 0.
2016/02/04 11:44:58 Node 0 received message 2 from node 1.
2016/02/04 11:44:58 Node 0: Done.

And node 1 should have produced something like this:

$ ./messaging 1 "tcp://localhost:45454"
2016/02/04 11:44:58 Node 1 cannot listen on socket 'tcp://localhost:45454': listen tcp 127.0.0.1:45454: bind: address already in use
Trying to dial instead
2016/02/04 11:44:58 Node 1 sends message 0 from node 1.
2016/02/04 11:44:58 Node 1 received message 0 from node 0.
2016/02/04 11:44:58 Node 1 sends message 1 from node 1.
2016/02/04 11:44:58 Node 1 received message 1 from node 0.
2016/02/04 11:44:58 Node 1 sends message 2 from node 1.
2016/02/04 11:44:58 Node 1 received message 2 from node 0.
2016/02/04 11:44:58 Node 1: Done.

Exercise 1

Try ipc: instead of tcp:

Exercise 2

The loop in runNode may seem silly as it serializes sending and receiving for no good reason (other than trying to remain simple). Turn the loop into two goroutines that send and receive independently.
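As a starting point, here is one possible shape for such a node, sketched in a single process. It is not the repository's runNode and does not use Mangos: runNodeConcurrent, the link type, and exchange are hypothetical names, and two unbuffered Go channels stand in for the socket connection.

```go
package main

import (
	"fmt"
	"sync"
)

// link is one end of a two-way connection (a stand-in for a socket).
type link struct {
	out chan<- string
	in  <-chan string
}

// runNodeConcurrent starts one goroutine that sends n messages and
// another one that receives n messages, then waits for both. It
// returns the received messages in order of arrival.
func runNodeConcurrent(id, n int, l link) []string {
	var wg sync.WaitGroup
	received := make([]string, 0, n)

	wg.Add(2)
	go func() { // sender
		defer wg.Done()
		for i := 0; i < n; i++ {
			l.out <- fmt.Sprintf("message %d from node %d", i, id)
		}
	}()
	go func() { // receiver; the only goroutine that touches received
		defer wg.Done()
		for i := 0; i < n; i++ {
			received = append(received, <-l.in)
		}
	}()
	wg.Wait()
	return received
}

// exchange connects two nodes through unbuffered channels and returns
// what each node received.
func exchange(n int) (atNode0, atNode1 []string) {
	zeroToOne := make(chan string)
	oneToZero := make(chan string)
	done := make(chan []string)
	go func() {
		done <- runNodeConcurrent(1, n, link{out: oneToZero, in: zeroToOne})
	}()
	atNode0 = runNodeConcurrent(0, n, link{out: zeroToOne, in: oneToZero})
	atNode1 = <-done
	return atNode0, atNode1
}

func main() {
	atNode0, atNode1 := exchange(3)
	fmt.Println("Node 0 received:", atNode0)
	fmt.Println("Node 1 received:", atNode1)
}
```

Because sending and receiving no longer wait for each other, the channels in this sketch need no buffer: a node can receive while its own sender is still blocked.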

What’s next?

The PAIR protocol is the simplest one of the Scalability Protocols. The more complex ones are, not surprisingly, also the more interesting ones. In the next article we’ll explore the PubSub protocol, a common pattern for distributing information from one sender to multiple receivers.

Updates and errata

  • 2016-05-29 Fixed: Small glitch in the path of go test.
  • 2016-05-29 Updated: Mangos is now in the vendor dir. You can use go get github.com/appliedgo/messaging without go-getting Mangos first.
  • 2016-05-30 Fixed: Broken links and a small typo.
  • 2016-06-13 Fixed: go get was missing the -d flag.