The PUB-SUB Hacks: Census

pieterh wrote on 26 Mar 2013 19:28

In the first article on PUB-SUB hacks, we looked at the Meerkat pattern. In this second article, I'll walk through a pattern called Census, which is loosely inspired by the "Surveyor" socket type from the defunct Crossroads fork of ZeroMQ. It's a fairly simple pattern. The code for this article is in a GitHub gist.

The Census Pattern

The Census Pattern is an evolution of Meerkat that solves the problem of monitoring or querying a set of nodes. The question part looks like a PUB-SUB broadcast, and the answer part is similar to a PUSH-PULL sink in that it's a many-to-one flow. You can often simply hard-code the question so that it's implied. For instance, instead of asking a set of nodes "who is doing the least work right now?" (to know whom to send new work to), the nodes could broadcast their availability without being asked.

The advantage of explicitly asking the question is, of course, that you can do more sophisticated group queries. For instance, you could send a search pattern to a set of engines, and get back the first reply or all replies. So Census is like group request-reply, where the replies can be processed according to the use-case.
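The following is a minimal sketch in plain C, with no ZeroMQ calls, that makes "the first reply or all replies" concrete. The policy names and the `census_round_complete ()` helper are hypothetical and exist only for illustration. After each reply, the helper decides whether a round of group request-reply can stop early.

```c
#include <assert.h>
#include <stdbool.h>

// Hypothetical reply-handling policies for one census round
typedef enum {
    CENSUS_FIRST_REPLY,         // Stop as soon as anyone answers
    CENSUS_ALL_REPLIES          // Wait until every known node answers
} census_policy_t;

// Returns true if the round can end now, given the replies received
// so far and the number of nodes we believe are listening. The
// timeout is handled elsewhere, in the poll loop.
static bool
census_round_complete (census_policy_t policy, int replies, int headcount)
{
    assert (replies >= 0 && headcount >= 0);
    if (policy == CENSUS_FIRST_REPLY)
        return replies >= 1;
    return replies >= headcount;
}
```

In a real counter, a check like this would sit inside the poll loop next to the timeout. Under the first-reply policy with nobody listening, only the timeout ends the round.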

Traditionally, we'd design Census using a PUB-SUB pattern for the question and a PUSH-PULL pattern for the replies. In the old days, this was the advice we gave ØMQ developers: break a complex pattern into separate flows, and use a specific socket type for each one.

However, it turns out that multiple sets of sockets get complex and confusing. It's hard to synchronize things when you have multiple independent paths, and it's hard to imagine a security model working over mixed socket flows. Using a single socket pair is, it turns out, simpler. And simpler is always better. So that's what we'll aim for: a single socket flow for group request-reply.

Don't confuse "censuses" (if that is a proper word) with "surveys". A census is what you do when you ask your kids, "who wants burgers?" A survey is what you do when you stop random people at the 11th Annual Vegan Life Festival and ask innocently, "who wants burgers?" (The answer might astonish you. Or not. I've never tried, but YouTubing ridiculous surveys conducted on self-selected samples suddenly seems quite fun. Next up, we survey the Papal convention for their opinions on whether burger makes a valid pizza topping, and then we extrapolate their answers to the whole world population.)

Popes aside, here are some ground rules for conducting a census:

  • We generally know in advance how many people we're speaking to. People may come and go randomly, but unlike in a survey, our goal is to count the heads we can see, not the ones we can't.
  • We expect either an answer, or silence, within a certain time. On a computer network, as in the real world, there's no such thing as perfection.
  • Based on the results, we make some decision, which may be to repeat the question, ask a different question, jump to a bogus foregone conclusion, etc.

So the parameters for a census are: the question we ask, the allowed answers, the time we allow for answers, and the total size of our target population. That last piece is, of course, where Meerkat comes in, since it lets us conduct a census of a dynamic population that may be leaving and joining the network randomly.
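Those four parameters fit naturally into one structure. This is a sketch under stated assumptions. `census_params_t`, `census_answer_allowed ()` and `census_silent ()` are hypothetical names and are not part of the CZMQ API. The sketch also assumes that answers are plain C strings.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <string.h>

// Hypothetical container for the four census parameters
typedef struct {
    const char *question;       // What we ask
    const char **answers;       // Allowed answers, NULL-terminated
    int64_t think_msec;         // Time we allow for answers
    int population;             // Expected size of the target population
} census_params_t;

// True if 'answer' is one of the allowed answers; anything else
// can be treated the same as silence
static bool
census_answer_allowed (const census_params_t *params, const char *answer)
{
    assert (params && params->answers && answer);
    for (const char **allowed = params->answers; *allowed; allowed++)
        if (strcmp (*allowed, answer) == 0)
            return true;
    return false;
}

// How many of the expected population stayed silent
static int
census_silent (const census_params_t *params, int answered)
{
    int silent = params->population - answered;
    return silent > 0 ? silent : 0;
}
```

In census1.c, the question is "Who wants pizza?", the targets answer "Yes" or "No", the answer window is `think_msec`, and the population size arrives over the pipe from `main ()`.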

Census Over XPUB-XSUB

Clearly the PUB-SUB pattern is a good place to start with a census since we want to broadcast the question to everyone. We do have to use XPUB and XSUB since they give us extra superpowers we need. Two superpowers, in fact:

  • We can send and receive subscription messages, which lets us do the Meerkat part.
  • XSUB sockets can talk back to XPUB sockets, which lets us do the Census part.
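Both superpowers depend on how subscription messages are framed: one flag byte (1 to subscribe, 0 to unsubscribe), followed by the topic. The target task in census1.c builds these bytes by hand. Here is a hedged sketch in plain C. `build_subscription ()` is a hypothetical helper, and the caller is assumed to pass the resulting bytes to `zmq_send ()` on an XSUB socket.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

// Hypothetical helper: writes a subscription message into 'buffer'
// and returns its size. The first byte is 1 to subscribe or 0 to
// unsubscribe; the topic bytes follow, with no null terminator.
static size_t
build_subscription (unsigned char *buffer, size_t buffer_size,
                    int subscribe, const char *topic)
{
    size_t topic_size = strlen (topic);
    assert (buffer_size >= topic_size + 1);
    buffer [0] = subscribe ? 1 : 0;
    memcpy (buffer + 1, topic, topic_size);
    return topic_size + 1;
}
```

The topic "Meerkat" produces the 8-byte `meerkat` array from census1.c. An empty topic produces the single 0x01 byte that subscribes to everything.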

John Muehlhausen's libzmq d32e39 commit from 7 January 2013 is worth studying. If I taught computer science, I'd spend a few hours just boring my poor students with this one. His 14-line patch hacked XPUB and XSUB into a two-way highway for messages. It's so simple and so very beautiful. I like the evolution of PUB and SUB from a purist one-way design (rooted in the original PGM-style financial data distribution) into more tactile and pragmatic two-way sockets.

The patch is a minimal change, evolution in action. A subscription message starts with 0x01. An unsubscription message starts with 0x00. We discard any other message, right? That's how XPUB and XSUB were designed. But hang on, no, hang on… Let's not throw other messages away, let's pass them on up instead. Suddenly the gate opens to subscribers sending stuff back to their publishers.

So, literally (and I mean this in the non-figurative sense), you can send a "normal" message to an XSUB socket, and you can receive "normal" messages off XPUB sockets. XPUB works like a sink and fair-queues incoming messages from its subscribers, while XSUB works like a copier and copies each message to every XPUB it's connected to. You don't need any special socket options for this; it's how the sockets work out of the box. If you don't want the functionality, just discard "normal" messages in the code that reads from XPUB sockets.
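Code that reads from an XPUB socket therefore has to tell the two kinds of message apart. The counter task in census1.c does this with its `data [0] > 1` test. Below is a minimal sketch in plain C using the hypothetical names `classify_frame ()` and `is_meerkat ()`. Treating an empty frame as a normal message is an assumption of this sketch, not something stated above.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

typedef enum {
    FRAME_UNSUBSCRIBE,          // First byte 0x00
    FRAME_SUBSCRIBE,            // First byte 0x01
    FRAME_NORMAL                // Anything else, sent up by a subscriber
} frame_kind_t;

// Classify a frame read off an XPUB socket by its first byte.
// This sketch treats an empty frame as a normal message.
static frame_kind_t
classify_frame (const unsigned char *data, size_t size)
{
    assert (data || size == 0);
    if (size > 0 && data [0] == 0)
        return FRAME_UNSUBSCRIBE;
    if (size > 0 && data [0] == 1)
        return FRAME_SUBSCRIBE;
    return FRAME_NORMAL;
}

// True if the frame is exactly the Meerkat "raise your hand"
// subscription used in census1.c
static int
is_meerkat (const unsigned char *data, size_t size)
{
    static const unsigned char meerkat [] = { 1, 'M', 'e', 'e', 'r', 'k', 'a', 't' };
    return size == sizeof (meerkat) && memcmp (data, meerkat, size) == 0;
}
```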

Figure 1 - The Census Pattern

Enough ado already, let's take this pie out of the oven and see how it tastes. Here's my first attempt at the Census pattern:

// The Census Pattern
// Model 1, over XPUB-XSUB

#include "czmq.h"

static void
counter_task (void *args, zctx_t *ctx, void *pipe)
{
    void *counter = zsocket_new (ctx, ZMQ_XPUB);
    zsocket_set_xpub_verbose (counter, 1);
    zsocket_bind (counter, "tcp://*:6001");

    // The counter task is broken into two steps. First it allows
    // all targets to get ready and raise their hands, using the
    // Meerkat pattern. Then it sends out its census question and
    // allows all targets time to reply:

    // Parameters for the census
    int count_msec = 250;       // Msecs to settle down
    int think_msec = 250;       // Msecs for responses

    // Calling thread tells us the population size
    char *population = zstr_recv (pipe);

    // All activity happens on our counter socket
    zmq_pollitem_t items [] = { { counter, 0, ZMQ_POLLIN, 0 } };
    byte meerkat [] = { 1, 'M', 'e', 'e', 'r', 'k', 'a', 't' };

    // Both steps are zmq_poll loops which exit either when we
    // get the expected number of responses, or we time-out. In
    // the first step we count only Meerkat subscriptions:
    int headcount = 0;          // Known target size
    int64_t timer_end = zclock_time () + count_msec;
    int still_waiting = population? atoi (population): 0;
    free (population);          // zstr_recv () returns a heap string
    while (still_waiting) {
        int64_t time_left = timer_end - zclock_time ();
        if (time_left <= 0)
            break;              // We're done here
        int rc = zmq_poll (items, 1, time_left * ZMQ_POLL_MSEC);
        if (rc == -1)
            break;              // Interrupted

        if (items [0].revents & ZMQ_POLLIN) {
            zframe_t *frame = zframe_recv (counter);
            if (!frame)
                break;          // Interrupted
            if (zframe_size (frame) == sizeof (meerkat)
            &&  memcmp (zframe_data (frame), meerkat,
                        sizeof (meerkat)) == 0) {
                still_waiting--;
                headcount++;
            }
            zframe_destroy (&frame);
        }
    }
    // Now we've got our target population and we know they're
    // subscribed, we send out the census question:
    zstr_send (counter, "Who wants pizza?");

    // In the second poll loop, we wait for valid answers to our
    // census question. We might still receive subscription
    // messages so we have to discount those:
    int positives = 0;          // How many said "yes"
    timer_end = zclock_time () + think_msec;
    still_waiting = headcount;
    while (still_waiting) {
        int64_t time_left = timer_end - zclock_time ();
        if (time_left <= 0)
            break;              // We're done here
        int rc = zmq_poll (items, 1, time_left * ZMQ_POLL_MSEC);
        if (rc == -1)
            break;              // Interrupted

        if (items [0].revents & ZMQ_POLLIN) {
            zframe_t *frame = zframe_recv (counter);
            if (!frame)
                break;          // Interrupted
            byte *data = zframe_data (frame);
            size_t size = zframe_size (frame);
            // Ignore any subscriptions (first byte 0 or 1) we might
            // still get. Answers arrive without a null terminator, so
            // compare them by size and content, not with streq ()
            if (size > 0 && data [0] > 1) {
                if (size == 3 && memcmp (data, "Yes", 3) == 0)
                    positives++;
                still_waiting--;
            }
            zframe_destroy (&frame);
        }
    }
    printf ("Out of %d people, %d want pizza\n", headcount, positives);
    zstr_send (pipe, "DONE");
}

// The target task starts by doing a Meerkat subscription, and then
// subscribes to everything with a zero-sized subscription message.
// It waits for the census question and answers Yes or No randomly:

static void
target_task (void *args, zctx_t *ctx, void *pipe)
{
    void *target = zsocket_new (ctx, ZMQ_XSUB);
    zsocket_connect (target, "tcp://localhost:6001");

    // Tell publisher we're here
    byte meerkat [] = { 1, 'M', 'e', 'e', 'r', 'k', 'a', 't' };
    zmq_send (target, meerkat, sizeof (meerkat), 0);
    // Subscribe to everything as well (empty subscription, just 0x01)
    zmq_send (target, meerkat, 1, 0);

    char *question = zstr_recv (target);
    if (!question)
        return;                 // Interrupted
    char *answer = randof (2) == 0? "Yes": "No";
    printf ("%s %s\n", question, answer);
    free (question);
    zstr_send (target, answer);
}

// The main task starts a counter task and a set of target tasks:

int main (void)
{
    zctx_t *ctx = zctx_new ();

    // Size of target population
    srand ((unsigned) time (NULL));
    int population = randof (10) + 1;

    // Start counter task
    void *pipe = zthread_fork (ctx, counter_task, NULL);
    zstr_send (pipe, "%d", population);

    // Start target population
    while (population--)
        zthread_fork (ctx, target_task, NULL);

    // Wait for census to complete
    free (zstr_recv (pipe));

    zctx_destroy (&ctx);
    return 0;
}


census1.c: Census Pattern over XSUB-XPUB

It's not huge: about 150 lines of C. The counter task has two steps. If all nodes are up and running, the whole thing runs with no delay. If a node is broken or too busy to answer, each step waits for its full timeout (250 msec in this example) before giving up. A census is still valid if some nodes are missing. In a realistic implementation, we could keep asking more questions without waiting for nodes to join again each time.
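The "wait before timing out" in each step is a deadline check: compute how much time is left, poll for at most that long, and stop when nothing is left. This sketch in plain C assumes a POSIX system with `clock_gettime ()`. It uses a monotonic clock in place of CZMQ's `zclock_time ()`. `now_msec ()`, `time_left_msec ()` and `worst_case_msec ()` are hypothetical helpers.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <stdint.h>
#include <time.h>

// Current monotonic time in milliseconds (POSIX clock_gettime)
static int64_t
now_msec (void)
{
    struct timespec ts;
    if (clock_gettime (CLOCK_MONOTONIC, &ts) != 0)
        return 0;
    return (int64_t) ts.tv_sec * 1000 + ts.tv_nsec / 1000000;
}

// Milliseconds left before 'deadline', clamped at zero; zero means
// the step has timed out and the loop should stop polling
static int64_t
time_left_msec (int64_t deadline, int64_t now)
{
    return deadline > now ? deadline - now : 0;
}

// Worst case for a census made of several timed steps: every step
// runs to its full timeout because some node never answers
static int64_t
worst_case_msec (const int64_t *step_msec, int steps)
{
    assert (steps >= 0 && (step_msec || steps == 0));
    int64_t total = 0;
    for (int index = 0; index < steps; index++)
        total += step_msec [index];
    return total;
}
```

In census1.c, `count_msec` and `think_msec` are both 250. The worst case, where some node never answers, is therefore 250 + 250 = 500 msec.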

Census Over ROUTER-DEALER

In the Big Black Book of Mad Science (originally it was a kind of light green but multiple incendiary near-misses gave it a charred exterior, and the general consensus was that "The Big Kind of Light Green Book" didn't have quite the same ring anyhow), rule number three is: "If at first you succeed, try again with more gunpowder". (Rules one and two are, since you're asking, "Bald is the new Evil", and "If you laugh loudly, you can get away with almost anything". Protective eye-wear only merits rule number 24. Rule number five says, enigmatically, "If you have to ask what rule #5 is, you can't afford it.")

So, more gunpowder! XPUB-XSUB is neat, but how about ROUTER-DEALER? After all, the Census pattern requires a two-way many-to-one dialog, and ROUTER-DEALER is pretty good at that. As an experiment, let's rewrite the same Census example using ROUTER-DEALER and see whether it's simpler or more complex.

Figure 2 - Census with More Gunpowder

Here's the code:

// The Census Pattern
// Model 2, over ROUTER-DEALER

#include "czmq.h"

static void
counter_task (void *args, zctx_t *ctx, void *pipe)
{
    void *counter = zsocket_new (ctx, ZMQ_ROUTER);
    zsocket_bind (counter, "tcp://*:6001");

    // Parameters for the census
    int census_msec = 250;      // Msecs for the whole census

    // Calling thread tells us the population size
    char *population = zstr_recv (pipe);

    // All activity happens on our counter socket
    zmq_pollitem_t items [] = { { counter, 0, ZMQ_POLLIN, 0 } };

    int headcount = 0;          // Known target size
    int positives = 0;          // How many said "yes"

    int64_t timer_end = zclock_time () + census_msec;
    int still_waiting = population? atoi (population): 0;
    free (population);          // zstr_recv () returns a heap string
    while (still_waiting) {
        int64_t time_left = timer_end - zclock_time ();
        if (time_left <= 0)
            break;              // We're done here
        int rc = zmq_poll (items, 1, time_left * ZMQ_POLL_MSEC);
        if (rc == -1)
            break;              // Interrupted

        if (items [0].revents & ZMQ_POLLIN) {
            // ROUTER gives us the sender's identity, then the body
            zframe_t *address = zframe_recv (counter);
            char *message = zstr_recv (counter);
            if (!address || !message) {
                zframe_destroy (&address);
                free (message);
                break;          // Interrupted
            }
            if (streq (message, "Hello")) {
                headcount++;
                // Route the question back to this target; zframe_send ()
                // destroys the address frame after sending it
                zframe_send (&address, counter, ZFRAME_MORE);
                zstr_send (counter, "Who wants pizza?");
            }
            else
            if (streq (message, "Yes") || streq (message, "No")) {
                if (streq (message, "Yes"))
                    positives++;
                still_waiting--;    // One fewer answer to wait for
            }
            zframe_destroy (&address);
            free (message);
        }
    }
    printf ("Out of %d people, %d want pizza\n", headcount, positives);
    zstr_send (pipe, "DONE");
}

// The target task starts by saying Hello, then it waits for the
// census question and answers Yes or No randomly:

static void
target_task (void *args, zctx_t *ctx, void *pipe)
{
    void *target = zsocket_new (ctx, ZMQ_DEALER);
    zsocket_connect (target, "tcp://localhost:6001");

    zstr_send (target, "Hello");
    char *question = zstr_recv (target);
    if (!question)
        return;                 // Interrupted
    char *answer = randof (2) == 0? "Yes": "No";
    printf ("%s %s\n", question, answer);
    free (question);
    zstr_send (target, answer);
}

// The main task starts a counter task and a set of target tasks:

int main (void)
{
    zctx_t *ctx = zctx_new ();

    // Size of target population
    srand ((unsigned) time (NULL));
    int population = randof (10) + 1;

    // Start counter task
    void *pipe = zthread_fork (ctx, counter_task, NULL);
    zstr_send (pipe, "%d", population);

    // Start target population
    while (population--)
        zthread_fork (ctx, target_task, NULL);

    // Wait for census to complete
    free (zstr_recv (pipe));

    zctx_destroy (&ctx);
    return 0;
}


census2.c: Census Pattern over ROUTER-DEALER

Quite shockingly, this worked first time and is only two-thirds the size of the XPUB-XSUB version. The code is quite a lot simpler. This teaches us a few things:

  • ROUTER-DEALER is a powerful tool.
  • Asynchronous beats synchronous. The main cost in the XPUB-XSUB model is getting everyone to pay attention and listen to the question at the same time. This doubles the size of the publisher task and doubles the worst-case time for the census.
  • Always try again, with more gunpowder.
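The ROUTER-DEALER counter does all its work in a single pass. Each "Hello" has the question routed straight back to the sender's identity, and each answer updates the tally. The sketch below models only that dispatch step, in plain C and without sockets. `census_dispatch ()` and its types are hypothetical. Identities are simplified to strings here, although real ROUTER identities may be binary frames.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

// Hypothetical tally kept by the counter
typedef struct {
    int headcount;              // Targets that said "Hello"
    int answered;               // Targets that answered "Yes" or "No"
    int positives;              // Targets that answered "Yes"
} census_tally_t;

// A reply plus the identity it must be routed to; body is NULL
// when the incoming message needs no reply
typedef struct {
    const char *to;
    const char *body;
} census_reply_t;

// Handle one incoming message. A ROUTER socket hands us the sender's
// identity first, so any reply can go straight back to that sender.
static census_reply_t
census_dispatch (census_tally_t *tally, const char *identity,
                 const char *body)
{
    assert (tally && identity && body);
    census_reply_t reply = { identity, NULL };
    if (strcmp (body, "Hello") == 0) {
        tally->headcount++;
        reply.body = "Who wants pizza?";
    }
    else
    if (strcmp (body, "Yes") == 0 || strcmp (body, "No") == 0) {
        tally->answered++;
        if (strcmp (body, "Yes") == 0)
            tally->positives++;
    }
    return reply;
}
```

The question goes out as soon as each target says hello, so nobody waits for everyone else. That leaves only one timeout to sit through.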

The next and final article in this little series of PUB-SUB Hacks will cover the Repeater pattern, which is an "any-to-all pub-sub for Very Large Networks" pattern.

Unless otherwise stated, the content of this page is licensed under Creative Commons Attribution-ShareAlike 3.0 License