Revisiting ZeroMQ Multipart

pieterh wrote on 03 Feb 2015 09:57

Around FOSDEM 2015, a few core ZeroMQ maintainers decided to experiment with a simpler messaging API, and concluded it would be worth exploring whether to rethink, and possibly remove, multipart messages in ZeroMQ. In this article I'll explain the whys and hows of this potential shift.

Don't Panic

We don't remove functionality that people are using. The existing multipart functionality is defined by ZeroMQ RFCs 23 (ZMTP) and 28 (the request-reply pattern). There is no plan to stop supporting these RFCs, nor to remove the multipart API contracts. Our strategy for shifting ZeroMQ's functionality is to develop new contracts over time, and when these are stable and proven, deprecate old ones. This happens over many years, with new code existing in parallel to old code.

So the focus of this experiment is mainly on new code: new language bindings, new protocols, and new stacks. The overall goal, as I'll explain, is to make things simpler and easier to understand.

What's the Problem?

Multipart messages aim to solve a number of problems, and in turn create a number of problems. At the core of our shift is the understanding that the costs now appear to outweigh the benefits.

First, let's collect the problems that multipart solves:

  • It allows zero-copy for high-volume pub-sub data, where the application sends a routing key, followed by a body. With one part, the body must be copied into a buffer. With two parts, the body can be sent directly from an existing buffer (thus, no copying).
  • It provides a mechanism for multi-hop request-reply, where routing identifiers are pushed to an "envelope", and popped on the return path.
  • It provides applications with a simple serialization mechanism, either to avoid using additional libraries, or so that brokers can add and remove information to messages they are forwarding.
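To make the zero-copy point concrete, here is a minimal C sketch of what a single-part publisher has to do instead: copy the routing key and the body into one buffer. The function name pack_key_body is hypothetical, and the code is plain C with no libzmq calls. Putting the key first keeps prefix-based subscription matching working, since subscribers match on the start of the message.

```c
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical helper: build a single-part pub-sub message by copying
 * the routing key and the body into one freshly allocated buffer.
 * The memcpy of the body is exactly the copy that a two-part
 * (key frame + body frame) send avoids. Caller frees *out. */
static size_t pack_key_body (const char *key,
                             const uint8_t *body, size_t body_size,
                             uint8_t **out)
{
    size_t key_size = strlen (key);
    size_t total = key_size + body_size;
    uint8_t *buf = malloc (total ? total : 1);
    if (!buf) {
        *out = NULL;
        return 0;
    }
    memcpy (buf, key, key_size);              /* small: the topic */
    memcpy (buf + key_size, body, body_size); /* large: the copy we pay for */
    *out = buf;
    return total;
}
```

For a large body, that second memcpy is the cost multipart was introduced to avoid.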

Now, let's list the main problems that we've seen with the multipart model in ZeroMQ:

  • It is confusing, as a "message" means both a part and a list of parts. We've had to add new terms like "frame" to compensate.
  • The multipart request-reply envelope is complex to understand, and yet any realistic application has to use it. This creates a real learning cost.
  • ZeroMQ servers exploit the request-reply envelope to do server-client routing (using ROUTER sockets). This always confuses new users.
  • ZeroMQ implementations cannot realistically make sockets threadsafe, because the several send or recv calls that make up one multipart message must all come from a single thread. Thus, multipart messages have stopped us from making threadsafe sockets.
  • Multipart messages are counter-intuitive: the "send" method does not actually send anything, but queues each part until the final part is sent. New users often try to use multipart to send large files. This does not work, because all the parts of a message are held in memory and delivered together.
  • The two main sockets that exploit the multipart mechanism (REQ and REP) cannot recover from failures, and are rarely used by real applications.
  • Supporting multipart messages has increased complexity and hurt performance in all bindings and tools. It is significantly harder, and more work, to support a list of frames than to support a single frame.
  • Multipart data is incompatible with other layers that treat data as a stream of bytes or as blobs. For example, Go's io.Reader and io.Writer interfaces expect a transport to read and write blobs of bytes.

The Identity Problem

The ZMQ_IDENTITY socket setting is one of the more confused, and in my view dangerous, semantics in ZeroMQ. It allows a DEALER/REQ/REP peer to tell a ROUTER socket what value to use for its internal routing ID. "User-defined identities" arrive as a magical first frame that can be up to 255 bytes long.

It was a mistake to mix node identities with routing information. These are two separate layers (contracts), and mixing them creates a confused hybrid contract. What happens when two clients connect with the same identity? At what stage is the identity checked? How do we correlate such identities with application-level identities? How do we proxy traffic?

It has been clear for some time that:

  • We should stop using the term "identity" for server-side routing information (which is essentially the handle of a pipe).
  • We should delegate identity management (authentication and access control) to higher layers, where different decisions can be made depending on the case.
  • We should in fact hide the routing information in simple request-reply servers.

The Multihop Fallacy

The multipart request-reply envelope solves what appears to be a fallacy, a fake problem. That is: we want to be able to carry a single message across many hops, from originator to recipient, and then carry a reply back along the same path.
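For reference, here is a toy C model of the envelope mechanism in question: each ROUTER hop pushes the routing ID of the pipe a request arrived on, and the reply path pops them in reverse order. In libzmq these IDs travel as extra frames at the front of the message; envelope_t, MAX_HOPS and the two functions are hypothetical names used only for illustration.

```c
#include <stddef.h>
#include <stdint.h>

#define MAX_HOPS 8

/* Toy model of the request-reply envelope: a stack of routing IDs,
 * one pushed per hop on the way out, one popped per hop on the way back. */
typedef struct {
    uint32_t ids [MAX_HOPS];
    size_t depth;
} envelope_t;

/* Outbound: a hop records where the request came from. */
static int envelope_push (envelope_t *env, uint32_t routing_id)
{
    if (env->depth == MAX_HOPS)
        return -1;                      /* envelope full */
    env->ids [env->depth++] = routing_id;
    return 0;
}

/* Return path: a hop takes the top ID to decide where the reply goes. */
static int envelope_pop (envelope_t *env, uint32_t *routing_id)
{
    if (env->depth == 0)
        return -1;                      /* no return address left */
    *routing_id = env->ids [--env->depth];
    return 0;
}
```

Every hop must cooperate with this one shared structure, which is the single cross-layer contract the argument below questions.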

This scenario does not seem to happen in practice. There is a very good reason for this, stemming from the essential need of a distributed system to work in layers, and hide the internals of every layer from other layers.

To put this another way, the extended request-reply scenario assumes, or tries to enforce, a single contract that spans many layers. This is a failing pattern. Success needs specific contracts between adjacent layers, and the flexibility to change these as needed without affecting other layers.

Take, for example, the Majordomo protocol, which takes client work requests and delivers them to workers. This is practically the simplest kind of load balancing we can do.

In Majordomo we have two contracts. One defines the interaction between clients and the broker. The second defines the interaction between workers and the broker. These two layers are separate, and it is profitable to keep them so. It lets us evolve the worker contract (adding heartbeating, timeouts, recovery) and the client contract (e.g. adding scheduling and quotas) separately. It lets us experiment with multiple worker-broker contracts without affecting client applications.

Clearly we do carry payloads across both layers. However there is an intelligent piece of code switching between the two layers, and translating messages from one contract to another.
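A minimal sketch of that translation step, assuming two hypothetical message structs (client_request_t and worker_request_t) rather than the actual Majordomo frames. The point is that the broker consumes one contract and produces another, so nothing forces the two layers to share a message shape.

```c
#include <stdint.h>
#include <string.h>

/* Hypothetical client-broker contract. */
typedef struct {
    char service [32];      /* which service the client wants */
    const char *body;       /* opaque request payload */
} client_request_t;

/* Hypothetical worker-broker contract. */
typedef struct {
    uint32_t client_route;  /* opaque handle the worker echoes back */
    const char *body;       /* same payload, carried across the layer */
} worker_request_t;

/* The broker consumes the client contract and emits the worker contract.
 * The service name is used by the broker (to pick a worker) and is not
 * forwarded; the client route is a broker-chosen handle, not an envelope. */
static worker_request_t broker_translate (const client_request_t *req,
                                          uint32_t client_route)
{
    worker_request_t out;
    out.client_route = client_route;
    out.body = req->body;
    return out;
}
```

Either struct can gain fields (heartbeats, quotas) without the other changing.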

So it looks like multihop request-reply can be falsified as a theory, and thus the use case for request-reply envelopes can also be falsified.

A Better Framing Approach

In the last few years the ZeroMQ community has developed its own tool for framing, namely zproto. This generates a fast binary codec for a protocol, in arbitrary languages (Go, Clojure, Java, C, C# at this stage).

The advantage of zproto over multipart framing is that you get a properly documented protocol with robust error checking on both the client and server sides. By contrast, ZeroMQ RFC 7 (the Majordomo protocol) uses multipart framing for its protocol, which makes the protocol expensive to implement, and to evolve and improve.
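As an illustration of the kind of single-frame codec a generator like zproto produces, here is a hand-written C sketch. The wire layout (signature bytes 0xAA 0xA7, a message ID, a length-prefixed service name and a length-prefixed body) is an assumption made up for this example, not zproto's actual format. The decoder rejects bad signatures, truncated input and trailing garbage, the kind of checking that is tedious to get right when parsing multipart frames by hand.

```c
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical layout (NOT zproto's real format):
 *   [0..1] signature 0xAA 0xA7
 *   [2]    message id (1 = REQUEST)
 *   [3]    service length n, then n bytes of service name
 *   then   4-byte big-endian body length m, then m bytes of body */
#define SIG0 0xAA
#define SIG1 0xA7
#define MSG_REQUEST 1

/* Encode into buf; returns bytes written, or 0 if it does not fit. */
static size_t encode_request (uint8_t *buf, size_t cap, const char *service,
                              const uint8_t *body, uint32_t body_size)
{
    size_t n = strlen (service);
    size_t need = 3 + 1 + n + 4 + (size_t) body_size;
    if (n > 255 || need > cap)
        return 0;
    uint8_t *p = buf;
    *p++ = SIG0; *p++ = SIG1; *p++ = MSG_REQUEST;
    *p++ = (uint8_t) n;
    memcpy (p, service, n);
    p += n;
    *p++ = (uint8_t) (body_size >> 24);
    *p++ = (uint8_t) (body_size >> 16);
    *p++ = (uint8_t) (body_size >> 8);
    *p++ = (uint8_t) body_size;
    memcpy (p, body, body_size);
    return need;
}

/* Decode with bounds checks; 0 on success, -1 on malformed input.
 * service must hold 256 bytes; *body points into buf (no copy). */
static int decode_request (const uint8_t *buf, size_t size, char *service,
                           const uint8_t **body, uint32_t *body_size)
{
    if (size < 4 || buf [0] != SIG0 || buf [1] != SIG1 || buf [2] != MSG_REQUEST)
        return -1;
    size_t n = buf [3];
    size_t pos = 4;
    if (size - pos < n + 4)
        return -1;                      /* truncated header */
    memcpy (service, buf + pos, n);
    service [n] = '\0';
    pos += n;
    uint32_t m = ((uint32_t) buf [pos] << 24) | ((uint32_t) buf [pos + 1] << 16)
               | ((uint32_t) buf [pos + 2] << 8) | (uint32_t) buf [pos + 3];
    pos += 4;
    if (size - pos != m)
        return -1;                      /* truncated body or trailing bytes */
    *body = buf + pos;
    *body_size = m;
    return 0;
}
```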

A Better Client-Server Pattern

While the standard pattern for client-server cases is DEALER-ROUTER, these sockets are clumsy and often confusing to newcomers. Their names are unclear. The ROUTER socket uses multipart messages for routing, which is semantically and technically confusing.

There is a much simpler, more obvious client-server pattern:

  • CLIENT and SERVER socket types that are separate from the request-reply pattern.
  • CLIENT can implement, from the start, the logic we always need for heartbeating and recovery from server restarts.
  • SERVER can implement client sessions, recovery from network failures, and disconnection of idle clients.

There are several possibilities for a SERVER socket design:

  • Just like ROUTER today, except the routing information is not provided as a multipart frame. Instead, it is a message property (it cannot be a socket property, if sockets are to be threadsafe).
  • A higher level socket that automatically manages client sessions. This means, identifying new connections and re-connections (when the same client reconnects on a different network socket), responding to heartbeats, and expiring idle clients. The API here would add a "session" object, corresponding to a single client session.
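A minimal sketch of the first option, assuming a hypothetical message_t that carries the routing ID as a field of the message rather than as a frame. A handler copies the routing ID from request to reply, so no per-socket routing state is needed.

```c
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical single-part message: one opaque blob, plus a routing ID
 * carried as a message property instead of an extra frame. */
typedef struct {
    uint8_t data [256];
    size_t size;
    uint32_t routing_id;    /* set by SERVER on receive, read on send */
} message_t;

/* A stateless echo handler: the reply inherits the request's routing ID.
 * Because the ID travels with the message rather than the socket, two
 * threads handling two requests cannot overwrite each other's routing. */
static void make_echo_reply (const message_t *request, message_t *reply)
{
    memcpy (reply->data, request->data, request->size);
    reply->size = request->size;
    reply->routing_id = request->routing_id;
}
```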

Similarly, CLIENT could be very similar to DEALER, or something more sophisticated that deals with the requirements we always have in client-server scenarios.

Heartbeating and Sessions

Looking at several years of building client-server stacks with ZeroMQ, a consistent pattern emerges for reliable session management. We build this today using DEALER-ROUTER and zproto for protocol commands. It could profitably be built into the CLIENT-SERVER sockets, and the ZMTP protocol:

  • The server must detect new client sessions. Ideally, a session identifies a CLIENT socket instance, with the same lifecycle. Today we use the routing ID, which means we cannot recover a client session after a network break.
  • The client must send heartbeats, when idle. The heartbeat interval is typically one or several seconds, depending on client-side power considerations. A heartbeat command ("PING") carries no data.
  • The server must respond to heartbeats, simply by sending a heartbeat reply ("PING-OK"), again carrying no data.
  • Clients must send at most a few PINGs, if they are not getting PING-OK replies. Otherwise they will fill up their outgoing pipes with a thousand PING commands.
  • Servers must expire clients that have sent nothing within a certain timeout (typically 10 to 30 seconds, depending on the network).
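The rules above can be sketched as a small state machine in C. PING_INTERVAL_MS, MAX_OUTSTANDING and EXPIRY_MS are assumed values picked from the ranges given above, and the sketch treats the link as idle when nothing has arrived from the server for one interval. None of these names come from a real library.

```c
#include <stdbool.h>
#include <stdint.h>

#define PING_INTERVAL_MS 1000   /* "one or several seconds" */
#define MAX_OUTSTANDING  3      /* "at most a few PINGs" */
#define EXPIRY_MS        10000  /* low end of the 10 to 30 s range */

/* Hypothetical client-side heartbeat state; times in milliseconds. */
typedef struct {
    int64_t last_recv;          /* last time anything came from the server */
    int64_t last_ping;          /* last time we sent a PING */
    int pings_outstanding;      /* PINGs sent since the server last spoke */
} heartbeat_t;

/* Any traffic from the server (including PING-OK) resets the count. */
static void heartbeat_on_recv (heartbeat_t *hb, int64_t now)
{
    hb->last_recv = now;
    hb->pings_outstanding = 0;
}

/* Returns true if the client should send a PING now. */
static bool heartbeat_should_ping (heartbeat_t *hb, int64_t now)
{
    if (now - hb->last_recv < PING_INTERVAL_MS)
        return false;           /* link not idle yet */
    if (now - hb->last_ping < PING_INTERVAL_MS)
        return false;           /* pinged recently */
    if (hb->pings_outstanding >= MAX_OUTSTANDING)
        return false;           /* don't fill the outgoing pipe */
    hb->last_ping = now;
    hb->pings_outstanding++;
    return true;
}

/* Server side: expire a client that has been silent past the timeout. */
static bool client_expired (int64_t last_seen, int64_t now)
{
    return now - last_seen > EXPIRY_MS;
}
```

Capping outstanding PINGs is what keeps a client talking to a dead server from queueing thousands of them.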

For session management, the client must send a unique ID that can be used as a session ID. Today we use the routing ID for this; some applications use the ZMQ_IDENTITY setting. Neither is ideal. The best solution is for a CLIENT socket to automatically generate its own unique session ID, which cannot be overridden or used at any layer except this one.

The client sends its session ID when connecting to the server. The server holds known client sessions in a table. If the client connects with an existing session ID, the server attaches that new connection to the existing session. If not, it creates a new session. Sessions expire when they have no activity for 10-30 seconds (per heartbeating).
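A sketch of such a session table, assuming 16-byte client-generated session IDs, a fixed-size table, and a 10 second expiry; all names here are hypothetical. The application only ever sees the short routing_id, never the session ID.

```c
#include <stdint.h>
#include <string.h>

#define MAX_SESSIONS      64
#define SESSION_ID_SIZE   16
#define SESSION_EXPIRY_MS 10000

typedef struct {
    uint8_t session_id [SESSION_ID_SIZE];   /* generated by the CLIENT */
    uint16_t routing_id;                    /* short handle given to the app */
    int64_t last_seen;
    int in_use;
} session_t;

typedef struct {
    session_t slots [MAX_SESSIONS];
    uint16_t next_routing_id;
} session_table_t;

/* Called when a connection presents its session ID. Reattaches to a
 * matching session if one exists, else creates one. Returns the routing
 * ID, or 0 if the table is full (ID wraparound is ignored in this sketch). */
static uint16_t session_attach (session_table_t *t,
                                const uint8_t id [SESSION_ID_SIZE], int64_t now)
{
    session_t *free_slot = NULL;
    for (int i = 0; i < MAX_SESSIONS; i++) {
        session_t *s = &t->slots [i];
        if (s->in_use && memcmp (s->session_id, id, SESSION_ID_SIZE) == 0) {
            s->last_seen = now;             /* reconnection: same session */
            return s->routing_id;
        }
        if (!s->in_use && !free_slot)
            free_slot = s;
    }
    if (!free_slot)
        return 0;
    memcpy (free_slot->session_id, id, SESSION_ID_SIZE);
    free_slot->routing_id = ++t->next_routing_id;   /* 0 means "none" */
    free_slot->last_seen = now;
    free_slot->in_use = 1;
    return free_slot->routing_id;
}

/* Drop sessions with no activity within the expiry window. */
static void session_expire (session_table_t *t, int64_t now)
{
    for (int i = 0; i < MAX_SESSIONS; i++)
        if (t->slots [i].in_use
        &&  now - t->slots [i].last_seen > SESSION_EXPIRY_MS)
            t->slots [i].in_use = 0;
}
```

A real server would also update last_seen on every message from the session, including PINGs.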

This session reconnection is invisible to the application. It sees, at worst, a brief lull in traffic, which then resumes.

Separately, the SERVER socket provides a routing ID for each message. This is an internal short integer that identifies the session. The SERVER does NOT expose the session ID. If applications want their own client identifiers, they build these themselves.

The SERVER's routing ID is a socket (and perhaps also message) property that the caller can get (after receiving a message) and set (before sending a message). It is trivial to write simple servers, which can just be a recv/send loop, like today's REP sockets. Unlike REP sockets, a SERVER socket has no state machine and can trivially recover from errors.

Why Threadsafe Sockets?

One of the unexpected costs of multipart messages was that sockets could not be threadsafe. It's obvious why: a socket has to serialize an unknown number of "send" calls from a single caller thread. If each send and each receive is a single atomic operation, the socket can be threadsafe; otherwise it cannot.
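To see the problem, here is a toy C model (no libzmq involved) of one socket's outgoing pipe. Frames carry a "more" flag, as ZeroMQ frames do, and the receiver ends a message at each frame whose flag is cleared. The frame_t type and reassemble function are hypothetical names for this illustration.

```c
#include <stdbool.h>
#include <stddef.h>

/* One frame in a socket's outgoing pipe, with ZeroMQ-style "more" flag. */
typedef struct {
    const char *data;
    bool more;
} frame_t;

/* Group frames into messages the way a receiver would: a message ends at
 * the first frame with more == false. Writes each message's frame count
 * into counts and returns the number of messages. */
static size_t reassemble (const frame_t *frames, size_t n, size_t *counts)
{
    size_t messages = 0, parts = 0;
    for (size_t i = 0; i < n; i++) {
        parts++;
        if (!frames [i].more) {
            counts [messages++] = parts;
            parts = 0;
        }
    }
    return messages;
}
```

If threads A and B each send a two-part message on a shared socket with no lock spanning both send calls, the pipe can end up holding key-A, key-B, body-A, body-B: the receiver then sees a three-part message followed by a one-part message, and both are garbage. With single-part messages every send is self-contained, so interleaving cannot corrupt a message.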

Threadsafe sockets solve a major, long-standing problem: using ZeroMQ from languages that spawn many threads. This is a very common use case: the UI spawns a thread which wants to send a message. Today that thread must create a new socket, connect it, send its message, and destroy the socket. This is not fast.

Ideally, the new thread would grab the socket from shared memory, send to it, and die happily. Note that this is for a one-way message flow. If threads need a reply from their socket, they must use (today) a DEALER-ROUTER model, and (in our new model) a CLIENT-SERVER model.

There have been abortive attempts to make ZeroMQ sockets threadsafe; however, they did not make the codebase simpler. Removing multipart messages (even as a compile-time option) makes threadsafe sockets possible.

CZMQ Changes

As a worked example, let's see how this could change the CZMQ API:

  • Deprecation of the zmsg and zframe classes, and promotion of zchunk (or byte * + size_t) as the basic message type.
  • New zclient and zserver classes, which implement CLIENT and SERVER sockets, as described above. Initially these could be emulated over DEALER and ROUTER, later over real CLIENT and SERVER.
  • Deprecation of all functionality that supports identities and multipart.
  • A new "reply" semantic, for SERVER sockets, which replies to a previously received message. This can be implicit (reply to socket), if the socket is thread local, or explicit (reply to message or to message routing id) if the socket is shared between multiple threads.
  • Change of actors and pipes to use CLIENT-SERVER instead of PAIR-PAIR. This lets actors reply to caller requests, where each caller would get its own client socket to speak to the actor.

How This Would Happen

Clearly some of the goals we have — simplifying the internals of ZeroMQ stacks — are not compatible with existing applications.

We cannot break existing application code, and so future ZeroMQ stacks will need to continue supporting multipart data. Some options for doing this are:

  • To build new socket types (client, server) that are threadsafe and that work on single parts only. This would leave the existing codebases largely untouched.
  • To fork the codebase. Not perhaps the nicest option.
  • To support both models, and allow newer applications to disable multipart, to get the eventual advantages.

Conclusions

It's unclear whether we can make these shifts successfully. However, if we can, here are some potential outcomes:

  • The REQ-REP-DEALER-ROUTER socket types will disappear (be deprecated).
  • The ZMQ_IDENTITY concept will disappear.
  • New CLIENT and SERVER sockets will replace REQ-REP-DEALER-ROUTER.
  • Messages will be single opaque blobs of data.
  • All other names for messages (frames and parts) will be deprecated.
  • All "recv more" and "send more" semantics will be deprecated.
  • CLIENTs and SERVERs may eventually do heartbeating and session management automatically.
  • All socket types that no longer accept multipart messages can probably be made threadsafe.

Comments and discussion welcome, please use zeromq-dev for this.

Unless otherwise stated, the content of this page is licensed under Creative Commons Attribution-ShareAlike 3.0 License