diff options
Diffstat (limited to 'src/xrep.cpp')
| -rw-r--r-- | src/xrep.cpp | 139 | 
1 files changed, 84 insertions, 55 deletions
diff --git a/src/xrep.cpp b/src/xrep.cpp index f50e32e..a9e2cc9 100644 --- a/src/xrep.cpp +++ b/src/xrep.cpp @@ -23,14 +23,16 @@  #include "err.hpp"  #include "pipe.hpp" -zmq::xrep_t::xrep_t (class app_thread_t *parent_) : -    socket_base_t (parent_), +zmq::xrep_t::xrep_t (class ctx_t *parent_, uint32_t slot_) : +    socket_base_t (parent_, slot_),      current_in (0),      prefetched (false),      more_in (false),      current_out (NULL), -    more_out (false) +    more_out (false), +    terminating (false)  { +    options.type = ZMQ_XREP;      options.requires_in = true;      options.requires_out = true; @@ -41,42 +43,76 @@ zmq::xrep_t::xrep_t (class app_thread_t *parent_) :  zmq::xrep_t::~xrep_t ()  { -    for (inpipes_t::iterator it = inpipes.begin (); it != inpipes.end (); it++) -        it->reader->term (); -    for (outpipes_t::iterator it = outpipes.begin (); it != outpipes.end (); -          it++) -        it->second.writer->term (); +    zmq_assert (inpipes.empty ()); +    zmq_assert (outpipes.empty ()); +} + +void zmq::xrep_t::xattach_pipes (reader_t *inpipe_, writer_t *outpipe_, +    const blob_t &peer_identity_) +{ +    if (outpipe_) { + +        outpipe_->set_event_sink (this); + +        //  TODO: What if new connection has same peer identity as the old one? +        outpipe_t outpipe = {outpipe_, true}; +        bool ok = outpipes.insert (std::make_pair ( +            peer_identity_, outpipe)).second; +        zmq_assert (ok); + +        if (terminating) { +            register_term_acks (1); +            outpipe_->terminate ();         +        } +    } + +    if (inpipe_) { + +        inpipe_->set_event_sink (this); + +        inpipe_t inpipe = {inpipe_, peer_identity_, true}; +        inpipes.push_back (inpipe); + +        if (terminating) { +            register_term_acks (1); +            inpipe_->terminate (); +        } +    }  } -void zmq::xrep_t::xattach_pipes (class reader_t *inpipe_, -    class writer_t *outpipe_, const blob_t &peer_identity_) +void zmq::xrep_t::process_term ()  { -    zmq_assert (inpipe_ && outpipe_); +    terminating = true; -    //  TODO: What if new connection has same peer identity as the old one? -    outpipe_t outpipe = {outpipe_, true}; -    bool ok = outpipes.insert (std::make_pair ( -        peer_identity_, outpipe)).second; -    zmq_assert (ok); +    register_term_acks (inpipes.size () + outpipes.size ()); + +    for (inpipes_t::iterator it = inpipes.begin (); it != inpipes.end (); +          it++) +        it->reader->terminate (); +    for (outpipes_t::iterator it = outpipes.begin (); it != outpipes.end (); +          it++) +        it->second.writer->terminate (); -    inpipe_t inpipe = {inpipe_, peer_identity_, true}; -    inpipes.push_back (inpipe); +    socket_base_t::process_term ();  } -void zmq::xrep_t::xdetach_inpipe (class reader_t *pipe_) +void zmq::xrep_t::terminated (reader_t *pipe_)  { -// TODO:!      for (inpipes_t::iterator it = inpipes.begin (); it != inpipes.end ();            it++) {          if (it->reader == pipe_) {              inpipes.erase (it); +            if (terminating) +                unregister_term_ack (); +            if (current_in >= inpipes.size ()) +                current_in = 0;              return;          }      }      zmq_assert (false);  } -void zmq::xrep_t::xdetach_outpipe (class writer_t *pipe_) +void zmq::xrep_t::terminated (writer_t *pipe_)  {      for (outpipes_t::iterator it = outpipes.begin ();            it != outpipes.end (); ++it) { @@ -84,26 +120,19 @@ void zmq::xrep_t::xdetach_outpipe (class writer_t *pipe_)              outpipes.erase (it);              if (pipe_ == current_out)                  current_out = NULL; +            if (terminating) +                unregister_term_ack ();              return;          }      }      zmq_assert (false);  } -void zmq::xrep_t::xkill (class reader_t *pipe_) +void zmq::xrep_t::delimited (reader_t *pipe_)  { -    for (inpipes_t::iterator it = inpipes.begin (); it != inpipes.end (); -          it++) { -        if (it->reader == pipe_) { -            zmq_assert (it->active); -            it->active = false; -            return; -        } -    } -    zmq_assert (false);  } -void zmq::xrep_t::xrevive (class reader_t *pipe_) +void zmq::xrep_t::activated (reader_t *pipe_)  {      for (inpipes_t::iterator it = inpipes.begin (); it != inpipes.end ();            it++) { @@ -116,7 +145,7 @@ void zmq::xrep_t::xrevive (class reader_t *pipe_)      zmq_assert (false);  } -void zmq::xrep_t::xrevive (class writer_t *pipe_) +void zmq::xrep_t::activated (writer_t *pipe_)  {      for (outpipes_t::iterator it = outpipes.begin ();            it != outpipes.end (); ++it) { @@ -129,13 +158,6 @@ void zmq::xrep_t::xrevive (class writer_t *pipe_)      zmq_assert (false);  } -int zmq::xrep_t::xsetsockopt (int option_, const void *optval_, -    size_t optvallen_) -{ -    errno = EINVAL; -    return -1; -} -  int zmq::xrep_t::xsend (zmq_msg_t *msg_, int flags_)  {      //  If this is the first part of the message it's the identity of the @@ -144,23 +166,24 @@ int zmq::xrep_t::xsend (zmq_msg_t *msg_, int flags_)          zmq_assert (!current_out);          //  If we have malformed message (prefix with no subsequent message) -        //  then just silently drop the message. -        if ((msg_->flags & ZMQ_MSG_MORE) == 0) -            return 0; +        //  then just silently ignore it. +        if (msg_->flags & ZMQ_MSG_MORE) { -        more_out = true; +            more_out = true; -        //  Find the pipe associated with the identity stored in the prefix. -        //  If there's no such pipe just silently drop the message. -        blob_t identity ((unsigned char*) zmq_msg_data (msg_), -            zmq_msg_size (msg_)); -        outpipes_t::iterator it = outpipes.find (identity); -        if (it == outpipes.end ()) -            return 0; - -        //  Remember the outgoing pipe. -        current_out = it->second.writer; +            //  Find the pipe associated with the identity stored in the prefix. +            //  If there's no such pipe just silently ignore the message. +            blob_t identity ((unsigned char*) zmq_msg_data (msg_), +                zmq_msg_size (msg_)); +            outpipes_t::iterator it = outpipes.find (identity); +            if (it != outpipes.end ()) +                current_out = it->second.writer; +        } +        int rc = zmq_msg_close (msg_); +        zmq_assert (rc == 0); +        rc = zmq_msg_init (msg_); +        zmq_assert (rc == 0);          return 0;      } @@ -233,7 +256,9 @@ int zmq::xrep_t::xrecv (zmq_msg_t *msg_, int flags_)              return 0;          } -        //  If me don't have a message, move to next pipe. +        //  If me don't have a message, mark the pipe as passive and +        //  move to next pipe. +        inpipes [current_in].active = false;          current_in++;          if (current_in >= inpipes.size ())              current_in = 0; @@ -260,6 +285,10 @@ bool zmq::xrep_t::xhas_in ()          if (inpipes [current_in].active &&                inpipes [current_in].reader->check_read ())              return true; + +        //  If me don't have a message, mark the pipe as passive and +        //  move to next pipe. +        inpipes [current_in].active = false;          current_in++;          if (current_in >= inpipes.size ())              current_in = 0;  | 
