diff options
Diffstat (limited to 'zmqd')
-rw-r--r-- | zmqd/Makefile.am | 8 | ||||
-rw-r--r-- | zmqd/zmqd.cpp | 368 |
2 files changed, 376 insertions, 0 deletions
diff --git a/zmqd/Makefile.am b/zmqd/Makefile.am new file mode 100644 index 0000000..32a657b --- /dev/null +++ b/zmqd/Makefile.am @@ -0,0 +1,8 @@ +INCLUDES = -I$(top_srcdir)/include + +bin_PROGRAMS = zmqd + +zmqd_LDADD = $(top_builddir)/src/libzmq.la +zmqd_SOURCES = zmqd.cpp + + diff --git a/zmqd/zmqd.cpp b/zmqd/zmqd.cpp new file mode 100644 index 0000000..5b27ac2 --- /dev/null +++ b/zmqd/zmqd.cpp @@ -0,0 +1,368 @@ +/* + Copyright (c) 2007-2010 iMatix Corporation + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include <vector> +#include <string> +#include <memory> + +#include "../include/zmq.hpp" +#include "../foreign/xmlParser/xmlParser.cpp" + +namespace { + + class device_cfg_t + { + + enum endpoint_direction {connect, bind}; + + typedef std::pair<endpoint_direction, std::string> sock_details_t; + + typedef std::vector<sock_details_t> vsock_dets_t; + + public: + + explicit device_cfg_t(int type) + : device_type(type) , context(0) + , in_socket(0), out_socket(0) + { + } + + virtual ~device_cfg_t() + { + delete out_socket; + delete in_socket; + } + + bool init(XMLNode& device) + { + + XMLNode in_node = device.getChildNode ("in"); + if (in_node.isEmpty ()) { + fprintf (stderr, "'in' node is missing in the configuration file\n"); + return false; + } + + XMLNode out_node = device.getChildNode ("out"); + if (out_node.isEmpty ()) { + fprintf (stderr, "'out' node is missing in the configuration file\n"); + return false; + } + + if (!process_node(in_node,true,device_cfg_t::bind)) + return false; + if (!process_node(in_node,true,device_cfg_t::connect)) + return false; + if (!process_node(out_node,false,device_cfg_t::bind)) + return false; + if (!process_node(out_node,false,device_cfg_t::connect)) + return false; + + return true; + } + + void set_context(zmq::context_t* context_) + { + context = context_; + } + + zmq::context_t *get_context() const + { + return context; + } + + virtual bool make_sockets() = 0; + + bool set_up_connections() + { + for (vsock_dets_t::const_iterator i = in.begin() ; i != in.end(); + ++i) { + + switch (i->first) + { + case device_cfg_t::connect : + in_socket->connect(i->second.c_str()); + break; + case device_cfg_t::bind : + in_socket->bind(i->second.c_str()); + } + + } + + for (vsock_dets_t::const_iterator i = out.begin() ; i != out.end(); + ++i) { + + switch (i->first) + { + case device_cfg_t::connect : + out_socket->connect(i->second.c_str()); + break; + case device_cfg_t::bind : + out_socket->bind(i->second.c_str()); + } + + } + return true; + } + + void run() + { + zmq::device(device_type, *in_socket, *out_socket); + } + + + protected: + + bool make_sockets(int in_type, int out_type) + { + in_socket = new (std::nothrow) zmq::socket_t(*context, in_type); + if (!in_socket) + return false; + out_socket = new (std::nothrow) zmq::socket_t(*context, out_type); + if (!out_socket) { + return false; + } + return true; + } + + int process_node(XMLNode& target_, bool in_, + device_cfg_t::endpoint_direction ept_) + { + + const char * name = + (ept_ == device_cfg_t::connect) ? "connect" : "bind"; + int n = 0; + while (true) { + XMLNode connect = target_.getChildNode (name, n); + if (connect.isEmpty ()) + break; + const char *addr = connect.getAttribute ("addr"); + if (!addr) { + fprintf (stderr, "'%s' node is missing 'addr' attribute\n", + name); + return 0; + } + + if (in_) + in.push_back( sock_details_t(ept_, addr)); + else + out.push_back( sock_details_t(ept_, addr)); + + n++; + } + + return 1; + } + + + protected: + + int device_type; + zmq::context_t* context; + vsock_dets_t in; + vsock_dets_t out; + zmq::socket_t* in_socket; + zmq::socket_t* out_socket; + + private: + void operator = (device_cfg_t const &); + device_cfg_t(device_cfg_t const &); + }; + + + + class queue_device_cfg_t : public device_cfg_t + { + public: + queue_device_cfg_t() + : device_cfg_t(ZMQ_QUEUE) + {} + virtual bool make_sockets(){ + return device_cfg_t::make_sockets(ZMQ_XREP, ZMQ_XREQ); + } + }; + + + class streamer_device_cfg_t : public device_cfg_t + { + public: + streamer_device_cfg_t() + : device_cfg_t(ZMQ_STREAMER) + {} + virtual bool make_sockets () { + return device_cfg_t::make_sockets(ZMQ_UPSTREAM, ZMQ_DOWNSTREAM); + } + }; + + class forwarder_device_cfg_t : public device_cfg_t + { + public: + forwarder_device_cfg_t() + : device_cfg_t(ZMQ_FORWARDER) + {} + virtual bool make_sockets() { + if (!device_cfg_t::make_sockets(ZMQ_SUB, ZMQ_PUB) ) { + return false; + } + in_socket->setsockopt (ZMQ_SUBSCRIBE, "", 0); + return true; + } + }; + + + device_cfg_t* make_device_config(XMLNode& device) + { + const char *dev_type = device.getAttribute ("type"); + + if (!dev_type) { + fprintf (stderr, "'device' node is missing 'type' attribute\n"); + return NULL; + } + + if (strcmp (dev_type, "forwarder") == 0) { + return new (std::nothrow) forwarder_device_cfg_t; + } + else if (strcmp (dev_type, "streamer") == 0) { + return new (std::nothrow) streamer_device_cfg_t; + } + else if (strcmp (dev_type, "queue") == 0) { + return new (std::nothrow) queue_device_cfg_t; + } + + fprintf (stderr, "type attribute in the device configuration file " + "should be named 'forwarder', 'streamer' or 'queue'\n"); + + return NULL; + } + + + extern "C" void* worker_function(void *arg) + { + + if (!arg) { + fprintf (stderr, "arg is null, returning \n"); + return 0; + } + + std::auto_ptr<device_cfg_t> cfg ( (device_cfg_t*) arg ); + + zmq::context_t* ctx = cfg->get_context(); + + if (!ctx) { + fprintf (stderr, "no context, returning \n"); + return 0; + } + + if (! cfg->make_sockets()) { + fprintf (stderr, "failed to make sockets, returning \n"); + return 0; + } + + + if (! cfg->set_up_connections()) { + fprintf (stderr, "failed to set up connections, returning \n"); + return 0; + } + + cfg->run(); + + return 0; + + } + + +} + + +int main (int argc, char *argv []) +{ + if (argc != 2) { + fprintf (stderr, "usage: zmqd <config-file>\n"); + return 1; + } + + XMLNode root = XMLNode::parseFile (argv [1]); + + if (root.isEmpty ()) { + fprintf (stderr, "configuration file not found or not an XML file\n"); + return 1; + } + + if (strcmp (root.getName (), "config") != 0) { + fprintf (stderr, "root element in the configuration file should be " + "named 'config'\n"); + return 1; + } + + + std::vector<device_cfg_t*> vdev; + + while (true) { + + XMLNode device = root.getChildNode ("device", vdev.size()); + + if (device.isEmpty()) + break; + + device_cfg_t* dev = make_device_config(device); + + if (!dev) { + fprintf(stderr, "failed to create device config\n"); + return 1; + } + + if (! dev->init(device) ) { + + fprintf(stderr,"error with initialising device configuration\n"); + delete dev; + return 1; + } + + vdev.push_back(dev); + } + + std::vector<device_cfg_t*>::size_type num_devices = vdev.size(); + + if ( num_devices == 0 ) { + fprintf(stderr,"no devices in the config file\n"); + return 1; + } + + + zmq::context_t ctx (num_devices,1); + + + for (unsigned int i = 0 ; i < num_devices ; ++i) { + + vdev[i]->set_context(&ctx); + + if (i) { + pthread_t worker; + int rc = pthread_create (&worker, NULL, &worker_function, + (void*) vdev[i]); + assert (rc == 0); + } + } + + + worker_function((void*)vdev[0]); + + + return 0; +} + |