The PUB-SUB Hacks: Meerkat

pieterhpieterh wrote on 26 Mar 2013 18:44

1_logo.png

While the ØMQ pub-sub pattern is well-known for being a one-way pattern, it does in fact support a two-way flow of information. SUB sockets (or more accurately, XSUB sockets) can send messages to XPUB sockets. This week, I'll post a series of short articles that cover different patterns—Meerkat, Census, and Repeater—we can build using this functionality. As usual, I'll demonstrate each pattern with code you can download and run.

The Meerkat Pattern

One of the common questions from ØMQ developers (until they learn that there's no good answer and retreat into a kind of Stockholmesque "if it hurts it must be for our own good" acceptance) is "how can a publisher know when subscribers are present?" However as I've written elsewhere, pain is not a good sign.

It is true that for certain use cases, the answer is, "you don't want to know". It's true that for very large scale publishers, the raw cost of tracking individual subscribers in real time is not worth the effort. The BBC doesn't wait until everyone's ready with their telly before starting Blue Peter. So, the argument goes, why should we?

But I'm suspicious of people who tell me what I should think. Particularly since, as a writer, propaganda is my job. And I know just how much rubbish people say when they're being especially serious. But kidding aside, the question is valid, and the "no, you don't want to do this" answer seems bogus and somewhat lazy. We know that publishers can benefit greatly from tracking individual subscribers. This was after all the major improvement in version 3 of ØMQ: much faster PUB-SUB thanks to "upstreaming", where subscribers actually tell publishers what data they want.

If it's good for XPUB and XSUB sockets, why not for applications too? This was the question that innocently entered my mind one spring morning in Brussels, as we watched yet another snowstorm unfold. Innocence is all relative, of course, as they say in Rome. The question hooked into some unfinished thoughts and formed a flash mob of irritation that forced me to stop working briefly on more interesting topics. The Meerkat Pattern is what came out.

meerkats.jpg

This is one of the real pleasures of using a technology: to twist and distort it into shapes its original designers never thought of, and see what comes out. So let's distort and abuse the XPUB-XSUB upstreaming functionality to answer that question and give publishers a way to know what subscribers are around.

I once had a friend and mentor, the larger-than-life Leif Svalgaard, who would work secretly on some amazing code and then show the finished results to general astonishment. When we asked him how he did it, he invariably answered with a huge Santa Clause grin, palms up innocently, saying, "it's magic!" Once Leif wrote a tiny but working multitasking system for DOS over the weekend, just to disprove Microsoft's claims that OS/2 (yes, this was some time ago) needed 8 Megabytes of memory "because it did multitasking". As he started a compilation in one window, and edited a file in another, I asked him, "how did you do this?" "Ah, it's magic!" he answered.

Incidentally, that tiny multitasking system (1,000 lines of x86 assembly) inspired my multithreading SMT kernel in 1995, which became the heart of our server technology and then OpenAMQ, which led the way to ZeroMQ. So it goes in the history of software, a chain of inspiration and culture leading back to the first time primitive man told his wife, "just going down to the cave to paint some pr0… ugh… hunting scenes, honey. BRB!"

Figure 1 - The Meerkat Pattern

fig1.png

Step 1: Counting Subscriptions

What if a publisher (suitably wearing an "X" on its torso to mark it with XPUB superpowers) simply counted the number of subscriptions it received? Perhaps surprisingly, this works for all values of N on the condition that N is equal to one. To say this in English, an XPUB tracks subscriptions and only tells you of the first one. So, on to attempt 2.

Step 2: Using the Verbose Option

A few months ago we added a neat little option (ZMQ_XPUB_VERBOSE) to XPUB sockets that disables its filtering of duplicate subscriptions. This now works for any number of subscribers. We use this as follows:

void *publisher = zsocket_new (ctx, ZMQ_XPUB);
zsocket_set_xpub_verbose (publisher, 1);
zsocket_bind (publisher, "tcp://*:6001");


fragments/xpub-verbose-option.c: Listing 1

However we still have a problem. Subscribers may subscribe any number of times. We need some devious trick to distinguish different kinds of subscriptions.

Step 3: Sending 'Special' Subscriptions

My first idea was to use the empty subscription, but of course this doesn't play well with real use. Let's say I have two subscribers that subscribe to a series of topics:

  • A subscribes to "USD", "JPY", and "EUR"
  • B subscribes to "JPY" and "KRW"

How do I know how many subscribers I have?

My answer, which is not very beautiful but works, is to send a "special" subscription that doesn't match any real data but which tells my application what it needs to know:

  • A subscribes to "USD", "JPY", "EUR", and "Meerkat"
  • B subscribes to "JPY", "KRW", and "Meerkat"

So this brings us to an example of Meerkat in action:

// The Meerkat Pattern
//
// In which we address the slow subscriber problem by asking for
// a show of hands before we start publishing.

#include "czmq.h"

static void
publisher_task (void *args, zctx_t *ctx, void *pipe)
{
zctx_set_linger (ctx, 1000);
void *publisher = zsocket_new (ctx, ZMQ_XPUB);
zsocket_set_xpub_verbose (publisher, 1);
zsocket_bind (publisher, "tcp://*:6001");

// Calling thread tells us the population size
char *population = zstr_recv (pipe);
int still_waiting = atoi (population);
byte meerkat [] = { 1, 'M', 'e', 'e', 'r', 'k', 'a', 't' };

while (still_waiting) {
zframe_t *frame = zframe_recv (publisher);
if (!frame)
break; // Interrupted
if (zframe_size (frame) == sizeof (meerkat)
&& memcmp (zframe_data (frame), meerkat, sizeof (meerkat)) == 0)
still_waiting--;
// Dump the frame contents for the benefit of the reader
zframe_print (frame, NULL);
zframe_destroy (&frame);
}
// Now broadcast our message
zstr_send (publisher, "Hello, World!");
zclock_sleep (250);
zstr_send (pipe, "DONE");
}

// The subscriber sends a "Meerkat" subscription and then
// any other subscriptions it wants:

static void
subscriber_task (void *args, zctx_t *ctx, void *pipe)
{
void *subscriber = zsocket_new (ctx, ZMQ_XSUB);
zsocket_connect (subscriber, "tcp://localhost:6001");

// Tell publisher we're here
byte meerkat [] = { 1, 'M', 'e', 'e', 'r', 'k', 'a', 't' };
zmq_send (subscriber, &meerkat, sizeof (meerkat), 0);
// Subscribe to everything as well (empty subscription)
zmq_send (subscriber, &meerkat, 1, 0);

char *hello = zstr_recv (subscriber);
puts (hello);
free (hello);
}

// The main task starts publisher task and then the subscribers:

int main (void)
{
zctx_t *ctx = zctx_new ();

// Size of target population
srand ((unsigned) time (NULL));
int population = randof (10) + 1;

// Start publisher task
void *pipe = zthread_fork (ctx, publisher_task, NULL);
zstr_send (pipe, "%d", population);

// Start target population
while (population--)
zthread_fork (ctx, subscriber_task, NULL);

// Wait for publisher to complete
free (zstr_recv (pipe));

zctx_destroy (&ctx);
return 0;
}


meerkat.c: Meerkat Pattern

Get the code.

When you run this code you'll see it starts a random number of subscribers, waits until they all wake up, and then sends "Hello, World" to everyone.

Step 4: Ridiculous Optimization

If you look a little closer at the code you'll see a possible optimization. Instead of sending a subscription message to signal presence, how about an unsubscription message?

It's a great idea except (at time of writing), XPUB sockets don't verbosely send you unsubscribe messages. There are reasons for this (there are always reasons, he said darkly, to no-one in particular) but it's minor. Perfection is, sometimes, just a waste of time, and sending subscribe messages seems fine.

Step 5: Robustness

The Meerkat example will occasionally fail in the real world because it doesn't distinguish between a subscriber saying "I'm here" for the first time from one who comes back several times. We're basically counting the heads that pop-up, not the bodies that are really present on the field. The same head can pop-up over and over, and like idiots we'll count it multiple times.

There's an answer—there always is—but it makes things more complex. Each subscriber has to identify itself uniquely and the server has to track this. Subscribers could for instance send a UUID as part of their "special" subscription. The server would track these in a hash table, and count only new entries.

It's the kind of boring grunt work we invented ØMQ to get away from, so I'm not going to feed you an example. Let's just agree that making things more complex is easy. My next article will cover the Census pattern, which extends Meerkat into a group query pattern.

Comments

Add a New Comment
Unless otherwise stated, the content of this page is licensed under Creative Commons Attribution-ShareAlike 3.0 License