summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--zmqd/zmqd.cpp426
1 files changed, 211 insertions, 215 deletions
diff --git a/zmqd/zmqd.cpp b/zmqd/zmqd.cpp
index 5b27ac2..1a48c86 100644
--- a/zmqd/zmqd.cpp
+++ b/zmqd/zmqd.cpp
@@ -24,271 +24,273 @@
#include "../include/zmq.hpp"
#include "../foreign/xmlParser/xmlParser.cpp"
-namespace {
+class device_cfg_t
+{
- class device_cfg_t
- {
+ enum endpoint_direction {connect, bind};
- enum endpoint_direction {connect, bind};
+ typedef std::pair <endpoint_direction, std::string> sock_details_t;
- typedef std::pair<endpoint_direction, std::string> sock_details_t;
+ typedef std::vector <sock_details_t> vsock_dets_t;
- typedef std::vector<sock_details_t> vsock_dets_t;
+public:
- public:
+ explicit device_cfg_t (int type) :
+ device_type (type),
+ context (0),
+ in_socket (0),
+ out_socket (0)
+ {
+ }
- explicit device_cfg_t(int type)
- : device_type(type) , context(0)
- , in_socket(0), out_socket(0)
- {
- }
+ virtual ~device_cfg_t()
+ {
+ delete out_socket;
+ delete in_socket;
+ }
- virtual ~device_cfg_t()
- {
- delete out_socket;
- delete in_socket;
+ bool init (XMLNode &device)
+ {
+ XMLNode in_node = device.getChildNode ("in");
+ if (in_node.isEmpty ()) {
+ fprintf (stderr,
+ "'in' node is missing in the configuration file\n");
+ return false;
}
- bool init(XMLNode& device)
- {
+ XMLNode out_node = device.getChildNode ("out");
+ if (out_node.isEmpty ()) {
+ fprintf (stderr,
+ "'out' node is missing in the configuration file\n");
+ return false;
+ }
- XMLNode in_node = device.getChildNode ("in");
- if (in_node.isEmpty ()) {
- fprintf (stderr, "'in' node is missing in the configuration file\n");
- return false;
- }
+ if (!process_node (in_node, true, device_cfg_t::bind))
+ return false;
+ if (!process_node (in_node, true, device_cfg_t::connect))
+ return false;
+ if (!process_node (out_node, false, device_cfg_t::bind))
+ return false;
+ if (!process_node (out_node, false, device_cfg_t::connect))
+ return false;
- XMLNode out_node = device.getChildNode ("out");
- if (out_node.isEmpty ()) {
- fprintf (stderr, "'out' node is missing in the configuration file\n");
- return false;
- }
+ return true;
+ }
- if (!process_node(in_node,true,device_cfg_t::bind))
- return false;
- if (!process_node(in_node,true,device_cfg_t::connect))
- return false;
- if (!process_node(out_node,false,device_cfg_t::bind))
- return false;
- if (!process_node(out_node,false,device_cfg_t::connect))
- return false;
+ void set_context(zmq::context_t *context_)
+ {
+ context = context_;
+ }
- return true;
- }
+ zmq::context_t *get_context () const
+ {
+ return context;
+ }
- void set_context(zmq::context_t* context_)
- {
- context = context_;
- }
+ virtual bool make_sockets () = 0;
- zmq::context_t *get_context() const
- {
- return context;
+ bool set_up_connections ()
+ {
+ for (vsock_dets_t::const_iterator i = in.begin (); i != in.end ();
+ ++i) {
+ switch (i->first)
+ {
+ case device_cfg_t::connect:
+ in_socket->connect (i->second.c_str ());
+ break;
+ case device_cfg_t::bind:
+ in_socket->bind (i->second.c_str ());
+ }
}
- virtual bool make_sockets() = 0;
-
- bool set_up_connections()
- {
- for (vsock_dets_t::const_iterator i = in.begin() ; i != in.end();
- ++i) {
-
- switch (i->first)
- {
- case device_cfg_t::connect :
- in_socket->connect(i->second.c_str());
- break;
- case device_cfg_t::bind :
- in_socket->bind(i->second.c_str());
- }
-
+ for (vsock_dets_t::const_iterator i = out.begin (); i != out.end ();
+ ++i) {
+ switch (i->first)
+ {
+ case device_cfg_t::connect:
+ out_socket->connect (i->second.c_str ());
+ break;
+ case device_cfg_t::bind:
+ out_socket->bind (i->second.c_str ());
}
- for (vsock_dets_t::const_iterator i = out.begin() ; i != out.end();
- ++i) {
-
- switch (i->first)
- {
- case device_cfg_t::connect :
- out_socket->connect(i->second.c_str());
- break;
- case device_cfg_t::bind :
- out_socket->bind(i->second.c_str());
- }
-
- }
- return true;
}
+ return true;
+ }
- void run()
- {
- zmq::device(device_type, *in_socket, *out_socket);
- }
+ void run()
+ {
+ zmq::device(device_type, *in_socket, *out_socket);
+ }
- protected:
+protected:
- bool make_sockets(int in_type, int out_type)
- {
- in_socket = new (std::nothrow) zmq::socket_t(*context, in_type);
- if (!in_socket)
- return false;
- out_socket = new (std::nothrow) zmq::socket_t(*context, out_type);
- if (!out_socket) {
- return false;
- }
- return true;
+ bool make_sockets (int in_type, int out_type)
+ {
+ in_socket = new (std::nothrow) zmq::socket_t (*context, in_type);
+ if (!in_socket)
+ return false;
+ out_socket = new (std::nothrow) zmq::socket_t (*context, out_type);
+ if (!out_socket) {
+ return false;
}
+ return true;
+ }
- int process_node(XMLNode& target_, bool in_,
- device_cfg_t::endpoint_direction ept_)
- {
-
- const char * name =
- (ept_ == device_cfg_t::connect) ? "connect" : "bind";
- int n = 0;
- while (true) {
- XMLNode connect = target_.getChildNode (name, n);
- if (connect.isEmpty ())
- break;
- const char *addr = connect.getAttribute ("addr");
- if (!addr) {
- fprintf (stderr, "'%s' node is missing 'addr' attribute\n",
- name);
- return 0;
- }
-
- if (in_)
- in.push_back( sock_details_t(ept_, addr));
- else
- out.push_back( sock_details_t(ept_, addr));
-
- n++;
+ int process_node (XMLNode& target_, bool in_,
+ device_cfg_t::endpoint_direction ept_)
+ {
+ const char *name =
+ (ept_ == device_cfg_t::connect) ? "connect" : "bind";
+ int n = 0;
+ while (true) {
+ XMLNode connect = target_.getChildNode (name, n);
+ if (connect.isEmpty ())
+ break;
+ const char *addr = connect.getAttribute ("addr");
+ if (!addr) {
+ fprintf (stderr, "'%s' node is missing 'addr' attribute\n",
+ name);
+ return 0;
}
- return 1;
+ if (in_)
+ in.push_back (sock_details_t (ept_, addr));
+ else
+ out.push_back (sock_details_t (ept_, addr));
+
+ n++;
}
+ return 1;
+ }
+
- protected:
+protected:
- int device_type;
- zmq::context_t* context;
- vsock_dets_t in;
- vsock_dets_t out;
- zmq::socket_t* in_socket;
- zmq::socket_t* out_socket;
+ int device_type;
+ zmq::context_t *context;
+ vsock_dets_t in;
+ vsock_dets_t out;
+ zmq::socket_t *in_socket;
+ zmq::socket_t *out_socket;
- private:
- void operator = (device_cfg_t const &);
- device_cfg_t(device_cfg_t const &);
- };
+private:
+ device_cfg_t (device_cfg_t const&);
+ void operator = (device_cfg_t const&);
+};
+class queue_device_cfg_t : public device_cfg_t
+{
+public:
- class queue_device_cfg_t : public device_cfg_t
+ queue_device_cfg_t () :
+ device_cfg_t (ZMQ_QUEUE)
{
- public:
- queue_device_cfg_t()
- : device_cfg_t(ZMQ_QUEUE)
- {}
- virtual bool make_sockets(){
- return device_cfg_t::make_sockets(ZMQ_XREP, ZMQ_XREQ);
- }
- };
+ }
+
+ virtual bool make_sockets ()
+ {
+ return device_cfg_t::make_sockets (ZMQ_XREP, ZMQ_XREQ);
+ }
+};
- class streamer_device_cfg_t : public device_cfg_t
+class streamer_device_cfg_t : public device_cfg_t
+{
+public:
+
+ streamer_device_cfg_t () :
+ device_cfg_t (ZMQ_STREAMER)
{
- public:
- streamer_device_cfg_t()
- : device_cfg_t(ZMQ_STREAMER)
- {}
- virtual bool make_sockets () {
- return device_cfg_t::make_sockets(ZMQ_UPSTREAM, ZMQ_DOWNSTREAM);
- }
- };
+ }
- class forwarder_device_cfg_t : public device_cfg_t
+ virtual bool make_sockets ()
{
- public:
- forwarder_device_cfg_t()
- : device_cfg_t(ZMQ_FORWARDER)
- {}
- virtual bool make_sockets() {
- if (!device_cfg_t::make_sockets(ZMQ_SUB, ZMQ_PUB) ) {
- return false;
- }
- in_socket->setsockopt (ZMQ_SUBSCRIBE, "", 0);
- return true;
- }
- };
+ return device_cfg_t::make_sockets (ZMQ_UPSTREAM, ZMQ_DOWNSTREAM);
+ }
+};
+class forwarder_device_cfg_t : public device_cfg_t
+{
+public:
- device_cfg_t* make_device_config(XMLNode& device)
+ forwarder_device_cfg_t() :
+ device_cfg_t (ZMQ_FORWARDER)
{
- const char *dev_type = device.getAttribute ("type");
-
- if (!dev_type) {
- fprintf (stderr, "'device' node is missing 'type' attribute\n");
- return NULL;
- }
+ }
- if (strcmp (dev_type, "forwarder") == 0) {
- return new (std::nothrow) forwarder_device_cfg_t;
- }
- else if (strcmp (dev_type, "streamer") == 0) {
- return new (std::nothrow) streamer_device_cfg_t;
- }
- else if (strcmp (dev_type, "queue") == 0) {
- return new (std::nothrow) queue_device_cfg_t;
+ virtual bool make_sockets()
+ {
+ if (!device_cfg_t::make_sockets (ZMQ_SUB, ZMQ_PUB) ) {
+ return false;
}
-
- fprintf (stderr, "type attribute in the device configuration file "
- "should be named 'forwarder', 'streamer' or 'queue'\n");
-
- return NULL;
+ in_socket->setsockopt (ZMQ_SUBSCRIBE, "", 0);
+ return true;
}
+};
- extern "C" void* worker_function(void *arg)
- {
+device_cfg_t *make_device_config (XMLNode& device)
+{
+ const char *dev_type = device.getAttribute ("type");
- if (!arg) {
- fprintf (stderr, "arg is null, returning \n");
- return 0;
- }
+ if (!dev_type) {
+ fprintf (stderr, "'device' node is missing 'type' attribute\n");
+ return NULL;
+ }
- std::auto_ptr<device_cfg_t> cfg ( (device_cfg_t*) arg );
+ if (strcmp (dev_type, "forwarder") == 0) {
+ return new (std::nothrow) forwarder_device_cfg_t;
+ }
+ else if (strcmp (dev_type, "streamer") == 0) {
+ return new (std::nothrow) streamer_device_cfg_t;
+ }
+ else if (strcmp (dev_type, "queue") == 0) {
+ return new (std::nothrow) queue_device_cfg_t;
+ }
+
+ fprintf (stderr, "type attribute in the device configuration file "
+ "should be named 'forwarder', 'streamer' or 'queue'\n");
- zmq::context_t* ctx = cfg->get_context();
+ return NULL;
+}
- if (!ctx) {
- fprintf (stderr, "no context, returning \n");
- return 0;
- }
- if (! cfg->make_sockets()) {
- fprintf (stderr, "failed to make sockets, returning \n");
- return 0;
- }
+extern "C" void* worker_function(void *arg)
+{
+ if (!arg) {
+ fprintf (stderr, "arg is null, returning \n");
+ return 0;
+ }
+ std::auto_ptr <device_cfg_t> cfg ((device_cfg_t*) arg);
- if (! cfg->set_up_connections()) {
- fprintf (stderr, "failed to set up connections, returning \n");
- return 0;
- }
+ zmq::context_t *ctx = cfg->get_context ();
- cfg->run();
+ if (!ctx) {
+ fprintf (stderr, "no context, returning \n");
+ return 0;
+ }
+ if (!cfg->make_sockets ()) {
+ fprintf (stderr, "failed to make sockets, returning \n");
return 0;
+ }
+
+ if (!cfg->set_up_connections ()) {
+ fprintf (stderr, "failed to set up connections, returning \n");
+ return 0;
}
+ cfg->run();
+ return 0;
}
-
int main (int argc, char *argv [])
{
if (argc != 2) {
@@ -309,8 +311,7 @@ int main (int argc, char *argv [])
return 1;
}
-
- std::vector<device_cfg_t*> vdev;
+ std::vector <device_cfg_t*> vdev;
while (true) {
@@ -319,49 +320,44 @@ int main (int argc, char *argv [])
if (device.isEmpty())
break;
- device_cfg_t* dev = make_device_config(device);
+ device_cfg_t* dev = make_device_config (device);
if (!dev) {
fprintf(stderr, "failed to create device config\n");
return 1;
}
- if (! dev->init(device) ) {
-
+ if (!dev->init (device)) {
fprintf(stderr,"error with initialising device configuration\n");
delete dev;
return 1;
}
- vdev.push_back(dev);
+ vdev.push_back (dev);
}
- std::vector<device_cfg_t*>::size_type num_devices = vdev.size();
+ std::vector <device_cfg_t*>::size_type num_devices = vdev.size ();
- if ( num_devices == 0 ) {
+ if (num_devices == 0) {
fprintf(stderr,"no devices in the config file\n");
return 1;
}
-
- zmq::context_t ctx (num_devices,1);
-
+ zmq::context_t ctx (num_devices, 1);
for (unsigned int i = 0 ; i < num_devices ; ++i) {
- vdev[i]->set_context(&ctx);
+ vdev [i]->set_context (&ctx);
if (i) {
pthread_t worker;
int rc = pthread_create (&worker, NULL, &worker_function,
- (void*) vdev[i]);
+ (void*) vdev [i]);
assert (rc == 0);
}
}
-
- worker_function((void*)vdev[0]);
-
+ worker_function ((void*) vdev [0]);
return 0;
}