diff options
Diffstat (limited to 'zmqd')
-rw-r--r-- | zmqd/zmqd.cpp | 426 |
1 files changed, 211 insertions, 215 deletions
diff --git a/zmqd/zmqd.cpp b/zmqd/zmqd.cpp index 5b27ac2..1a48c86 100644 --- a/zmqd/zmqd.cpp +++ b/zmqd/zmqd.cpp @@ -24,271 +24,273 @@ #include "../include/zmq.hpp" #include "../foreign/xmlParser/xmlParser.cpp" -namespace { +class device_cfg_t +{ - class device_cfg_t - { + enum endpoint_direction {connect, bind}; - enum endpoint_direction {connect, bind}; + typedef std::pair <endpoint_direction, std::string> sock_details_t; - typedef std::pair<endpoint_direction, std::string> sock_details_t; + typedef std::vector <sock_details_t> vsock_dets_t; - typedef std::vector<sock_details_t> vsock_dets_t; +public: - public: + explicit device_cfg_t (int type) : + device_type (type), + context (0), + in_socket (0), + out_socket (0) + { + } - explicit device_cfg_t(int type) - : device_type(type) , context(0) - , in_socket(0), out_socket(0) - { - } + virtual ~device_cfg_t() + { + delete out_socket; + delete in_socket; + } - virtual ~device_cfg_t() - { - delete out_socket; - delete in_socket; + bool init (XMLNode &device) + { + XMLNode in_node = device.getChildNode ("in"); + if (in_node.isEmpty ()) { + fprintf (stderr, + "'in' node is missing in the configuration file\n"); + return false; } - bool init(XMLNode& device) - { + XMLNode out_node = device.getChildNode ("out"); + if (out_node.isEmpty ()) { + fprintf (stderr, + "'out' node is missing in the configuration file\n"); + return false; + } - XMLNode in_node = device.getChildNode ("in"); - if (in_node.isEmpty ()) { - fprintf (stderr, "'in' node is missing in the configuration file\n"); - return false; - } + if (!process_node (in_node, true, device_cfg_t::bind)) + return false; + if (!process_node (in_node, true, device_cfg_t::connect)) + return false; + if (!process_node (out_node, false, device_cfg_t::bind)) + return false; + if (!process_node (out_node, false, device_cfg_t::connect)) + return false; - XMLNode out_node = device.getChildNode ("out"); - if (out_node.isEmpty ()) { - fprintf (stderr, "'out' node is missing in the configuration file\n"); - return false; - } + return true; + } - if (!process_node(in_node,true,device_cfg_t::bind)) - return false; - if (!process_node(in_node,true,device_cfg_t::connect)) - return false; - if (!process_node(out_node,false,device_cfg_t::bind)) - return false; - if (!process_node(out_node,false,device_cfg_t::connect)) - return false; + void set_context(zmq::context_t *context_) + { + context = context_; + } - return true; - } + zmq::context_t *get_context () const + { + return context; + } - void set_context(zmq::context_t* context_) - { - context = context_; - } + virtual bool make_sockets () = 0; - zmq::context_t *get_context() const - { - return context; + bool set_up_connections () + { + for (vsock_dets_t::const_iterator i = in.begin (); i != in.end (); + ++i) { + switch (i->first) + { + case device_cfg_t::connect: + in_socket->connect (i->second.c_str ()); + break; + case device_cfg_t::bind: + in_socket->bind (i->second.c_str ()); + } } - virtual bool make_sockets() = 0; - - bool set_up_connections() - { - for (vsock_dets_t::const_iterator i = in.begin() ; i != in.end(); - ++i) { - - switch (i->first) - { - case device_cfg_t::connect : - in_socket->connect(i->second.c_str()); - break; - case device_cfg_t::bind : - in_socket->bind(i->second.c_str()); - } - + for (vsock_dets_t::const_iterator i = out.begin (); i != out.end (); + ++i) { + switch (i->first) + { + case device_cfg_t::connect: + out_socket->connect (i->second.c_str ()); + break; + case device_cfg_t::bind: + out_socket->bind (i->second.c_str ()); } - for (vsock_dets_t::const_iterator i = out.begin() ; i != out.end(); - ++i) { - - switch (i->first) - { - case device_cfg_t::connect : - out_socket->connect(i->second.c_str()); - break; - case device_cfg_t::bind : - out_socket->bind(i->second.c_str()); - } - - } - return true; } + return true; + } - void run() - { - zmq::device(device_type, *in_socket, *out_socket); - } + void run() + { + zmq::device(device_type, *in_socket, *out_socket); + } - protected: +protected: - bool make_sockets(int in_type, int out_type) - { - in_socket = new (std::nothrow) zmq::socket_t(*context, in_type); - if (!in_socket) - return false; - out_socket = new (std::nothrow) zmq::socket_t(*context, out_type); - if (!out_socket) { - return false; - } - return true; + bool make_sockets (int in_type, int out_type) + { + in_socket = new (std::nothrow) zmq::socket_t (*context, in_type); + if (!in_socket) + return false; + out_socket = new (std::nothrow) zmq::socket_t (*context, out_type); + if (!out_socket) { + return false; } + return true; + } - int process_node(XMLNode& target_, bool in_, - device_cfg_t::endpoint_direction ept_) - { - - const char * name = - (ept_ == device_cfg_t::connect) ? "connect" : "bind"; - int n = 0; - while (true) { - XMLNode connect = target_.getChildNode (name, n); - if (connect.isEmpty ()) - break; - const char *addr = connect.getAttribute ("addr"); - if (!addr) { - fprintf (stderr, "'%s' node is missing 'addr' attribute\n", - name); - return 0; - } - - if (in_) - in.push_back( sock_details_t(ept_, addr)); - else - out.push_back( sock_details_t(ept_, addr)); - - n++; + int process_node (XMLNode& target_, bool in_, + device_cfg_t::endpoint_direction ept_) + { + const char *name = + (ept_ == device_cfg_t::connect) ? "connect" : "bind"; + int n = 0; + while (true) { + XMLNode connect = target_.getChildNode (name, n); + if (connect.isEmpty ()) + break; + const char *addr = connect.getAttribute ("addr"); + if (!addr) { + fprintf (stderr, "'%s' node is missing 'addr' attribute\n", + name); + return 0; } - return 1; + if (in_) + in.push_back (sock_details_t (ept_, addr)); + else + out.push_back (sock_details_t (ept_, addr)); + + n++; } + return 1; + } + - protected: +protected: - int device_type; - zmq::context_t* context; - vsock_dets_t in; - vsock_dets_t out; - zmq::socket_t* in_socket; - zmq::socket_t* out_socket; + int device_type; + zmq::context_t *context; + vsock_dets_t in; + vsock_dets_t out; + zmq::socket_t *in_socket; + zmq::socket_t *out_socket; - private: - void operator = (device_cfg_t const &); - device_cfg_t(device_cfg_t const &); - }; +private: + device_cfg_t (device_cfg_t const&); + void operator = (device_cfg_t const&); +}; +class queue_device_cfg_t : public device_cfg_t +{ +public: - class queue_device_cfg_t : public device_cfg_t + queue_device_cfg_t () : + device_cfg_t (ZMQ_QUEUE) { - public: - queue_device_cfg_t() - : device_cfg_t(ZMQ_QUEUE) - {} - virtual bool make_sockets(){ - return device_cfg_t::make_sockets(ZMQ_XREP, ZMQ_XREQ); - } - }; + } + + virtual bool make_sockets () + { + return device_cfg_t::make_sockets (ZMQ_XREP, ZMQ_XREQ); + } +}; - class streamer_device_cfg_t : public device_cfg_t +class streamer_device_cfg_t : public device_cfg_t +{ +public: + + streamer_device_cfg_t () : + device_cfg_t (ZMQ_STREAMER) { - public: - streamer_device_cfg_t() - : device_cfg_t(ZMQ_STREAMER) - {} - virtual bool make_sockets () { - return device_cfg_t::make_sockets(ZMQ_UPSTREAM, ZMQ_DOWNSTREAM); - } - }; + } - class forwarder_device_cfg_t : public device_cfg_t + virtual bool make_sockets () { - public: - forwarder_device_cfg_t() - : device_cfg_t(ZMQ_FORWARDER) - {} - virtual bool make_sockets() { - if (!device_cfg_t::make_sockets(ZMQ_SUB, ZMQ_PUB) ) { - return false; - } - in_socket->setsockopt (ZMQ_SUBSCRIBE, "", 0); - return true; - } - }; + return device_cfg_t::make_sockets (ZMQ_UPSTREAM, ZMQ_DOWNSTREAM); + } +}; +class forwarder_device_cfg_t : public device_cfg_t +{ +public: - device_cfg_t* make_device_config(XMLNode& device) + forwarder_device_cfg_t() : + device_cfg_t (ZMQ_FORWARDER) { - const char *dev_type = device.getAttribute ("type"); - - if (!dev_type) { - fprintf (stderr, "'device' node is missing 'type' attribute\n"); - return NULL; - } + } - if (strcmp (dev_type, "forwarder") == 0) { - return new (std::nothrow) forwarder_device_cfg_t; - } - else if (strcmp (dev_type, "streamer") == 0) { - return new (std::nothrow) streamer_device_cfg_t; - } - else if (strcmp (dev_type, "queue") == 0) { - return new (std::nothrow) queue_device_cfg_t; + virtual bool make_sockets() + { + if (!device_cfg_t::make_sockets (ZMQ_SUB, ZMQ_PUB) ) { + return false; } - - fprintf (stderr, "type attribute in the device configuration file " - "should be named 'forwarder', 'streamer' or 'queue'\n"); - - return NULL; + in_socket->setsockopt (ZMQ_SUBSCRIBE, "", 0); + return true; } +}; - extern "C" void* worker_function(void *arg) - { +device_cfg_t *make_device_config (XMLNode& device) +{ + const char *dev_type = device.getAttribute ("type"); - if (!arg) { - fprintf (stderr, "arg is null, returning \n"); - return 0; - } + if (!dev_type) { + fprintf (stderr, "'device' node is missing 'type' attribute\n"); + return NULL; + } - std::auto_ptr<device_cfg_t> cfg ( (device_cfg_t*) arg ); + if (strcmp (dev_type, "forwarder") == 0) { + return new (std::nothrow) forwarder_device_cfg_t; + } + else if (strcmp (dev_type, "streamer") == 0) { + return new (std::nothrow) streamer_device_cfg_t; + } + else if (strcmp (dev_type, "queue") == 0) { + return new (std::nothrow) queue_device_cfg_t; + } + + fprintf (stderr, "type attribute in the device configuration file " + "should be named 'forwarder', 'streamer' or 'queue'\n"); - zmq::context_t* ctx = cfg->get_context(); + return NULL; +} - if (!ctx) { - fprintf (stderr, "no context, returning \n"); - return 0; - } - if (! cfg->make_sockets()) { - fprintf (stderr, "failed to make sockets, returning \n"); - return 0; - } +extern "C" void* worker_function(void *arg) +{ + if (!arg) { + fprintf (stderr, "arg is null, returning \n"); + return 0; + } + std::auto_ptr <device_cfg_t> cfg ((device_cfg_t*) arg); - if (! cfg->set_up_connections()) { - fprintf (stderr, "failed to set up connections, returning \n"); - return 0; - } + zmq::context_t *ctx = cfg->get_context (); - cfg->run(); + if (!ctx) { + fprintf (stderr, "no context, returning \n"); + return 0; + } + if (!cfg->make_sockets ()) { + fprintf (stderr, "failed to make sockets, returning \n"); return 0; + } + + if (!cfg->set_up_connections ()) { + fprintf (stderr, "failed to set up connections, returning \n"); + return 0; } + cfg->run(); + return 0; } - int main (int argc, char *argv []) { if (argc != 2) { @@ -309,8 +311,7 @@ int main (int argc, char *argv []) return 1; } - - std::vector<device_cfg_t*> vdev; + std::vector <device_cfg_t*> vdev; while (true) { @@ -319,49 +320,44 @@ int main (int argc, char *argv []) if (device.isEmpty()) break; - device_cfg_t* dev = make_device_config(device); + device_cfg_t* dev = make_device_config (device); if (!dev) { fprintf(stderr, "failed to create device config\n"); return 1; } - if (! dev->init(device) ) { - + if (!dev->init (device)) { fprintf(stderr,"error with initialising device configuration\n"); delete dev; return 1; } - vdev.push_back(dev); + vdev.push_back (dev); } - std::vector<device_cfg_t*>::size_type num_devices = vdev.size(); + std::vector <device_cfg_t*>::size_type num_devices = vdev.size (); - if ( num_devices == 0 ) { + if (num_devices == 0) { fprintf(stderr,"no devices in the config file\n"); return 1; } - - zmq::context_t ctx (num_devices,1); - + zmq::context_t ctx (num_devices, 1); for (unsigned int i = 0 ; i < num_devices ; ++i) { - vdev[i]->set_context(&ctx); + vdev [i]->set_context (&ctx); if (i) { pthread_t worker; int rc = pthread_create (&worker, NULL, &worker_function, - (void*) vdev[i]); + (void*) vdev [i]); assert (rc == 0); } } - - worker_function((void*)vdev[0]); - + worker_function ((void*) vdev [0]); return 0; } |