Dispatching flood GET requests from web browsers with libevent, ØMQ(zeromq/zmq) and MessagePack.

This is a simple sample code which recieves request from web browser with libevent and dispatch messages with ØMQ(zeromq/zmq) and MessagePack.

  • When web server receives GET request, get paramter is converted to msgpack’s map and it is sent to a client.
  • Web server returns JSON. If get parameter callback' is specified, it returns JSONP of which function name is thecallback' parameter.
  • The client recieves msgpack’s map and print it to stdout. You can write some logics like logging instead printing.

Web server

#include 
#include 
#include 
#include 
#include 
#include 

#include 
#ifndef TAILQ_FOREACH
#define TAILQ_FOREACH(var, head, field) \
  for ((var) = ((head)->tqh_first); \
    (var); \
    (var) = ((var)->field.tqe_next))
#endif /* LIST_FOREACH */
#include 
#include 
#include 
#include 

#define PORT 8080

void
generic_handler(struct evhttp_request *req, void *zmq_sock)
{
  struct evkeyvalq args;
  char *jsonp_func = NULL;

  if (!req->uri) { return; }

  {
    char *uri = evhttp_decode_uri(req->uri);
    evhttp_parse_query(uri, &args);
    free(uri);
  }

  {
    struct evkeyval *get;
    msgpack_packer pk;
    msgpack_sbuffer sbuf;

    msgpack_sbuffer_init(&sbuf);
    msgpack_packer_init(&pk, &sbuf, msgpack_sbuffer_write);
    {
      unsigned int cnt = 0;
      TAILQ_FOREACH(get, &args, next) {
        if (!strcmp(get->key, "callback")) {
          jsonp_func = get->value;
        } else {
          cnt++;
        }
      }
      msgpack_pack_map(&pk, cnt);
    }
    TAILQ_FOREACH(get, &args, next) {
      if (get->value != jsonp_func) {
        unsigned int l;
        l = strlen(get->key);
        msgpack_pack_raw(&pk, l);
        msgpack_pack_raw_body(&pk, get->key, l);
        l = strlen(get->value);
        msgpack_pack_raw(&pk, l);
        msgpack_pack_raw_body(&pk, get->value, l);
      }
    }
    {
      zmq_msg_t msg;
      if (!zmq_msg_init_size(&msg, sbuf.size)) {
        memcpy((void *)zmq_msg_data(&msg), sbuf.data, sbuf.size);
        zmq_send(zmq_sock, &msg, 0);
        zmq_msg_close(&msg);
      }
    }
    msgpack_sbuffer_destroy(&sbuf);
  }
  {
    struct evbuffer *res_buf;
    if (!(res_buf = evbuffer_new())) {
      err(1, "failed to create response buffer");
    }

    evhttp_add_header(req->output_headers,
      "Content-Type", "text/javascript; charset=UTF-8");
    evhttp_add_header(req->output_headers, "Connection", "close");

    if (jsonp_func) {
      char num_buf[16];
      unsigned int jsonp_func_len = strlen(jsonp_func);
      evbuffer_add(res_buf, jsonp_func, jsonp_func_len);
      evbuffer_add(res_buf, "({});", 5);
      snprintf(num_buf, 16, "%d", jsonp_func_len + 5);
      evhttp_add_header(req->output_headers, "Content-Length", num_buf);
    } else {
      evbuffer_add(res_buf, "{}", 2);
      evhttp_add_header(req->output_headers, "Content-Length", "2");
    }
    evhttp_send_reply(req, HTTP_OK, "OK", res_buf);
    evbuffer_free(res_buf);
  }
  evhttp_clear_headers(&args);
}

int
main(int argc, char **argv)
{
  struct evhttp *httpd;

  event_init();

  if (httpd = evhttp_start("0.0.0.0", PORT)) {
    void *zmq_ctx, *zmq_sock;

    if (!(zmq_ctx = zmq_init(1))) {
      fprintf(stderr, "cannot create zmq context.");
    } else {
      if (!(zmq_sock = zmq_socket(zmq_ctx, ZMQ_PUB))) {
        fprintf(stderr, "cannot create zmq_socket.");

      } else if (zmq_connect(zmq_sock, "tcp://127.0.0.1:1234")) {
        fprintf(stderr, "cannot connect zmq_socket.");
      } else {
        evhttp_set_gencb(httpd, generic_handler, zmq_sock);
        event_dispatch();

        evhttp_free(httpd);
      }
      zmq_term(zmq_ctx);
    }
  } else {
    fprintf(stderr, "cannot bind port %d", PORT);
  }
  return 0;
}

client

#include 
#include 
#include 

int
main(int argc, char **argv)
{
  msgpack_zone *mempool;

  if (!(mempool = msgpack_zone_new(MSGPACK_ZONE_CHUNK_SIZE))) {
    fprintf(stderr, "cannot create msgpack zone.");
  } else {
    void *zmq_ctx, *zmq_sock;
    if (!(zmq_ctx = zmq_init(1))) {
      fprintf(stderr, "cannot create zmq context.");
    } else {
      if (!(zmq_sock = zmq_socket(zmq_ctx, ZMQ_SUB))) {
        fprintf(stderr, "cannot create zmq_socket.");

      } else if (zmq_bind(zmq_sock, "tcp://*:1234")) {
        fprintf(stderr, "cannot bind zmq_socket.");
      } else {
        zmq_pollitem_t items[] = {
          { zmq_sock, 0, ZMQ_POLLIN, 0}
        };
        zmq_setsockopt(zmq_sock, ZMQ_SUBSCRIBE, "", 0);
        while (1) {
          zmq_poll(items, 1, -1);
          if (items[0].revents & ZMQ_POLLIN) { /* always true */
            zmq_msg_t msg;
            if (zmq_msg_init(&msg)) {
              fprintf(stderr, "cannot init zmq message.");
            } else {
              if (zmq_recv(zmq_sock, &msg, 0)) {
                fprintf(stderr, "cannot recv zmq message.");
              } else {
                msgpack_object obj;
                msgpack_unpack_return ret;
                ret = msgpack_unpack(zmq_msg_data(&msg), zmq_msg_size(&msg), NULL, mempool, &obj);
                if (MSGPACK_UNPACK_SUCCESS == ret) {
                  msgpack_object_print(stdout, obj);
                  printf("\n");
                }
              }
              zmq_msg_close(&msg);
            }
          }
        }
      }
      zmq_term(zmq_ctx);
    }
    msgpack_zone_free(mempool);
  }
  return 0;
}

Makefile

all: suggest-client suggest-server

suggest-server: suggest-server.c
  gcc -O3 -ggdb -o suggest-server suggest-server.c -lzmq -levent -lmsgpack

suggest-client: suggest-client.c
  gcc -O3 -ggdb -o suggest-client suggest-client.c -lzmq -levent -lmsgpack