Credit-based Flow Control

pieterh wrote on 19 Jul 2011 14:58

loop.png

Using network buffers - such as 0MQ's queues - has a disadvantage when you are sending data to many readers. Your writer will block when the buffer is full. To avoid this, you can use non-blocking writes, and poll for sockets that are 'ready for writing'. But here's an alternative that dispenses with high-water marks and blocking writes. It's called "credit-based flow control".

The example below runs on 0MQ using the CZMQ API. The principles are portable to any messaging system. I'll let the code speak for itself.

//
//  Credit based flow control example
//
//  We start some clients that talk to a single server via a DEALER
//  to ROUTER setup. Clients say hello to the server and then start
//  to receive random data. The server sends as fast as it can, but
//  only within the credit window created by the client.
//
#include "czmq.h"

//  Number of client threads that main () starts
#define NBR_CLIENTS     (1)

//  The TRANSIT_TOTAL size defines the total data in transit,
//  covering 0MQ send and recv queues, TCP send and recv buffers,
//  and packets in flight on the network. The client starts by
//  sending TRANSIT_TOTAL credit to the server, and thereafter
//  sends TRANSIT_SLICE credit after receiving TRANSIT_SLICE bytes.
//
//  Both expansions are fully parenthesized so that the macros keep
//  their value when used inside a larger expression (for example
//  as a divisor or multiplicand).

#define TRANSIT_TOTAL   (1024 * 1024)
#define TRANSIT_SLICE   (TRANSIT_TOTAL / 4)

//  We assert that the flow-control mechanism works by setting a
//  HWM on the server send queue, and sequencing messages. If the
//  queue hits HWM for any client, 0MQ will drop messages, as this
//  is the exception strategy for ROUTER sockets. The client can
//  detect this and abort.
//
//  Difficulty: 0MQ counts messages, not bytes. So HWM is not an
//  accurate measure. To solve this we batch small messages, and
//  fragment larger messages into blocks of FRAGMENT_SIZE octets.
//
//  For the example we simply generate FRAGMENT_SIZE messages.
//  In a more cynical test we would batch and fragment on sending.
//  But, once flow control works, we don't need the HWM at all.

//  Nominal size, in octets, of one data fragment (64 KiB)
#define FRAGMENT_SIZE   (64 * 1024)

//  Knowing the TRANSIT_TOTAL and the FRAGMENT_SIZE, we can set the
//  HWM to be (TRANSIT_TOTAL / FRAGMENT_SIZE). The operands and the
//  whole quotient are parenthesized so the macro expands to a single
//  value wherever it is used.

#define SERVER_HWM      ((TRANSIT_TOTAL) / (FRAGMENT_SIZE))

//  -------------------------------------------------------------------
//  Put a long integer to a network buffer
//
//  Stores the low 32 bits of value into buffer [0..3], most
//  significant octet first. The value argument is evaluated exactly
//  once; the buffer argument is evaluated once per octet written.
//  The do/while (0) wrapper makes the macro behave as one statement,
//  so it is safe as the body of an unbraced if/else.

#define PUT_LONG(buffer, value) do { \
    uint32_t put_long_value = (uint32_t) (value); \
    (buffer) [0] = (byte) ((put_long_value >> 24) & 255); \
    (buffer) [1] = (byte) ((put_long_value >> 16) & 255); \
    (buffer) [2] = (byte) ((put_long_value >> 8)  & 255); \
    (buffer) [3] = (byte) ((put_long_value)       & 255); \
    } while (0)

//  Put a long long integer to the network buffer
//
//  Stores the 64-bit value into buffer [0..7], most significant octet
//  first. The value is converted to uint64_t once, up front, so that
//  narrower arguments (such as a plain int) are widened before being
//  shifted by up to 56 bits, and so that the argument is evaluated
//  exactly once. The buffer argument is evaluated once per octet.
//  The do/while (0) wrapper makes the macro behave as one statement.
#define PUT_LLONG(buffer, value) do { \
    uint64_t put_llong_value = (uint64_t) (value); \
    (buffer) [0] = (byte) ((put_llong_value >> 56) & 255); \
    (buffer) [1] = (byte) ((put_llong_value >> 48) & 255); \
    (buffer) [2] = (byte) ((put_llong_value >> 40) & 255); \
    (buffer) [3] = (byte) ((put_llong_value >> 32) & 255); \
    (buffer) [4] = (byte) ((put_llong_value >> 24) & 255); \
    (buffer) [5] = (byte) ((put_llong_value >> 16) & 255); \
    (buffer) [6] = (byte) ((put_llong_value >> 8)  & 255); \
    (buffer) [7] = (byte) ((put_llong_value)       & 255); \
    } while (0)

