From ff7caa01f47df249066169daa314664c3669842e Mon Sep 17 00:00:00 2001 From: Martin Lucina Date: Wed, 9 Nov 2011 18:51:46 +0100 Subject: Initial commit --- notifier.c | 145 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 145 insertions(+) create mode 100644 notifier.c (limited to 'notifier.c') diff --git a/notifier.c b/notifier.c new file mode 100644 index 0000000..775cf50 --- /dev/null +++ b/notifier.c @@ -0,0 +1,145 @@ +/* monitor.c: Notifier (console mini-broker) component. + * + * Copyright (C) 2011 VMware, Inc. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to + * deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + */ + +#include +#include +#include +#include +#include +#include +#include + +#define ERR_EXIT(what) { \ + fprintf (stderr, "error in %s: %s\n", what, zmq_strerror (errno)); \ + exit (EXIT_FAILURE); \ + } + +int main (int argc, char *argv []) +{ + void *ctx; + const char *app_ep = "tcp://*:7770", + *console_ep = "tcp://*:7771"; + void *app_socket, *console_socket; + int rc; + + if (argc != 1) { + fprintf (stderr, "usage: notifier\n"); + fprintf (stderr, " binds to ports 7770 (application port)\n"); + fprintf (stderr, " and 7771 (console port)\n"); + exit (EXIT_FAILURE); + } + + ctx = zmq_init (1); + if (!ctx) + ERR_EXIT ("zmq_init"); + + app_socket = zmq_socket (ctx, ZMQ_XSUB); + if (!app_socket) + ERR_EXIT ("zmq_socket"); + + rc = zmq_bind (app_socket, app_ep); + if (rc < 0) + ERR_EXIT ("zmq_bind"); + + console_socket = zmq_socket (ctx, ZMQ_XPUB); + if (!console_socket) + ERR_EXIT ("zmq_socket"); + + rc = zmq_bind (console_socket, console_ep); + if (rc < 0) + ERR_EXIT ("zmq_bind"); + + int64_t more; + size_t moresz; + + zmq_msg_t msg; + rc = zmq_msg_init (&msg); + assert (rc == 0); + + zmq_pollitem_t items [2]; + items [0].socket = app_socket; + items [0].fd = 0; + items [0].events = ZMQ_POLLIN; + items [0].revents = 0; + items [1].socket = console_socket; + items [1].fd = 0; + items [1].events = ZMQ_POLLIN; + items [1].revents = 0; + + while (1) { + + /* Wait for either upstream subscriptions or messages. */ + rc = zmq_poll (&items [0], 2, -1); + if (rc < 0) + ERR_EXIT ("zmq_poll"); + + /* Process an upstream subscription. */ + if (items [0].revents & ZMQ_POLLIN) { + while (1) { + + rc = zmq_recvmsg (app_socket, &msg, 0); + if (rc < 0) + ERR_EXIT ("zmq_recvmsg"); + + moresz = sizeof (more); + rc = zmq_getsockopt (app_socket, ZMQ_RCVMORE, &more, &moresz); + if (rc < 0) + ERR_EXIT ("zmq_getsockopt"); + + rc = zmq_sendmsg (console_socket, &msg, more ? ZMQ_SNDMORE : 0); + if (rc < 0) + ERR_EXIT ("zmq_sendmsg"); + + if (!more) + break; + } + } + + /* Process a downstream message. */ + if (items [1].revents & ZMQ_POLLIN) { + while (1) { + + rc = zmq_recvmsg (console_socket, &msg, 0); + if (rc < 0) + ERR_EXIT ("zmq_recvmsg"); + + moresz = sizeof (more); + rc = zmq_getsockopt (console_socket, ZMQ_RCVMORE, &more, &moresz); + if (rc < 0) + ERR_EXIT ("zmq_getsockopt"); + + rc = zmq_sendmsg (app_socket, &msg, more ? ZMQ_SNDMORE : 0); + if (rc < 0) + ERR_EXIT ("zmq_sendmsg"); + + if (!more) { + break; + } + } + } + + } + + exit (EXIT_SUCCESS); +} + -- cgit v1.2.3