diff options
Diffstat (limited to 'zmqd')
| -rw-r--r-- | zmqd/Makefile.am | 8 | ||||
| -rw-r--r-- | zmqd/zmqd.cpp | 368 | 
2 files changed, 376 insertions, 0 deletions
diff --git a/zmqd/Makefile.am b/zmqd/Makefile.am new file mode 100644 index 0000000..32a657b --- /dev/null +++ b/zmqd/Makefile.am @@ -0,0 +1,8 @@ +INCLUDES = -I$(top_srcdir)/include + +bin_PROGRAMS = zmqd + +zmqd_LDADD = $(top_builddir)/src/libzmq.la +zmqd_SOURCES = zmqd.cpp + + diff --git a/zmqd/zmqd.cpp b/zmqd/zmqd.cpp new file mode 100644 index 0000000..5b27ac2 --- /dev/null +++ b/zmqd/zmqd.cpp @@ -0,0 +1,368 @@ +/* +    Copyright (c) 2007-2010 iMatix Corporation + +    This file is part of 0MQ. + +    0MQ is free software; you can redistribute it and/or modify it under +    the terms of the Lesser GNU General Public License as published by +    the Free Software Foundation; either version 3 of the License, or +    (at your option) any later version. + +    0MQ is distributed in the hope that it will be useful, +    but WITHOUT ANY WARRANTY; without even the implied warranty of +    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +    Lesser GNU General Public License for more details. + +    You should have received a copy of the Lesser GNU General Public License +    along with this program.  If not, see <http://www.gnu.org/licenses/>. +*/ + +#include <vector> +#include <string> +#include <memory> + +#include "../include/zmq.hpp" +#include "../foreign/xmlParser/xmlParser.cpp" + +namespace { + +    class device_cfg_t +    { + +        enum endpoint_direction {connect, bind}; + +        typedef std::pair<endpoint_direction, std::string> sock_details_t; + +        typedef std::vector<sock_details_t> vsock_dets_t; + +    public: + +        explicit device_cfg_t(int type) +                : device_type(type) , context(0)  +                , in_socket(0), out_socket(0) +        { +        } + +        virtual ~device_cfg_t() +        { +            delete out_socket; +            delete in_socket; +        } + +        bool  init(XMLNode& device) +        { + +            XMLNode in_node = device.getChildNode ("in"); +            if (in_node.isEmpty ()) { +                fprintf (stderr, "'in' node is missing in the configuration file\n"); +                return false; +            } + +            XMLNode out_node = device.getChildNode ("out"); +            if (out_node.isEmpty ()) { +                fprintf (stderr, "'out' node is missing in the configuration file\n"); +                return false; +            } + +            if (!process_node(in_node,true,device_cfg_t::bind)) +                return false; +            if (!process_node(in_node,true,device_cfg_t::connect)) +                return false; +            if (!process_node(out_node,false,device_cfg_t::bind)) +                return false; +            if (!process_node(out_node,false,device_cfg_t::connect)) +                return false; + +            return true; +        } + +        void set_context(zmq::context_t* context_) +        { +            context = context_; +        } + +        zmq::context_t *get_context() const +        { +            return context; +        } + +        virtual bool make_sockets() = 0; + +        bool set_up_connections() +        { +            for (vsock_dets_t::const_iterator i = in.begin() ; i != in.end(); +                  ++i) { + +                switch (i->first) +                { +                    case device_cfg_t::connect : +                        in_socket->connect(i->second.c_str()); +                        break; +                    case device_cfg_t::bind : +                        in_socket->bind(i->second.c_str()); +                } +             +            } + +            for (vsock_dets_t::const_iterator i = out.begin() ; i != out.end(); +                  ++i) { + +                switch (i->first) +                { +                    case device_cfg_t::connect : +                        out_socket->connect(i->second.c_str()); +                        break; +                    case device_cfg_t::bind : +                        out_socket->bind(i->second.c_str()); +                } + +            } +            return true; +        } + +        void run()  +        { +            zmq::device(device_type, *in_socket, *out_socket); +        } + + +    protected: + +        bool make_sockets(int in_type, int out_type) +        { +            in_socket = new (std::nothrow) zmq::socket_t(*context, in_type); +            if (!in_socket)  +                return false; +            out_socket = new (std::nothrow) zmq::socket_t(*context, out_type); +            if (!out_socket) { +                return false; +            } +            return true; +        } + +        int process_node(XMLNode& target_, bool in_, +            device_cfg_t::endpoint_direction ept_) +        { + +            const char * name = +                (ept_ == device_cfg_t::connect) ? "connect" : "bind"; +            int n = 0; +            while (true) { +                XMLNode connect = target_.getChildNode (name, n); +                if (connect.isEmpty ()) +                    break; +                const char *addr = connect.getAttribute ("addr"); +                if (!addr) { +                    fprintf (stderr, "'%s' node is missing 'addr' attribute\n", +                        name); +                    return 0; +                } + +                if (in_)  +                    in.push_back( sock_details_t(ept_, addr)); +                else +                    out.push_back( sock_details_t(ept_, addr)); + +                n++; +            } + +            return 1; +        } + + +    protected: + +        int device_type; +        zmq::context_t* context; +        vsock_dets_t in; +        vsock_dets_t out; +        zmq::socket_t* in_socket; +        zmq::socket_t* out_socket; + +    private: +        void operator = (device_cfg_t const &); +        device_cfg_t(device_cfg_t const &); +    }; + + + +    class queue_device_cfg_t : public device_cfg_t +    { +    public: +        queue_device_cfg_t() +                : device_cfg_t(ZMQ_QUEUE) +        {} +        virtual bool make_sockets(){ +            return device_cfg_t::make_sockets(ZMQ_XREP, ZMQ_XREQ); +        } +    }; + + +    class streamer_device_cfg_t : public device_cfg_t +    { +    public: +        streamer_device_cfg_t() +                : device_cfg_t(ZMQ_STREAMER) +        {} +        virtual bool make_sockets () { +            return device_cfg_t::make_sockets(ZMQ_UPSTREAM, ZMQ_DOWNSTREAM); +        } +    }; + +    class forwarder_device_cfg_t : public device_cfg_t +    { +    public: +        forwarder_device_cfg_t() +                : device_cfg_t(ZMQ_FORWARDER) +        {} +        virtual bool make_sockets() { +            if (!device_cfg_t::make_sockets(ZMQ_SUB, ZMQ_PUB) ) { +                return false; +            } +            in_socket->setsockopt (ZMQ_SUBSCRIBE, "", 0); +            return true; +        } +    }; + + +    device_cfg_t* make_device_config(XMLNode& device) +    { +        const char *dev_type = device.getAttribute ("type"); + +        if (!dev_type) { +            fprintf (stderr, "'device' node is missing 'type' attribute\n"); +            return NULL; +        } + +        if (strcmp (dev_type, "forwarder") == 0) { +            return new (std::nothrow) forwarder_device_cfg_t; +        } +        else if (strcmp (dev_type, "streamer") == 0) { +            return new (std::nothrow) streamer_device_cfg_t; +        } +        else if (strcmp (dev_type, "queue") == 0) { +            return new (std::nothrow) queue_device_cfg_t; +        } +         +        fprintf (stderr, "type attribute in the device configuration file " +                 "should be named 'forwarder', 'streamer' or 'queue'\n"); + +        return NULL; +    } + + +    extern "C" void* worker_function(void *arg) +    { + +        if (!arg) { +            fprintf (stderr, "arg is null, returning \n"); +            return 0; +        } + +        std::auto_ptr<device_cfg_t> cfg ( (device_cfg_t*) arg ); + +        zmq::context_t* ctx = cfg->get_context(); + +        if (!ctx) { +            fprintf (stderr, "no context, returning \n"); +            return 0; +        } + +        if (! cfg->make_sockets()) { +            fprintf (stderr, "failed to make sockets, returning \n"); +            return 0; +        } + + +        if (! cfg->set_up_connections()) { +            fprintf (stderr, "failed to set up connections, returning \n"); +            return 0;          +        } + +        cfg->run(); + +        return 0; + +    } + + +} + + +int main (int argc, char *argv []) +{ +    if (argc != 2) { +        fprintf (stderr, "usage: zmqd <config-file>\n"); +        return 1; +    } + +    XMLNode root = XMLNode::parseFile (argv [1]); + +    if (root.isEmpty ()) { +        fprintf (stderr, "configuration file not found or not an XML file\n"); +        return 1; +    } + +    if (strcmp (root.getName (), "config") != 0) { +        fprintf (stderr, "root element in the configuration file should be " +            "named 'config'\n"); +        return 1; +    } + + +    std::vector<device_cfg_t*> vdev; + +    while (true) { + +        XMLNode device = root.getChildNode ("device", vdev.size()); + +        if (device.isEmpty()) +            break; + +        device_cfg_t* dev = make_device_config(device); + +        if (!dev) { +            fprintf(stderr, "failed to create device config\n"); +            return 1; +        } +         +        if (! dev->init(device) ) { + +            fprintf(stderr,"error with initialising device configuration\n"); +            delete dev; +            return 1; +        } +     +        vdev.push_back(dev); +    } + +    std::vector<device_cfg_t*>::size_type num_devices = vdev.size(); + +    if ( num_devices == 0 ) { +        fprintf(stderr,"no devices in the config file\n"); +        return 1; +    } + + +    zmq::context_t ctx (num_devices,1); + + +    for (unsigned int i = 0 ; i < num_devices ; ++i) { + +        vdev[i]->set_context(&ctx); +         +        if (i)  { +            pthread_t worker; +            int rc = pthread_create (&worker, NULL, &worker_function, +                (void*) vdev[i]); +            assert (rc == 0); +        } +    } + + +    worker_function((void*)vdev[0]); + + +    return 0; +} +  | 