//  Get a long integer from a network buffer
//
//  Reads buffer [0..3] as a 32-bit value, most significant octet
//  first, and yields it as an int32_t. The octets are combined in
//  unsigned arithmetic so that an octet of 0x80 or above in position
//  0 is not shifted into the sign bit of an int. The whole expansion
//  is parenthesized so it can be used inside larger expressions.

#define GET_LONG(buffer) \
    ((int32_t) ( ((uint32_t) ((buffer) [0]) << 24) \
               | ((uint32_t) ((buffer) [1]) << 16) \
               | ((uint32_t) ((buffer) [2]) << 8)  \
               |  (uint32_t) ((buffer) [3])))

//  Get a long long integer from the network buffer
//
//  Reads buffer [0..7] as a 64-bit value, most significant octet
//  first, and yields it as an int64_t. The octets are combined in
//  unsigned arithmetic so that an octet of 0x80 or above in position
//  0 does not overflow a signed shift. The whole expansion is
//  parenthesized so it can be used inside larger expressions.
#define GET_LLONG(buffer) \
    ((int64_t) ( ((uint64_t) ((buffer) [0]) << 56) \
               | ((uint64_t) ((buffer) [1]) << 48) \
               | ((uint64_t) ((buffer) [2]) << 40) \
               | ((uint64_t) ((buffer) [3]) << 32) \
               | ((uint64_t) ((buffer) [4]) << 24) \
               | ((uint64_t) ((buffer) [5]) << 16) \
               | ((uint64_t) ((buffer) [6]) << 8)  \
               |  (uint64_t) ((buffer) [7])))

//  -------------------------------------------------------------------
//  Client task

//  Thread body for one client. Connects a DEALER socket to the
//  server, grants it TRANSIT_TOTAL credit, then receives messages
//  forever: each message must carry the next expected sequence
//  number, and every time more than TRANSIT_SLICE bytes of content
//  have been consumed a further TRANSIT_SLICE of credit is sent.
//  The args parameter is not used. Returns NULL when zmsg_recv
//  returns no message; exits the process if a sequence gap is seen.

static void *
client_task (void *args)
{
    (void) args;

    zctx_t *ctx = zctx_new ();
    void *dealer = zsocket_new (ctx, ZMQ_DEALER);
    zsocket_connect (dealer, "tcp://127.0.0.1:10001");

    //  Start by sending TRANSIT_TOTAL credit to server
    zframe_t *frame = zframe_new (NULL, 4);
    PUT_LONG (zframe_data (frame), TRANSIT_TOTAL);
    zframe_send (&frame, dealer, 0);

    //  Now consume and verify incoming messages and refresh
    //  credit asynchronously as needed
    int64_t expected_seq = 0;
    int received = 0;

    while (TRUE) {
        zmsg_t *msg = zmsg_recv (dealer);
        if (!msg)
            break;

        //  Message has two frames, sequence number and body
        zframe_t *sequence = zmsg_pop (msg);
        zframe_t *content = zmsg_pop (msg);
        assert (content);
        //  GET_LLONG reads 8 octets, so the frame must hold that many
        assert (zframe_size (sequence) == 8);
        int64_t current_seq = GET_LLONG (zframe_data (sequence));
        if (current_seq != expected_seq) {
            //  Sequence numbers are 64-bit; print them at full width
            //  rather than truncating them to int
            printf ("E: server dropped %lld messages, exit (%lld/%lld)\n",
                (long long) (current_seq - expected_seq),
                (long long) current_seq, (long long) expected_seq);
            exit (1);
        }
        expected_seq++;

        //  Count received data, send top-up credit if needed
        received += (int) zframe_size (content);
        if (received > TRANSIT_SLICE) {
            received -= TRANSIT_SLICE;
            //  Distinct name: do not shadow the initial credit frame
            zframe_t *topup = zframe_new (NULL, 4);
            PUT_LONG (zframe_data (topup), TRANSIT_SLICE);
            zframe_send (&topup, dealer, 0);
        }
        zframe_destroy (&sequence);
        zframe_destroy (&content);
        zmsg_destroy (&msg);

        //  Sleep for a random interval bounded by randof (10) msecs,
        //  to simulate a consumer that is slower than the server
        zclock_sleep (randof (10));
    }
    zctx_destroy (&ctx);
    return NULL;
}

//  -------------------------------------------------------------------
//  Server task

//  Clients are represented by this data structure. The server keeps
//  one instance per distinct client identity, held on a list.
typedef struct {
    zframe_t *identity;         //  Identity frame popped from the client's first message
    int credit;                 //  Credit granted by the client, less bytes sent to it
    int64_t sequence;           //  Sequence number for the next message to this client
} client_t;

