summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Makefile.am4
-rw-r--r--configure.in2
-rw-r--r--zmqd/Makefile.am8
-rw-r--r--zmqd/zmqd.cpp364
4 files changed, 3 insertions, 375 deletions
diff --git a/Makefile.am b/Makefile.am
index 314a97c..18775fd 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -1,7 +1,7 @@
ACLOCAL_AMFLAGS = -I config
-SUBDIRS = src doc perf devices zmqd
-DIST_SUBDIRS = src doc perf devices zmqd builds/msvc
+SUBDIRS = src doc perf devices
+DIST_SUBDIRS = src doc perf devices builds/msvc
EXTRA_DIST = \
$(top_srcdir)/foreign/openpgm/@pgm_basename@.tar.gz \
diff --git a/configure.in b/configure.in
index b81e2bd..a04e7dd 100644
--- a/configure.in
+++ b/configure.in
@@ -373,7 +373,7 @@ AC_OUTPUT(Makefile src/Makefile doc/Makefile
perf/Makefile src/libzmq.pc \
devices/Makefile devices/zmq_forwarder/Makefile \
devices/zmq_streamer/Makefile devices/zmq_queue/Makefile \
- zmqd/Makefile builds/msvc/Makefile)
+ builds/msvc/Makefile)
# On Linux patch libtool to delete hardcoded paths (rpath).
case "${host_os}" in
diff --git a/zmqd/Makefile.am b/zmqd/Makefile.am
deleted file mode 100644
index 32a657b..0000000
--- a/zmqd/Makefile.am
+++ /dev/null
@@ -1,8 +0,0 @@
-INCLUDES = -I$(top_srcdir)/include
-
-bin_PROGRAMS = zmqd
-
-zmqd_LDADD = $(top_builddir)/src/libzmq.la
-zmqd_SOURCES = zmqd.cpp
-
-
diff --git a/zmqd/zmqd.cpp b/zmqd/zmqd.cpp
deleted file mode 100644
index 1a48c86..0000000
--- a/zmqd/zmqd.cpp
+++ /dev/null
@@ -1,364 +0,0 @@
-/*
- Copyright (c) 2007-2010 iMatix Corporation
-
- This file is part of 0MQ.
-
- 0MQ is free software; you can redistribute it and/or modify it under
- the terms of the Lesser GNU General Public License as published by
- the Free Software Foundation; either version 3 of the License, or
- (at your option) any later version.
-
- 0MQ is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- Lesser GNU General Public License for more details.
-
- You should have received a copy of the Lesser GNU General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#include <vector>
-#include <string>
-#include <memory>
-
-#include "../include/zmq.hpp"
-#include "../foreign/xmlParser/xmlParser.cpp"
-
-class device_cfg_t
-{
-
- enum endpoint_direction {connect, bind};
-
- typedef std::pair <endpoint_direction, std::string> sock_details_t;
-
- typedef std::vector <sock_details_t> vsock_dets_t;
-
-public:
-
- explicit device_cfg_t (int type) :
- device_type (type),
- context (0),
- in_socket (0),
- out_socket (0)
- {
- }
-
- virtual ~device_cfg_t()
- {
- delete out_socket;
- delete in_socket;
- }
-
- bool init (XMLNode &device)
- {
- XMLNode in_node = device.getChildNode ("in");
- if (in_node.isEmpty ()) {
- fprintf (stderr,
- "'in' node is missing in the configuration file\n");
- return false;
- }
-
- XMLNode out_node = device.getChildNode ("out");
- if (out_node.isEmpty ()) {
- fprintf (stderr,
- "'out' node is missing in the configuration file\n");
- return false;
- }
-
- if (!process_node (in_node, true, device_cfg_t::bind))
- return false;
- if (!process_node (in_node, true, device_cfg_t::connect))
- return false;
- if (!process_node (out_node, false, device_cfg_t::bind))
- return false;
- if (!process_node (out_node, false, device_cfg_t::connect))
- return false;
-
- return true;
- }
-
- void set_context(zmq::context_t *context_)
- {
- context = context_;
- }
-
- zmq::context_t *get_context () const
- {
- return context;
- }
-
- virtual bool make_sockets () = 0;
-
- bool set_up_connections ()
- {
- for (vsock_dets_t::const_iterator i = in.begin (); i != in.end ();
- ++i) {
- switch (i->first)
- {
- case device_cfg_t::connect:
- in_socket->connect (i->second.c_str ());
- break;
- case device_cfg_t::bind:
- in_socket->bind (i->second.c_str ());
- }
- }
-
- for (vsock_dets_t::const_iterator i = out.begin (); i != out.end ();
- ++i) {
- switch (i->first)
- {
- case device_cfg_t::connect:
- out_socket->connect (i->second.c_str ());
- break;
- case device_cfg_t::bind:
- out_socket->bind (i->second.c_str ());
- }
-
- }
- return true;
- }
-
- void run()
- {
- zmq::device(device_type, *in_socket, *out_socket);
- }
-
-
-protected:
-
- bool make_sockets (int in_type, int out_type)
- {
- in_socket = new (std::nothrow) zmq::socket_t (*context, in_type);
- if (!in_socket)
- return false;
- out_socket = new (std::nothrow) zmq::socket_t (*context, out_type);
- if (!out_socket) {
- return false;
- }
- return true;
- }
-
- int process_node (XMLNode& target_, bool in_,
- device_cfg_t::endpoint_direction ept_)
- {
- const char *name =
- (ept_ == device_cfg_t::connect) ? "connect" : "bind";
- int n = 0;
- while (true) {
- XMLNode connect = target_.getChildNode (name, n);
- if (connect.isEmpty ())
- break;
- const char *addr = connect.getAttribute ("addr");
- if (!addr) {
- fprintf (stderr, "'%s' node is missing 'addr' attribute\n",
- name);
- return 0;
- }
-
- if (in_)
- in.push_back (sock_details_t (ept_, addr));
- else
- out.push_back (sock_details_t (ept_, addr));
-
- n++;
- }
-
- return 1;
- }
-
-
-protected:
-
- int device_type;
- zmq::context_t *context;
- vsock_dets_t in;
- vsock_dets_t out;
- zmq::socket_t *in_socket;
- zmq::socket_t *out_socket;
-
-private:
-
- device_cfg_t (device_cfg_t const&);
- void operator = (device_cfg_t const&);
-};
-
-class queue_device_cfg_t : public device_cfg_t
-{
-public:
-
- queue_device_cfg_t () :
- device_cfg_t (ZMQ_QUEUE)
- {
- }
-
- virtual bool make_sockets ()
- {
- return device_cfg_t::make_sockets (ZMQ_XREP, ZMQ_XREQ);
- }
-};
-
-
-class streamer_device_cfg_t : public device_cfg_t
-{
-public:
-
- streamer_device_cfg_t () :
- device_cfg_t (ZMQ_STREAMER)
- {
- }
-
- virtual bool make_sockets ()
- {
- return device_cfg_t::make_sockets (ZMQ_UPSTREAM, ZMQ_DOWNSTREAM);
- }
-};
-
-class forwarder_device_cfg_t : public device_cfg_t
-{
-public:
-
- forwarder_device_cfg_t() :
- device_cfg_t (ZMQ_FORWARDER)
- {
- }
-
- virtual bool make_sockets()
- {
- if (!device_cfg_t::make_sockets (ZMQ_SUB, ZMQ_PUB) ) {
- return false;
- }
- in_socket->setsockopt (ZMQ_SUBSCRIBE, "", 0);
- return true;
- }
-};
-
-
-device_cfg_t *make_device_config (XMLNode& device)
-{
- const char *dev_type = device.getAttribute ("type");
-
- if (!dev_type) {
- fprintf (stderr, "'device' node is missing 'type' attribute\n");
- return NULL;
- }
-
- if (strcmp (dev_type, "forwarder") == 0) {
- return new (std::nothrow) forwarder_device_cfg_t;
- }
- else if (strcmp (dev_type, "streamer") == 0) {
- return new (std::nothrow) streamer_device_cfg_t;
- }
- else if (strcmp (dev_type, "queue") == 0) {
- return new (std::nothrow) queue_device_cfg_t;
- }
-
- fprintf (stderr, "type attribute in the device configuration file "
- "should be named 'forwarder', 'streamer' or 'queue'\n");
-
- return NULL;
-}
-
-
-extern "C" void* worker_function(void *arg)
-{
- if (!arg) {
- fprintf (stderr, "arg is null, returning \n");
- return 0;
- }
-
- std::auto_ptr <device_cfg_t> cfg ((device_cfg_t*) arg);
-
- zmq::context_t *ctx = cfg->get_context ();
-
- if (!ctx) {
- fprintf (stderr, "no context, returning \n");
- return 0;
- }
-
- if (!cfg->make_sockets ()) {
- fprintf (stderr, "failed to make sockets, returning \n");
- return 0;
- }
-
-
- if (!cfg->set_up_connections ()) {
- fprintf (stderr, "failed to set up connections, returning \n");
- return 0;
- }
-
- cfg->run();
-
- return 0;
-}
-
-int main (int argc, char *argv [])
-{
- if (argc != 2) {
- fprintf (stderr, "usage: zmqd <config-file>\n");
- return 1;
- }
-
- XMLNode root = XMLNode::parseFile (argv [1]);
-
- if (root.isEmpty ()) {
- fprintf (stderr, "configuration file not found or not an XML file\n");
- return 1;
- }
-
- if (strcmp (root.getName (), "config") != 0) {
- fprintf (stderr, "root element in the configuration file should be "
- "named 'config'\n");
- return 1;
- }
-
- std::vector <device_cfg_t*> vdev;
-
- while (true) {
-
- XMLNode device = root.getChildNode ("device", vdev.size());
-
- if (device.isEmpty())
- break;
-
- device_cfg_t* dev = make_device_config (device);
-
- if (!dev) {
- fprintf(stderr, "failed to create device config\n");
- return 1;
- }
-
- if (!dev->init (device)) {
- fprintf(stderr,"error with initialising device configuration\n");
- delete dev;
- return 1;
- }
-
- vdev.push_back (dev);
- }
-
- std::vector <device_cfg_t*>::size_type num_devices = vdev.size ();
-
- if (num_devices == 0) {
- fprintf(stderr,"no devices in the config file\n");
- return 1;
- }
-
- zmq::context_t ctx (num_devices, 1);
-
- for (unsigned int i = 0 ; i < num_devices ; ++i) {
-
- vdev [i]->set_context (&ctx);
-
- if (i) {
- pthread_t worker;
- int rc = pthread_create (&worker, NULL, &worker_function,
- (void*) vdev [i]);
- assert (rc == 0);
- }
- }
-
- worker_function ((void*) vdev [0]);
-
- return 0;
-}
-