diff options
author | Martin Lucina <martin@lucina.net> | 2011-11-09 18:51:46 +0100 |
---|---|---|
committer | Martin Lucina <martin@lucina.net> | 2011-11-09 18:51:46 +0100 |
commit | ff7caa01f47df249066169daa314664c3669842e (patch) | |
tree | 0b7750d6c7056e191260afe3c90aef4c5c26a313 /monitor.c |
Initial commit
Diffstat (limited to 'monitor.c')
-rw-r--r-- | monitor.c | 175 |
1 files changed, 175 insertions, 0 deletions
diff --git a/monitor.c b/monitor.c new file mode 100644 index 0000000..25c7867 --- /dev/null +++ b/monitor.c @@ -0,0 +1,175 @@ +/* monitor.c: Monitoring component. + * + * Copyright (C) 2011 VMware, Inc. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to + * deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + */ + +#include <arpa/inet.h> +#include <assert.h> +#include <stdint.h> +#include <stdlib.h> +#include <stdio.h> +#include <string.h> +#include <unistd.h> +#include <zmq.h> + +#define ERR_EXIT(what) { \ + fprintf (stderr, "error in %s: %s\n", what, zmq_strerror (errno)); \ + exit (EXIT_FAILURE); \ + } + +int main (int argc, char *argv []) +{ + void *ctx; + const char *client_ep, *service_ep, *notifier_host; + char notifier_ep[1024]; + char msg_buf[1024]; + char hostname[1024]; + void *client_socket, *service_socket, *notifier_socket; + int rc; + + if (argc != 4) { + fprintf (stderr, + "usage: monitor "\ + "<notifier-host> <client-bind-ep> <service-connect-ep>\n"); + exit (EXIT_FAILURE); + } + notifier_host = argv [1]; + sprintf (notifier_ep, "tcp://%s:7770", notifier_host); + client_ep = argv [2]; + service_ep = argv [3]; + + rc = gethostname(hostname, sizeof hostname); + assert (rc == 0); + + ctx = zmq_init (1); + if (!ctx) + ERR_EXIT ("zmq_init"); + + client_socket = zmq_socket (ctx, ZMQ_XPUB); + if (!client_socket) + ERR_EXIT ("zmq_socket"); + + rc = zmq_bind (client_socket, client_ep); + if (rc < 0) + ERR_EXIT ("zmq_bind"); + + service_socket = zmq_socket (ctx, ZMQ_XSUB); + if (!service_socket) + ERR_EXIT ("zmq_socket"); + + rc = zmq_connect (service_socket, service_ep); + if (rc < 0) + ERR_EXIT ("zmq_connect"); + + notifier_socket = zmq_socket (ctx, ZMQ_PUB); + if (!notifier_socket) + ERR_EXIT ("zmq_socket"); + + rc = zmq_connect (notifier_socket, notifier_ep); + if (rc < 0) + ERR_EXIT ("zmq_bind"); + + int64_t more; + size_t moresz; + uint32_t seq_no = 0; + + zmq_msg_t msg; + rc = zmq_msg_init (&msg); + assert (rc == 0); + + zmq_pollitem_t items [2]; + items [0].socket = client_socket; + items [0].fd = 0; + items [0].events = ZMQ_POLLIN; + items [0].revents = 0; + items [1].socket = service_socket; + items [1].fd = 0; + items [1].events = ZMQ_POLLIN; + items [1].revents = 0; + + while (1) { + + /* Wait for either upstream subscriptions or messages. */ + rc = zmq_poll (&items [0], 2, -1); + if (rc < 0) + ERR_EXIT ("zmq_poll"); + + /* Process an upstream subscription. */ + if (items [0].revents & ZMQ_POLLIN) { + while (1) { + + rc = zmq_recvmsg (client_socket, &msg, 0); + if (rc < 0) + ERR_EXIT ("zmq_recvmsg"); + + moresz = sizeof (more); + rc = zmq_getsockopt (client_socket, ZMQ_RCVMORE, &more, &moresz); + if (rc < 0) + ERR_EXIT ("zmq_getsockopt"); + + rc = zmq_sendmsg (service_socket, &msg, more ? ZMQ_SNDMORE : 0); + if (rc < 0) + ERR_EXIT ("zmq_sendmsg"); + + if (!more) + break; + } + } + + /* Process a downstream message. */ + if (items [1].revents & ZMQ_POLLIN) { + while (1) { + + rc = zmq_recvmsg (service_socket, &msg, 0); + if (rc < 0) + ERR_EXIT ("zmq_recvmsg"); + + moresz = sizeof (more); + rc = zmq_getsockopt (service_socket, ZMQ_RCVMORE, &more, &moresz); + if (rc < 0) + ERR_EXIT ("zmq_getsockopt"); + + rc = zmq_sendmsg (client_socket, &msg, more ? ZMQ_SNDMORE : 0); + if (rc < 0) + ERR_EXIT ("zmq_sendmsg"); + + if (!more) { + /* For each complete message, publish the sequence number + (# of messages sent) to the notifier socket. */ + uint32_t seq_no_wire; + + seq_no++; + seq_no_wire = htonl (seq_no); + sprintf(msg_buf, "VALUE %s monitor %u", hostname, seq_no); + rc = zmq_send (notifier_socket, msg_buf, strlen(msg_buf), 0); + if (rc != strlen(msg_buf)) + ERR_EXIT ("zmq_send"); + + break; + } + } + } + + } + + exit (EXIT_SUCCESS); +} + |