diff options
-rw-r--r-- | Makefile.am | 4 | ||||
-rw-r--r-- | configure.in | 2 | ||||
-rw-r--r-- | zmqd/Makefile.am | 8 | ||||
-rw-r--r-- | zmqd/zmqd.cpp | 364 |
4 files changed, 3 insertions, 375 deletions
diff --git a/Makefile.am b/Makefile.am index 314a97c..18775fd 100644 --- a/Makefile.am +++ b/Makefile.am @@ -1,7 +1,7 @@ ACLOCAL_AMFLAGS = -I config -SUBDIRS = src doc perf devices zmqd -DIST_SUBDIRS = src doc perf devices zmqd builds/msvc +SUBDIRS = src doc perf devices +DIST_SUBDIRS = src doc perf devices builds/msvc EXTRA_DIST = \ $(top_srcdir)/foreign/openpgm/@pgm_basename@.tar.gz \ diff --git a/configure.in b/configure.in index b81e2bd..a04e7dd 100644 --- a/configure.in +++ b/configure.in @@ -373,7 +373,7 @@ AC_OUTPUT(Makefile src/Makefile doc/Makefile perf/Makefile src/libzmq.pc \ devices/Makefile devices/zmq_forwarder/Makefile \ devices/zmq_streamer/Makefile devices/zmq_queue/Makefile \ - zmqd/Makefile builds/msvc/Makefile) + builds/msvc/Makefile) # On Linux patch libtool to delete hardcoded paths (rpath). case "${host_os}" in diff --git a/zmqd/Makefile.am b/zmqd/Makefile.am deleted file mode 100644 index 32a657b..0000000 --- a/zmqd/Makefile.am +++ /dev/null @@ -1,8 +0,0 @@ -INCLUDES = -I$(top_srcdir)/include - -bin_PROGRAMS = zmqd - -zmqd_LDADD = $(top_builddir)/src/libzmq.la -zmqd_SOURCES = zmqd.cpp - - diff --git a/zmqd/zmqd.cpp b/zmqd/zmqd.cpp deleted file mode 100644 index 1a48c86..0000000 --- a/zmqd/zmqd.cpp +++ /dev/null @@ -1,364 +0,0 @@ -/* - Copyright (c) 2007-2010 iMatix Corporation - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the Lesser GNU General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - Lesser GNU General Public License for more details. - - You should have received a copy of the Lesser GNU General Public License - along with this program. If not, see <http://www.gnu.org/licenses/>. -*/ - -#include <vector> -#include <string> -#include <memory> - -#include "../include/zmq.hpp" -#include "../foreign/xmlParser/xmlParser.cpp" - -class device_cfg_t -{ - - enum endpoint_direction {connect, bind}; - - typedef std::pair <endpoint_direction, std::string> sock_details_t; - - typedef std::vector <sock_details_t> vsock_dets_t; - -public: - - explicit device_cfg_t (int type) : - device_type (type), - context (0), - in_socket (0), - out_socket (0) - { - } - - virtual ~device_cfg_t() - { - delete out_socket; - delete in_socket; - } - - bool init (XMLNode &device) - { - XMLNode in_node = device.getChildNode ("in"); - if (in_node.isEmpty ()) { - fprintf (stderr, - "'in' node is missing in the configuration file\n"); - return false; - } - - XMLNode out_node = device.getChildNode ("out"); - if (out_node.isEmpty ()) { - fprintf (stderr, - "'out' node is missing in the configuration file\n"); - return false; - } - - if (!process_node (in_node, true, device_cfg_t::bind)) - return false; - if (!process_node (in_node, true, device_cfg_t::connect)) - return false; - if (!process_node (out_node, false, device_cfg_t::bind)) - return false; - if (!process_node (out_node, false, device_cfg_t::connect)) - return false; - - return true; - } - - void set_context(zmq::context_t *context_) - { - context = context_; - } - - zmq::context_t *get_context () const - { - return context; - } - - virtual bool make_sockets () = 0; - - bool set_up_connections () - { - for (vsock_dets_t::const_iterator i = in.begin (); i != in.end (); - ++i) { - switch (i->first) - { - case device_cfg_t::connect: - in_socket->connect (i->second.c_str ()); - break; - case device_cfg_t::bind: - in_socket->bind (i->second.c_str ()); - } - } - - for (vsock_dets_t::const_iterator i = out.begin (); i != out.end (); - ++i) { - switch (i->first) - { - case device_cfg_t::connect: - out_socket->connect (i->second.c_str ()); - break; - case device_cfg_t::bind: - out_socket->bind (i->second.c_str ()); - } - - } - return true; - } - - void run() - { - zmq::device(device_type, *in_socket, *out_socket); - } - - -protected: - - bool make_sockets (int in_type, int out_type) - { - in_socket = new (std::nothrow) zmq::socket_t (*context, in_type); - if (!in_socket) - return false; - out_socket = new (std::nothrow) zmq::socket_t (*context, out_type); - if (!out_socket) { - return false; - } - return true; - } - - int process_node (XMLNode& target_, bool in_, - device_cfg_t::endpoint_direction ept_) - { - const char *name = - (ept_ == device_cfg_t::connect) ? "connect" : "bind"; - int n = 0; - while (true) { - XMLNode connect = target_.getChildNode (name, n); - if (connect.isEmpty ()) - break; - const char *addr = connect.getAttribute ("addr"); - if (!addr) { - fprintf (stderr, "'%s' node is missing 'addr' attribute\n", - name); - return 0; - } - - if (in_) - in.push_back (sock_details_t (ept_, addr)); - else - out.push_back (sock_details_t (ept_, addr)); - - n++; - } - - return 1; - } - - -protected: - - int device_type; - zmq::context_t *context; - vsock_dets_t in; - vsock_dets_t out; - zmq::socket_t *in_socket; - zmq::socket_t *out_socket; - -private: - - device_cfg_t (device_cfg_t const&); - void operator = (device_cfg_t const&); -}; - -class queue_device_cfg_t : public device_cfg_t -{ -public: - - queue_device_cfg_t () : - device_cfg_t (ZMQ_QUEUE) - { - } - - virtual bool make_sockets () - { - return device_cfg_t::make_sockets (ZMQ_XREP, ZMQ_XREQ); - } -}; - - -class streamer_device_cfg_t : public device_cfg_t -{ -public: - - streamer_device_cfg_t () : - device_cfg_t (ZMQ_STREAMER) - { - } - - virtual bool make_sockets () - { - return device_cfg_t::make_sockets (ZMQ_UPSTREAM, ZMQ_DOWNSTREAM); - } -}; - -class forwarder_device_cfg_t : public device_cfg_t -{ -public: - - forwarder_device_cfg_t() : - device_cfg_t (ZMQ_FORWARDER) - { - } - - virtual bool make_sockets() - { - if (!device_cfg_t::make_sockets (ZMQ_SUB, ZMQ_PUB) ) { - return false; - } - in_socket->setsockopt (ZMQ_SUBSCRIBE, "", 0); - return true; - } -}; - - -device_cfg_t *make_device_config (XMLNode& device) -{ - const char *dev_type = device.getAttribute ("type"); - - if (!dev_type) { - fprintf (stderr, "'device' node is missing 'type' attribute\n"); - return NULL; - } - - if (strcmp (dev_type, "forwarder") == 0) { - return new (std::nothrow) forwarder_device_cfg_t; - } - else if (strcmp (dev_type, "streamer") == 0) { - return new (std::nothrow) streamer_device_cfg_t; - } - else if (strcmp (dev_type, "queue") == 0) { - return new (std::nothrow) queue_device_cfg_t; - } - - fprintf (stderr, "type attribute in the device configuration file " - "should be named 'forwarder', 'streamer' or 'queue'\n"); - - return NULL; -} - - -extern "C" void* worker_function(void *arg) -{ - if (!arg) { - fprintf (stderr, "arg is null, returning \n"); - return 0; - } - - std::auto_ptr <device_cfg_t> cfg ((device_cfg_t*) arg); - - zmq::context_t *ctx = cfg->get_context (); - - if (!ctx) { - fprintf (stderr, "no context, returning \n"); - return 0; - } - - if (!cfg->make_sockets ()) { - fprintf (stderr, "failed to make sockets, returning \n"); - return 0; - } - - - if (!cfg->set_up_connections ()) { - fprintf (stderr, "failed to set up connections, returning \n"); - return 0; - } - - cfg->run(); - - return 0; -} - -int main (int argc, char *argv []) -{ - if (argc != 2) { - fprintf (stderr, "usage: zmqd <config-file>\n"); - return 1; - } - - XMLNode root = XMLNode::parseFile (argv [1]); - - if (root.isEmpty ()) { - fprintf (stderr, "configuration file not found or not an XML file\n"); - return 1; - } - - if (strcmp (root.getName (), "config") != 0) { - fprintf (stderr, "root element in the configuration file should be " - "named 'config'\n"); - return 1; - } - - std::vector <device_cfg_t*> vdev; - - while (true) { - - XMLNode device = root.getChildNode ("device", vdev.size()); - - if (device.isEmpty()) - break; - - device_cfg_t* dev = make_device_config (device); - - if (!dev) { - fprintf(stderr, "failed to create device config\n"); - return 1; - } - - if (!dev->init (device)) { - fprintf(stderr,"error with initialising device configuration\n"); - delete dev; - return 1; - } - - vdev.push_back (dev); - } - - std::vector <device_cfg_t*>::size_type num_devices = vdev.size (); - - if (num_devices == 0) { - fprintf(stderr,"no devices in the config file\n"); - return 1; - } - - zmq::context_t ctx (num_devices, 1); - - for (unsigned int i = 0 ; i < num_devices ; ++i) { - - vdev [i]->set_context (&ctx); - - if (i) { - pthread_t worker; - int rc = pthread_create (&worker, NULL, &worker_function, - (void*) vdev [i]); - assert (rc == 0); - } - } - - worker_function ((void*) vdev [0]); - - return 0; -} - |