//  Thread body for the server. Binds a ROUTER socket with a send HWM
//  of SERVER_HWM and then loops on incoming messages. Each message
//  is expected to be a credit grant (identity frame followed by a
//  4-octet credit frame); the credit is added to that client's
//  balance and the server then sends data fragments to every known
//  client for as long as its balance is at least FRAGMENT_SIZE.
//  The args parameter is not used. Returns NULL when zmsg_recv
//  returns no message, after releasing all per-client state.

static void *
server_task (void *args)
{
    (void) args;

    zctx_t *ctx = zctx_new ();
    void *router = zsocket_new (ctx, ZMQ_ROUTER);
    zsockopt_set_hwm (router, SERVER_HWM);
    zsocket_bind (router, "tcp://*:10001");

    //  We'll hold the clients on a simple list
    zlist_t *clients = zlist_new ();

    //  We're purely driven by input events
    while (1) {
        zmsg_t *msg = zmsg_recv (router);
        if (!msg)
            break;

        //  PROCESS CLIENT CREDITS
        //  -------------------------------------------------------
        //  Only message we accept from clients is a credit message
        //  - frame data is amount of credit as 4-byte net longint

        zframe_t *client_frame = zmsg_pop (msg);
        zframe_t *credit_frame = zmsg_pop (msg);

        //  The message comes from the network, so validate it at
        //  runtime instead of relying on assert: GET_LONG reads four
        //  octets and must not run past a shorter frame. Anything
        //  malformed is discarded and the server keeps running.
        if (!client_frame || !credit_frame
        ||  zframe_size (credit_frame) != 4) {
            zframe_destroy (&client_frame);
            zframe_destroy (&credit_frame);
            zmsg_destroy (&msg);
            continue;
        }
        int credit = GET_LONG (zframe_data (credit_frame));
        zframe_destroy (&credit_frame);

        //  Look for pre-existing client with this identity
        client_t *client = (client_t *) zlist_first (clients);
        while (client) {
            if (zframe_eq (client->identity, client_frame))
                break;
            client = (client_t *) zlist_next (clients);
        }
        //  If this client is new, create an object and save it;
        //  the object takes ownership of the identity frame
        if (client == NULL) {
            client = (client_t *) zmalloc (sizeof (client_t));
            client->identity = client_frame;
            zlist_append (clients, client);
        }
        else
            zframe_destroy (&client_frame);

        //  Accumulate credit for this client
        client->credit += credit;
        zmsg_destroy (&msg);

        //  DISPATCH TO CLIENTS
        //  -------------------------------------------------------
        //  We now stream data to all clients with available credit
        //  until their credit is used up. We then wait for clients
        //  to send us new credit.

        //  Process entire client list in turn
        client = (client_t *) zlist_first (clients);
        while (client) {
            while (client->credit >= FRAGMENT_SIZE) {
                int msgsize = FRAGMENT_SIZE + randof (1000) - randof (1000);
                zframe_t *sequence = zframe_new (NULL, 8);
                zframe_t *content = zframe_new (NULL, msgsize);
                PUT_LLONG (zframe_data (sequence), client->sequence);
                client->sequence++;

                //  Send fragment of data to the client; ZFRAME_REUSE
                //  is passed because the identity is sent again on
                //  every later message to this client
                zframe_send (&client->identity, router,
                             ZFRAME_MORE + ZFRAME_REUSE);
                zframe_send (&sequence, router, ZFRAME_MORE);
                zframe_send (&content, router, 0);

                //  Discount credit
                client->credit -= msgsize;
            }
            client = (client_t *) zlist_next (clients);
        }
    }
    //  Release every client object and its identity frame, then the
    //  list itself, so the task does not leak them on shutdown
    client_t *stale = (client_t *) zlist_first (clients);
    while (stale) {
        zframe_destroy (&stale->identity);
        free (stale);
        stale = (client_t *) zlist_next (clients);
    }
    zlist_destroy (&clients);

    zctx_destroy (&ctx);
    return NULL;
}

//  Program entry point: starts one server thread and NBR_CLIENTS
//  client threads, then idles until zctx_interrupted becomes set.
int main (void)
{
    //  NOTE(review): this context is not used for any socket here;
    //  presumably it is what arms zctx_interrupted - confirm in CZMQ
    zctx_t *ctx = zctx_new ();

    //  Launch the server first
    printf ("I: starting server...\n");
    zthread_new (server_task, NULL);

    //  Then launch the requested number of clients
    printf ("I: starting %d clients...\n", NBR_CLIENTS);
    int remaining = NBR_CLIENTS;
    while (remaining > 0) {
        zthread_new (client_task, NULL);
        remaining--;
    }

    //  Wake once per second to check for interruption
    for (;;) {
        if (zctx_interrupted)
            break;
        sleep (1);
    }

    zctx_destroy (&ctx);
    return 0;
}

Comments

Add a New Comment
Unless otherwise stated, the content of this page is licensed under Creative Commons Attribution-ShareAlike 3.0 License