diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/Makefile.am | 8 | ||||
| -rw-r--r-- | src/app_thread.cpp | 20 | ||||
| -rw-r--r-- | src/app_thread.hpp | 3 | ||||
| -rw-r--r-- | src/i_endpoint.hpp | 7 | ||||
| -rw-r--r-- | src/options.cpp | 80 | ||||
| -rw-r--r-- | src/options.hpp | 3 | ||||
| -rw-r--r-- | src/p2p.cpp | 92 | ||||
| -rw-r--r-- | src/p2p.hpp | 56 | ||||
| -rw-r--r-- | src/pipe.cpp | 28 | ||||
| -rw-r--r-- | src/pipe.hpp | 19 | ||||
| -rw-r--r-- | src/pub.cpp | 129 | ||||
| -rw-r--r-- | src/pub.hpp | 24 | ||||
| -rw-r--r-- | src/rep.cpp | 204 | ||||
| -rw-r--r-- | src/rep.hpp | 79 | ||||
| -rw-r--r-- | src/req.cpp | 206 | ||||
| -rw-r--r-- | src/req.hpp | 84 | ||||
| -rw-r--r-- | src/session.cpp | 49 | ||||
| -rw-r--r-- | src/session.hpp | 6 | ||||
| -rw-r--r-- | src/socket_base.cpp | 460 | ||||
| -rw-r--r-- | src/socket_base.hpp | 77 | ||||
| -rw-r--r-- | src/sub.cpp | 88 | ||||
| -rw-r--r-- | src/sub.hpp | 38 | ||||
| -rw-r--r-- | src/yarray.hpp | 110 | ||||
| -rw-r--r-- | src/yarray_item.hpp | 62 | 
24 files changed, 1460 insertions, 472 deletions
| diff --git a/src/Makefile.am b/src/Makefile.am index 2701237..f75c3a1 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -68,7 +68,10 @@ libzmq_la_SOURCES = $(pgm_sources) \      pipe.hpp \      platform.hpp \      poll.hpp \ +    p2p.hpp \      pub.hpp \ +    rep.hpp \ +    req.hpp \      select.hpp \      session.hpp \      simple_semaphore.hpp \ @@ -82,6 +85,8 @@ libzmq_la_SOURCES = $(pgm_sources) \      uuid.hpp \      windows.hpp \      wire.hpp \ +    yarray.hpp \ +    yarray_item.hpp \      ypipe.hpp \      ypollset.hpp \      yqueue.hpp \ @@ -108,9 +113,12 @@ libzmq_la_SOURCES = $(pgm_sources) \      pgm_receiver.cpp \      pgm_sender.cpp \      pgm_socket.cpp \ +    p2p.cpp \      pipe.cpp \      poll.cpp \      pub.cpp \ +    rep.cpp \ +    req.cpp \      select.cpp \      session.cpp \      socket_base.cpp \ diff --git a/src/app_thread.cpp b/src/app_thread.cpp index 303c6a1..d12b126 100644 --- a/src/app_thread.cpp +++ b/src/app_thread.cpp @@ -39,6 +39,9 @@  #include "socket_base.hpp"  #include "pub.hpp"  #include "sub.hpp" +#include "req.hpp" +#include "rep.hpp" +#include "p2p.hpp"  //  If the RDTSC is available we use it to prevent excessive  //  polling for commands. The nice thing here is that it will work on any @@ -158,26 +161,27 @@ zmq::socket_base_t *zmq::app_thread_t::create_socket (int type_)      case ZMQ_SUB:          s = new sub_t (this);          break; -    case ZMQ_P2P:      case ZMQ_REQ: +        s = new req_t (this); +        break;      case ZMQ_REP: -        s = new socket_base_t (this, type_); +        s = new rep_t (this); +        break; +    case ZMQ_P2P: +        s = new p2p_t (this);          break;      default:          //  TODO: This should be EINVAL.          zmq_assert (false);      }      zmq_assert (s); -    s->set_index (sockets.size ()); +      sockets.push_back (s); +      return s;  }  void zmq::app_thread_t::remove_socket (socket_base_t *socket_)  { -    int i = socket_->get_index (); -    socket_->set_index (-1); -    sockets [i] = sockets.back (); -    sockets [i]->set_index (i); -    sockets.pop_back (); +    sockets.erase (socket_);  } diff --git a/src/app_thread.hpp b/src/app_thread.hpp index 4fe67fb..14cb8c5 100644 --- a/src/app_thread.hpp +++ b/src/app_thread.hpp @@ -24,6 +24,7 @@  #include "stdint.hpp"  #include "object.hpp" +#include "yarray.hpp"  #include "thread.hpp"  namespace zmq @@ -67,7 +68,7 @@ namespace zmq      private:          //  All the sockets created from this application thread. -        typedef std::vector <class socket_base_t*> sockets_t; +        typedef yarray_t <socket_base_t> sockets_t;          sockets_t sockets;          //  If false, app_thread_t object is not associated with any OS thread. diff --git a/src/i_endpoint.hpp b/src/i_endpoint.hpp index 8ee2984..3bab2a5 100644 --- a/src/i_endpoint.hpp +++ b/src/i_endpoint.hpp @@ -25,11 +25,12 @@ namespace zmq      struct i_endpoint      { -        virtual void attach_inpipe (class reader_t *pipe_) = 0; -        virtual void attach_outpipe (class writer_t *pipe_) = 0; -        virtual void revive (class reader_t *pipe_) = 0; +        virtual void attach_pipes (class reader_t *inpipe_, +            class writer_t *outpipe_) = 0;          virtual void detach_inpipe (class reader_t *pipe_) = 0;          virtual void detach_outpipe (class writer_t *pipe_) = 0; +        virtual void kill (class reader_t *pipe_) = 0; +        virtual void revive (class reader_t *pipe_) = 0;      };  } diff --git a/src/options.cpp b/src/options.cpp index 55417f5..b0e6e6e 100644 --- a/src/options.cpp +++ b/src/options.cpp @@ -17,7 +17,10 @@      along with this program.  If not, see <http://www.gnu.org/licenses/>.  */ +#include "../bindings/c/zmq.h" +  #include "options.hpp" +#include "err.hpp"  zmq::options_t::options_t () :      hwm (0), @@ -29,3 +32,80 @@ zmq::options_t::options_t () :      use_multicast_loop (false)  {  } + +int zmq::options_t::setsockopt (int option_, const void *optval_, +    size_t optvallen_) +{ +    switch (option_) { + +    case ZMQ_HWM: +        if (optvallen_ != sizeof (int64_t)) { +            errno = EINVAL; +            return -1; +        } +        hwm = *((int64_t*) optval_); +        return 0; + +    case ZMQ_LWM: +        if (optvallen_ != sizeof (int64_t)) { +            errno = EINVAL; +            return -1; +        } +        lwm = *((int64_t*) optval_); +        return 0; + +    case ZMQ_SWAP: +        if (optvallen_ != sizeof (int64_t)) { +            errno = EINVAL; +            return -1; +        } +        swap = *((int64_t*) optval_); +        return 0; + +    case ZMQ_AFFINITY: +        if (optvallen_ != sizeof (int64_t)) { +            errno = EINVAL; +            return -1; +        } +        affinity = (uint64_t) *((int64_t*) optval_); +        return 0; + +    case ZMQ_IDENTITY: +        identity.assign ((const char*) optval_, optvallen_); +        return 0; + +    case ZMQ_RATE: +        if (optvallen_ != sizeof (int64_t)) { +            errno = EINVAL; +            return -1; +        } +        rate = (uint32_t) *((int64_t*) optval_); +        return 0; +         +    case ZMQ_RECOVERY_IVL: +        if (optvallen_ != sizeof (int64_t)) { +            errno = EINVAL; +            return -1; +        } +        recovery_ivl = (uint32_t) *((int64_t*) optval_); +        return 0; + +    case ZMQ_MCAST_LOOP: +        if (optvallen_ != sizeof (int64_t)) { +            errno = EINVAL; +            return -1; +        } +        if ((int64_t) *((int64_t*) optval_) == 0) +            use_multicast_loop = false; +        else if ((int64_t) *((int64_t*) optval_) == 1) +            use_multicast_loop = true; +        else { +            errno = EINVAL; +            return -1; +        } +        return 0; +    } + +    errno = EINVAL; +    return -1; +} diff --git a/src/options.hpp b/src/options.hpp index c1ecb57..cde144c 100644 --- a/src/options.hpp +++ b/src/options.hpp @@ -22,6 +22,7 @@  #include <string> +#include "stddef.h"  #include "stdint.hpp"  namespace zmq @@ -31,6 +32,8 @@ namespace zmq      {          options_t (); +        int setsockopt (int option_, const void *optval_, size_t optvallen_); +          int64_t hwm;          int64_t lwm;          int64_t swap; diff --git a/src/p2p.cpp b/src/p2p.cpp new file mode 100644 index 0000000..537f3ce --- /dev/null +++ b/src/p2p.cpp @@ -0,0 +1,92 @@ +/* +    Copyright (c) 2007-2009 FastMQ Inc. + +    This file is part of 0MQ. + +    0MQ is free software; you can redistribute it and/or modify it under +    the terms of the Lesser GNU General Public License as published by +    the Free Software Foundation; either version 3 of the License, or +    (at your option) any later version. + +    0MQ is distributed in the hope that it will be useful, +    but WITHOUT ANY WARRANTY; without even the implied warranty of +    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +    Lesser GNU General Public License for more details. + +    You should have received a copy of the Lesser GNU General Public License +    along with this program.  If not, see <http://www.gnu.org/licenses/>. +*/ + +#include "../bindings/c/zmq.h" + +#include "p2p.hpp" +#include "err.hpp" + +zmq::p2p_t::p2p_t (class app_thread_t *parent_) : +    socket_base_t (parent_, ZMQ_P2P) +{ +} + +zmq::p2p_t::~p2p_t () +{ +} + +bool zmq::p2p_t::xrequires_in () +{ +    return true; +} + +bool zmq::p2p_t::xrequires_out () +{ +    return true; +} + +void zmq::p2p_t::xattach_pipes (class reader_t *inpipe_, +    class writer_t *outpipe_) +{ +    zmq_assert (false); +} + +void zmq::p2p_t::xdetach_inpipe (class reader_t *pipe_) +{ +    zmq_assert (false); +} + +void zmq::p2p_t::xdetach_outpipe (class writer_t *pipe_) +{ +    zmq_assert (false); +} + +void zmq::p2p_t::xkill (class reader_t *pipe_) +{ +    zmq_assert (false); +} + +void zmq::p2p_t::xrevive (class reader_t *pipe_) +{ +    zmq_assert (false); +} + +int zmq::p2p_t::xsetsockopt (int option_, const void *optval_, +    size_t optvallen_) +{ +    errno = EINVAL; +    return -1; +} + +int zmq::p2p_t::xsend (struct zmq_msg_t *msg_, int flags_) +{ +    zmq_assert (false); +} + +int zmq::p2p_t::xflush () +{ +    zmq_assert (false); +} + +int zmq::p2p_t::xrecv (struct zmq_msg_t *msg_, int flags_) +{ +    zmq_assert (false); +} + + diff --git a/src/p2p.hpp b/src/p2p.hpp new file mode 100644 index 0000000..84790a1 --- /dev/null +++ b/src/p2p.hpp @@ -0,0 +1,56 @@ +/* +    Copyright (c) 2007-2009 FastMQ Inc. + +    This file is part of 0MQ. + +    0MQ is free software; you can redistribute it and/or modify it under +    the terms of the Lesser GNU General Public License as published by +    the Free Software Foundation; either version 3 of the License, or +    (at your option) any later version. + +    0MQ is distributed in the hope that it will be useful, +    but WITHOUT ANY WARRANTY; without even the implied warranty of +    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +    Lesser GNU General Public License for more details. + +    You should have received a copy of the Lesser GNU General Public License +    along with this program.  If not, see <http://www.gnu.org/licenses/>. +*/ + +#ifndef __ZMQ_P2P_INCLUDED__ +#define __ZMQ_P2P_INCLUDED__ + +#include "socket_base.hpp" + +namespace zmq +{ + +    class p2p_t : public socket_base_t +    { +    public: + +        p2p_t (class app_thread_t *parent_); +        ~p2p_t (); + +        //  Overloads of functions from socket_base_t. +        bool xrequires_in (); +        bool xrequires_out (); +        void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_); +        void xdetach_inpipe (class reader_t *pipe_); +        void xdetach_outpipe (class writer_t *pipe_); +        void xkill (class reader_t *pipe_); +        void xrevive (class reader_t *pipe_); +        int xsetsockopt (int option_, const void *optval_, size_t optvallen_); +        int xsend (struct zmq_msg_t *msg_, int flags_); +        int xflush (); +        int xrecv (struct zmq_msg_t *msg_, int flags_); + +    private: + +        p2p_t (const p2p_t&); +        void operator = (const p2p_t&); +    }; + +} + +#endif diff --git a/src/pipe.cpp b/src/pipe.cpp index f4cf0c4..9f70586 100644 --- a/src/pipe.cpp +++ b/src/pipe.cpp @@ -28,7 +28,6 @@ zmq::reader_t::reader_t (object_t *parent_, pipe_t *pipe_,      peer (&pipe_->writer),      hwm (hwm_),      lwm (lwm_), -    index (-1),      endpoint (NULL)  {  } @@ -39,8 +38,10 @@ zmq::reader_t::~reader_t ()  bool zmq::reader_t::read (zmq_msg_t *msg_)  { -    if (!pipe->read (msg_)) +    if (!pipe->read (msg_)) { +        endpoint->kill (this);          return false; +    }      //  If delimiter was read, start termination process of the pipe.      unsigned char *offset = 0; @@ -61,17 +62,6 @@ void zmq::reader_t::set_endpoint (i_endpoint *endpoint_)      endpoint = endpoint_;  } -void zmq::reader_t::set_index (int index_) -{ -    index = index_; -} - -int zmq::reader_t::get_index () -{ -    zmq_assert (index != -1); -    return index; -} -  void zmq::reader_t::term ()  {      endpoint = NULL; @@ -96,7 +86,6 @@ zmq::writer_t::writer_t (object_t *parent_, pipe_t *pipe_,      peer (&pipe_->reader),      hwm (hwm_),      lwm (lwm_), -    index (-1),      endpoint (NULL)  {  } @@ -106,17 +95,6 @@ void zmq::writer_t::set_endpoint (i_endpoint *endpoint_)      endpoint = endpoint_;  } -void zmq::writer_t::set_index (int index_) -{ -    index = index_; -} - -int zmq::writer_t::get_index () -{ -    zmq_assert (index != -1); -    return index; -} -  zmq::writer_t::~writer_t ()  {  } diff --git a/src/pipe.hpp b/src/pipe.hpp index ede73b8..177b1b4 100644 --- a/src/pipe.hpp +++ b/src/pipe.hpp @@ -24,6 +24,7 @@  #include "stdint.hpp"  #include "i_endpoint.hpp" +#include "yarray_item.hpp"  #include "ypipe.hpp"  #include "config.hpp"  #include "object.hpp" @@ -31,7 +32,7 @@  namespace zmq  { -    class reader_t : public object_t +    class reader_t : public object_t, public yarray_item_t      {      public: @@ -44,10 +45,6 @@ namespace zmq          //  Reads a message to the underlying pipe.          bool read (struct zmq_msg_t *msg_); -        //  Mnaipulation of index of the pipe. -        void set_index (int index_); -        int get_index (); -          //  Ask pipe to terminate.          void term (); @@ -72,9 +69,6 @@ namespace zmq          uint64_t tail;          uint64_t last_sent_head; -        //  Index of the pipe in the socket's list of inbound pipes. -        int index; -          //  Endpoint (either session or socket) the pipe is attached to.          i_endpoint *endpoint; @@ -82,7 +76,7 @@ namespace zmq          void operator = (const reader_t&);      }; -    class writer_t : public object_t +    class writer_t : public object_t, public yarray_item_t      {      public: @@ -104,10 +98,6 @@ namespace zmq          //  Flush the messages downsteam.          void flush (); -        //  Mnaipulation of index of the pipe. -        void set_index (int index_); -        int get_index (); -          //  Ask pipe to terminate.          void term (); @@ -130,9 +120,6 @@ namespace zmq          uint64_t head;          uint64_t tail; -        //  Index of the pipe in the socket's list of outbound pipes. -        int index; -          //  Endpoint (either session or socket) the pipe is attached to.          i_endpoint *endpoint; diff --git a/src/pub.cpp b/src/pub.cpp index ca8afae..020d789 100644 --- a/src/pub.cpp +++ b/src/pub.cpp @@ -21,6 +21,8 @@  #include "pub.hpp"  #include "err.hpp" +#include "msg_content.hpp" +#include "pipe.hpp"  zmq::pub_t::pub_t (class app_thread_t *parent_) :      socket_base_t (parent_, ZMQ_PUB) @@ -29,9 +31,134 @@ zmq::pub_t::pub_t (class app_thread_t *parent_) :  zmq::pub_t::~pub_t ()  { +    for (out_pipes_t::size_type i = 0; i != out_pipes.size (); i++) +        out_pipes [i]->term (); +    out_pipes.clear ();  } -int zmq::pub_t::recv (struct zmq_msg_t *msg_, int flags_) +bool zmq::pub_t::xrequires_in () +{ +    return false; +} + +bool zmq::pub_t::xrequires_out () +{ +    return true; +} + +void zmq::pub_t::xattach_pipes (class reader_t *inpipe_, +    class writer_t *outpipe_) +{ +    zmq_assert (!inpipe_); +    out_pipes.push_back (outpipe_); +} + +void zmq::pub_t::xdetach_inpipe (class reader_t *pipe_) +{ +    zmq_assert (false); +} + +void zmq::pub_t::xdetach_outpipe (class writer_t *pipe_) +{ +    out_pipes.erase (pipe_); +} + +void zmq::pub_t::xkill (class reader_t *pipe_) +{ +    zmq_assert (false); +} + +void zmq::pub_t::xrevive (class reader_t *pipe_) +{ +    zmq_assert (false); +} + +int zmq::pub_t::xsetsockopt (int option_, const void *optval_, +    size_t optvallen_) +{ +    errno = EINVAL; +    return -1; +} + +int zmq::pub_t::xsend (struct zmq_msg_t *msg_, int flags_) +{ +    out_pipes_t::size_type pipes_count = out_pipes.size (); + +    //  If there are no pipes available, simply drop the message. +    if (pipes_count == 0) { +        int rc = zmq_msg_close (msg_); +        zmq_assert (rc == 0); +        rc = zmq_msg_init (msg_); +        zmq_assert (rc == 0); +        return 0; +    } + +    //  First check whether all pipes are available for writing. +    for (out_pipes_t::size_type i = 0; i != pipes_count; i++) +        if (!out_pipes [i]->check_write (zmq_msg_size (msg_))) { +            errno = EAGAIN; +            return -1; +        } + +    msg_content_t *content = (msg_content_t*) msg_->content; + +    //  For VSMs the copying is straighforward. +    if (content == (msg_content_t*) ZMQ_VSM) { +        for (out_pipes_t::size_type i = 0; i != pipes_count; i++) { +            out_pipes [i]->write (msg_); +            if (!(flags_ & ZMQ_NOFLUSH)) +                out_pipes [i]->flush (); +        } +        int rc = zmq_msg_init (msg_); +        zmq_assert (rc == 0); +        return 0; +    } + +    //  Optimisation for the case when there's only a single pipe +    //  to send the message to - no refcount adjustment i.e. no atomic +    //  operations are needed. +    if (pipes_count == 1) { +        out_pipes [0]->write (msg_); +        if (!(flags_ & ZMQ_NOFLUSH)) +            out_pipes [0]->flush (); +        int rc = zmq_msg_init (msg_); +        zmq_assert (rc == 0); +        return 0; +    } + +    //  There are at least 2 destinations for the message. That means we have +    //  to deal with reference counting. First add N-1 references to +    //  the content (we are holding one reference anyway, that's why -1). +    if (msg_->shared) +        content->refcnt.add (pipes_count - 1); +    else { +        content->refcnt.set (pipes_count); +        msg_->shared = true; +    } + +    //  Push the message to all destinations. +    for (out_pipes_t::size_type i = 0; i != pipes_count; i++) { +        out_pipes [i]->write (msg_); +        if (!(flags_ & ZMQ_NOFLUSH)) +            out_pipes [i]->flush (); +    } + +    //  Detach the original message from the data buffer. +    int rc = zmq_msg_init (msg_); +    zmq_assert (rc == 0); + +    return 0; +} + +int zmq::pub_t::xflush () +{ +    out_pipes_t::size_type pipe_count = out_pipes.size (); +    for (out_pipes_t::size_type i = 0; i != pipe_count; i++) +        out_pipes [i]->flush (); +    return 0; +} + +int zmq::pub_t::xrecv (struct zmq_msg_t *msg_, int flags_)  {      errno = EFAULT;      return -1; diff --git a/src/pub.hpp b/src/pub.hpp index 2f03b8e..8255c6f 100644 --- a/src/pub.hpp +++ b/src/pub.hpp @@ -21,6 +21,7 @@  #define __ZMQ_PUB_INCLUDED__  #include "socket_base.hpp" +#include "yarray.hpp"  namespace zmq  { @@ -32,8 +33,27 @@ namespace zmq          pub_t (class app_thread_t *parent_);          ~pub_t (); -        //  Overloads of API functions from socket_base_t. -        int recv (struct zmq_msg_t *msg_, int flags_); +        //  Overloads of functions from socket_base_t. +        bool xrequires_in (); +        bool xrequires_out (); +        void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_); +        void xdetach_inpipe (class reader_t *pipe_); +        void xdetach_outpipe (class writer_t *pipe_); +        void xkill (class reader_t *pipe_); +        void xrevive (class reader_t *pipe_); +        int xsetsockopt (int option_, const void *optval_, size_t optvallen_); +        int xsend (struct zmq_msg_t *msg_, int flags_); +        int xflush (); +        int xrecv (struct zmq_msg_t *msg_, int flags_); + +    private: + +        //  Outbound pipes, i.e. those the socket is sending messages to. +        typedef yarray_t <class writer_t> out_pipes_t; +        out_pipes_t out_pipes; + +        pub_t (const pub_t&); +        void operator = (const pub_t&);      };  } diff --git a/src/rep.cpp b/src/rep.cpp new file mode 100644 index 0000000..2fbb66c --- /dev/null +++ b/src/rep.cpp @@ -0,0 +1,204 @@ +/* +    Copyright (c) 2007-2009 FastMQ Inc. + +    This file is part of 0MQ. + +    0MQ is free software; you can redistribute it and/or modify it under +    the terms of the Lesser GNU General Public License as published by +    the Free Software Foundation; either version 3 of the License, or +    (at your option) any later version. + +    0MQ is distributed in the hope that it will be useful, +    but WITHOUT ANY WARRANTY; without even the implied warranty of +    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +    Lesser GNU General Public License for more details. + +    You should have received a copy of the Lesser GNU General Public License +    along with this program.  If not, see <http://www.gnu.org/licenses/>. +*/ + +#include "../bindings/c/zmq.h" + +#include "rep.hpp" +#include "err.hpp" +#include "pipe.hpp" + +zmq::rep_t::rep_t (class app_thread_t *parent_) : +    socket_base_t (parent_, ZMQ_REP), +    active (0), +    current (0), +    waiting_for_reply (false), +    reply_pipe (NULL) +{ +} + +zmq::rep_t::~rep_t () +{ +} + +bool zmq::rep_t::xrequires_in () +{ +    return true; +} + +bool zmq::rep_t::xrequires_out () +{ +    return true; +} + +void zmq::rep_t::xattach_pipes (class reader_t *inpipe_, +    class writer_t *outpipe_) +{ +    zmq_assert (inpipe_ && outpipe_); +    zmq_assert (in_pipes.size () == out_pipes.size ()); + +    in_pipes.push_back (inpipe_); +    in_pipes.swap (active, in_pipes.size () - 1); +    out_pipes.push_back (outpipe_); +    out_pipes.swap (active, out_pipes.size () - 1); +    active++; +} + +void zmq::rep_t::xdetach_inpipe (class reader_t *pipe_) +{ +    zmq_assert (pipe_); +    zmq_assert (in_pipes.size () == out_pipes.size ()); + +    in_pipes_t::size_type index = in_pipes.index (pipe_); + +    //  If corresponding outpipe is still in place simply nullify the pointer +    //  to the inpipe and move it to the passive state. +    if (out_pipes [index]) { +        in_pipes [index] = NULL; +        if (in_pipes.index (pipe_) < active) { +            active--; +            in_pipes.swap (index, active); +            out_pipes.swap (index, active); +        } +        return; +    } + +    //  Now both inpipe and outpipe are detached. Remove them from the lists. +    if (in_pipes.index (pipe_) < active) +        active--; +    in_pipes.erase (index); +    out_pipes.erase (index); +} + +void zmq::rep_t::xdetach_outpipe (class writer_t *pipe_) +{ +    zmq_assert (pipe_); +    zmq_assert (in_pipes.size () == out_pipes.size ()); + +    out_pipes_t::size_type index = out_pipes.index (pipe_); + +    //  TODO: If the connection we've got the request from disconnects, +    //  there's nowhere to send the reply. DLQ? +    if (waiting_for_reply && pipe_ == reply_pipe) { +        zmq_assert (false); +    } + +    //  If corresponding inpipe is still in place simply nullify the pointer +    //  to the outpipe. +    if (in_pipes [index]) { +        out_pipes [index] = NULL; +        if (out_pipes.index (pipe_) < active) { +            active--; +            in_pipes.swap (index, active); +            out_pipes.swap (index, active); +        } +        return; +    } + +    //  Now both inpipe and outpipe are detached. Remove them from the lists. +    if (out_pipes.index (pipe_) < active) +        active--; +    in_pipes.erase (index); +    out_pipes.erase (index); +} + +void zmq::rep_t::xkill (class reader_t *pipe_) +{ +    //  Move the pipe to the list of inactive pipes. +    in_pipes_t::size_type index = in_pipes.index (pipe_); +    active--; +    in_pipes.swap (index, active); +    out_pipes.swap (index, active); +} + +void zmq::rep_t::xrevive (class reader_t *pipe_) +{ +    //  Move the pipe to the list of active pipes. +    in_pipes_t::size_type index = in_pipes.index (pipe_); +    in_pipes.swap (index, active); +    out_pipes.swap (index, active); +    active++; +} + +int zmq::rep_t::xsetsockopt (int option_, const void *optval_, +    size_t optvallen_) +{ +    errno = EINVAL; +    return -1; +} + +int zmq::rep_t::xsend (struct zmq_msg_t *msg_, int flags_) +{ +    if (!waiting_for_reply) { +        errno = EFAULT; +        return -1; +    } + +    //  TODO: Implement this once queue limits are in-place. If the reply +    //  overloads the buffer, connection should be torn down. +    zmq_assert (reply_pipe->check_write (zmq_msg_size (msg_))); + +    //  Push message to the selected pipe. +    reply_pipe->write (msg_); +    reply_pipe->flush (); + +    waiting_for_reply = false; +    reply_pipe = NULL; + +    //  Detach the message from the data buffer. +    int rc = zmq_msg_init (msg_); +    zmq_assert (rc == 0); +} + +int zmq::rep_t::xflush () +{ +    errno = EFAULT; +    return -1; +} + +int zmq::rep_t::xrecv (struct zmq_msg_t *msg_, int flags_) +{ +    //  Deallocate old content of the message. +    zmq_msg_close (msg_); + +    if (waiting_for_reply) { +        errno = EFAULT; +        return -1; +    } + +    //  Round-robin over the pipes to get next message. +    for (int count = active; count != 0; count--) { +        bool fetched = in_pipes [current]->read (msg_); +        current++; +        if (current >= active) +            current = 0; +        if (fetched) { +            reply_pipe = out_pipes [current]; +            waiting_for_reply = true; +            return 0; +        } +    } + +    //  No message is available. Initialise the output parameter +    //  to be a 0-byte message. +    zmq_msg_init (msg_); +    errno = EAGAIN; +    return -1; +} + + diff --git a/src/rep.hpp b/src/rep.hpp new file mode 100644 index 0000000..6e55f47 --- /dev/null +++ b/src/rep.hpp @@ -0,0 +1,79 @@ +/* +    Copyright (c) 2007-2009 FastMQ Inc. + +    This file is part of 0MQ. + +    0MQ is free software; you can redistribute it and/or modify it under +    the terms of the Lesser GNU General Public License as published by +    the Free Software Foundation; either version 3 of the License, or +    (at your option) any later version. + +    0MQ is distributed in the hope that it will be useful, +    but WITHOUT ANY WARRANTY; without even the implied warranty of +    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +    Lesser GNU General Public License for more details. + +    You should have received a copy of the Lesser GNU General Public License +    along with this program.  If not, see <http://www.gnu.org/licenses/>. +*/ + +#ifndef __ZMQ_REP_INCLUDED__ +#define __ZMQ_REP_INCLUDED__ + +#include "socket_base.hpp" +#include "yarray.hpp" + +namespace zmq +{ + +    class rep_t : public socket_base_t +    { +    public: + +        rep_t (class app_thread_t *parent_); +        ~rep_t (); + +        //  Overloads of functions from socket_base_t. +        bool xrequires_in (); +        bool xrequires_out (); +        void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_); +        void xdetach_inpipe (class reader_t *pipe_); +        void xdetach_outpipe (class writer_t *pipe_); +        void xkill (class reader_t *pipe_); +        void xrevive (class reader_t *pipe_); +        int xsetsockopt (int option_, const void *optval_, size_t optvallen_); +        int xsend (struct zmq_msg_t *msg_, int flags_); +        int xflush (); +        int xrecv (struct zmq_msg_t *msg_, int flags_); + +    private: + +        //  List in outbound and inbound pipes. Note that the two lists are +        //  always in sync. I.e. outpipe with index N communicates with the +        //  same session as inpipe with index N. +        typedef yarray_t <class writer_t> out_pipes_t; +        out_pipes_t out_pipes; +        typedef yarray_t <class reader_t> in_pipes_t; +        in_pipes_t in_pipes; + +        //  Number of active inpipes. All the active inpipes are located at the +        //  beginning of the in_pipes array. +        in_pipes_t::size_type active; + +        //  Index of the next inbound pipe to read a request from. +        in_pipes_t::size_type current; + +        //  If true, request was already received and reply wasn't sent yet. +        bool waiting_for_reply; + +        //  Pipe we are going to send reply to. +        class writer_t *r | 
