diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/fq.cpp | 7 | ||||
| -rw-r--r-- | src/pipe.cpp | 6 | ||||
| -rw-r--r-- | src/req.cpp | 243 | ||||
| -rw-r--r-- | src/req.hpp | 54 | 
4 files changed, 47 insertions, 263 deletions
@@ -46,6 +46,9 @@ void zmq::fq_t::attach (reader_t *pipe_)  void zmq::fq_t::terminated (reader_t *pipe_)  { +    //  TODO: This is a problem with session-initiated termination. It breaks +    //  message atomicity. However, for socket initiated termination it's +    //  just fine.      zmq_assert (!more || pipes [current] != pipe_);      //  Remove the pipe from the list; adjust number of active pipes @@ -87,6 +90,10 @@ int zmq::fq_t::recv (zmq_msg_t *msg_, int flags_)          //  Try to fetch new message. If we've already read part of the message          //  subsequent part should be immediately available.          bool fetched = pipes [current]->read (msg_); + +        //  Check the atomicity of the message. If we've already received the +        //  first part of the message we should get the remaining parts +        //  without blocking.          zmq_assert (!(more && !fetched));          //  Note that when message is not fetched, current pipe is killed and diff --git a/src/pipe.cpp b/src/pipe.cpp index 14fc753..5780635 100644 --- a/src/pipe.cpp +++ b/src/pipe.cpp @@ -269,9 +269,6 @@ void zmq::writer_t::terminate ()      if (terminating)          return; -    //  Rollback any unfinished messages. -    rollback (); -      if (msg_store == NULL || (msg_store->empty () && !extra_msg_flag))          write_delimiter ();      else @@ -280,6 +277,9 @@ void zmq::writer_t::terminate ()  void zmq::writer_t::write_delimiter ()  { +    //  Rollback any unfinished messages. +    rollback (); +      //  Push delimiter into the pipe.      //  Trick the compiler to belive that the tag is a valid pointer.      zmq_msg_t msg; diff --git a/src/req.cpp b/src/req.cpp index b900961..74cce66 100644 --- a/src/req.cpp +++ b/src/req.cpp @@ -21,129 +21,16 @@  #include "req.hpp"  #include "err.hpp" -#include "pipe.hpp"  zmq::req_t::req_t (class ctx_t *parent_, uint32_t slot_) : -    socket_base_t (parent_, slot_), -    active (0), -    current (0), +    xreq_t (parent_, slot_),      receiving_reply (false), -    reply_pipe_active (false), -    more (false), -    reply_pipe (NULL) +    message_begins (true)  { -    options.requires_in = true; -    options.requires_out = true;  }  zmq::req_t::~req_t ()  { -    zmq_assert (in_pipes.empty ()); -    zmq_assert (out_pipes.empty ()); -} - -void zmq::req_t::xattach_pipes (reader_t *inpipe_, writer_t *outpipe_, -    const blob_t &peer_identity_) -{ -    zmq_assert (inpipe_ && outpipe_); -    zmq_assert (in_pipes.size () == out_pipes.size ()); - -    inpipe_->set_event_sink (this); -    in_pipes.push_back (inpipe_); -    in_pipes.swap (active, in_pipes.size () - 1); - -    outpipe_->set_event_sink (this); -    out_pipes.push_back (outpipe_); -    out_pipes.swap (active, out_pipes.size () - 1); - -    active++; -} - -void zmq::req_t::xterm_pipes () -{ -    for (in_pipes_t::size_type i = 0; i != in_pipes.size (); i++) -        in_pipes [i]->terminate (); -    for (out_pipes_t::size_type i = 0; i != out_pipes.size (); i++) -        out_pipes [i]->terminate (); -} - -void zmq::req_t::terminated (reader_t *pipe_) -{ -    zmq_assert (!receiving_reply || !more || reply_pipe != pipe_); - -    zmq_assert (pipe_); -    zmq_assert (in_pipes.size () == out_pipes.size ()); - -    //  TODO: The pipe we are awaiting the reply from is detached. What now? -    if (receiving_reply && pipe_ == reply_pipe) { -        zmq_assert (false); -    } - -    in_pipes_t::size_type index = in_pipes.index (pipe_); - -    //  ??? -    if (!zombie) { -        if (out_pipes [index]) -            out_pipes [index]->terminate (); -        out_pipes.erase (index); -    } - -    in_pipes.erase (index); - -    if (index < active) { -        active--; -        if (current == active) -            current = 0; -    } -} - -void zmq::req_t::terminated (writer_t *pipe_) -{ -    zmq_assert (receiving_reply || !more || out_pipes [current] != pipe_); - -    zmq_assert (pipe_); -    zmq_assert (in_pipes.size () == out_pipes.size ()); - -    out_pipes_t::size_type index = out_pipes.index (pipe_); - -    //  ??? -    if (!zombie) { -        if (in_pipes [index]) -            in_pipes [index]->terminate (); -        in_pipes.erase (index); -    } - -    out_pipes.erase (index); -    if (index < active) { -        active--; -        if (current == active) -            current = 0; -    } -} - -bool zmq::req_t::xhas_pipes () -{ -    return !in_pipes.empty () || !out_pipes.empty (); -} - -void zmq::req_t::activated (reader_t *pipe_) -{ -    //  TODO: Actually, misbehaving peer can cause this kind of thing. -    //  Handle it decently, presumably kill the offending connection. -    zmq_assert (pipe_ == reply_pipe); -    reply_pipe_active = true; -} - -void zmq::req_t::activated (writer_t *pipe_) -{ -    out_pipes_t::size_type index = out_pipes.index (pipe_); -    zmq_assert (index >= active); - -    if (in_pipes [index] != NULL) { -        in_pipes.swap (index, active); -        out_pipes.swap (index, active); -        active++; -    }  }  int zmq::req_t::xsend (zmq_msg_t *msg_, int flags_) @@ -155,99 +42,58 @@ int zmq::req_t::xsend (zmq_msg_t *msg_, int flags_)          return -1;      } -    while (active > 0) { -        if (out_pipes [current]->check_write ()) -            break; - -        zmq_assert (!more); -        active--; -        if (current < active) { -            in_pipes.swap (current, active); -            out_pipes.swap (current, active); -        } -        else -            current = 0; -    } - -    if (active == 0) { -        errno = EAGAIN; -        return -1; -    } - -    //  If we are starting to send the request, generate a prefix. -    if (!more) { +    //  First part of the request is empty message part (stack bottom). +    if (message_begins) {          zmq_msg_t prefix;          int rc = zmq_msg_init (&prefix);          zmq_assert (rc == 0); -        prefix.flags |= ZMQ_MSG_MORE; -        bool written = out_pipes [current]->write (&prefix); -        zmq_assert (written); +        prefix.flags = ZMQ_MSG_MORE; +        rc = xreq_t::xsend (&prefix, flags_); +        if (rc != 0) +            return rc; +        message_begins = false;      } -    //  Push the message to the selected pipe. -    bool written = out_pipes [current]->write (msg_); -    zmq_assert (written); -    more = msg_->flags & ZMQ_MSG_MORE; -    if (!more) { -        out_pipes [current]->flush (); -        receiving_reply = true; -        reply_pipe = in_pipes [current]; +    bool more = msg_->flags & ZMQ_MSG_MORE; -        //  We can safely assume that the reply pipe is active as the last time -        //  we've used it we've read the reply and haven't tried to read from it -        //  anymore. -        reply_pipe_active = true; +    int rc = xreq_t::xsend (msg_, flags_); +    if (rc != 0) +        return rc; -        //  Move to the next pipe (load-balancing). -        current = (current + 1) % active; +    //  If the request was fully sent, flip the FSM into reply-receiving state. +    if (!more) { +        receiving_reply = true; +        message_begins = true;      } -    //  Detach the message from the data buffer. -    int rc = zmq_msg_init (msg_); -    zmq_assert (rc == 0); -      return 0;  }  int zmq::req_t::xrecv (zmq_msg_t *msg_, int flags_)  { -    //  Deallocate old content of the message. -    int rc = zmq_msg_close (msg_); -    zmq_assert (rc == 0); -      //  If request wasn't send, we can't wait for reply.      if (!receiving_reply) { -        zmq_msg_init (msg_);          errno = EFSM;          return -1;      } -    //  Get the reply from the reply pipe. -    if (!reply_pipe_active || !reply_pipe->read (msg_)) { -        reply_pipe_active = false; -        zmq_msg_init (msg_); -        errno = EAGAIN; -        return -1; -    } - -    //  If we are starting to receive new reply, check whether prefix -    //  is well-formed and drop it. -    if (!more) { +    //  First part of the reply should be empty message part (stack bottom). +    if (message_begins) { +        int rc = xreq_t::xrecv (msg_, flags_); +        if (rc != 0) +            return rc;          zmq_assert (msg_->flags & ZMQ_MSG_MORE);          zmq_assert (zmq_msg_size (msg_) == 0); -        rc = zmq_msg_close (msg_); -        zmq_assert (rc == 0); - -        //  Get the actual reply. -        bool recvd = reply_pipe->read (msg_); -        zmq_assert (recvd);      } -    //  If this was last part of the reply, switch to request phase. -    more = msg_->flags & ZMQ_MSG_MORE; -    if (!more) { +    int rc = xreq_t::xrecv (msg_, flags_); +    if (rc != 0) +        return rc; + +    //  If the reply is fully received, flip the FSM into request-sending state. +    if (!(msg_->flags & ZMQ_MSG_MORE)) {          receiving_reply = false; -        reply_pipe = NULL; +        message_begins = true;      }      return 0; @@ -255,43 +101,18 @@ int zmq::req_t::xrecv (zmq_msg_t *msg_, int flags_)  bool zmq::req_t::xhas_in ()  { -    if (receiving_reply && more) -        return true; - -    if (!receiving_reply || !reply_pipe_active) -        return false; - -    zmq_assert (reply_pipe);     -    if (!reply_pipe->check_read ()) { -        reply_pipe_active = false; +    if (!receiving_reply)          return false; -    } -    return true; +    return xreq_t::xhas_in ();  }  bool zmq::req_t::xhas_out ()  { -    if (!receiving_reply && more) -        return true; -      if (receiving_reply)          return false; -    while (active > 0) { -        if (out_pipes [current]->check_write ()) -            return true;; - -        active--; -        if (current < active) { -            in_pipes.swap (current, active); -            out_pipes.swap (current, active); -        } -        else -            current = 0; -    } - -    return false; +    return xreq_t::xhas_out ();  } diff --git a/src/req.hpp b/src/req.hpp index 5fd5642..0df59b9 100644 --- a/src/req.hpp +++ b/src/req.hpp @@ -20,17 +20,12 @@  #ifndef __ZMQ_REQ_HPP_INCLUDED__  #define __ZMQ_REQ_HPP_INCLUDED__ -#include "socket_base.hpp" -#include "yarray.hpp" -#include "pipe.hpp" +#include "xreq.hpp"  namespace zmq  { -    class req_t : -        public socket_base_t, -        public i_reader_events, -        public i_writer_events +    class req_t : public xreq_t      {      public: @@ -38,59 +33,20 @@ namespace zmq          ~req_t ();          //  Overloads of functions from socket_base_t. -        void xattach_pipes (reader_t *inpipe_, writer_t *outpipe_, -            const blob_t &peer_identity_); -        void xterm_pipes (); -        bool xhas_pipes ();          int xsend (zmq_msg_t *msg_, int flags_);          int xrecv (zmq_msg_t *msg_, int flags_);          bool xhas_in ();          bool xhas_out (); -        //  i_reader_events interface implementation. -        void activated (reader_t *pipe_); -        void terminated (reader_t *pipe_); - -        //  i_writer_events interface implementation. -        void activated (writer_t *pipe_); -        void terminated (writer_t *pipe_); -      private: -        //  List in outbound and inbound pipes. Note that the two lists are -        //  always in sync. I.e. outpipe with index N communicates with the -        //  same session as inpipe with index N. -        // -        //  TODO: Once we have queue limits in place, list of active outpipes -        //  is to be held (presumably by stacking active outpipes at -        //  the beginning of the array). We don't have to do the same thing for -        //  inpipes, because we know which pipe we want to read the -        //  reply from. -        typedef yarray_t <writer_t> out_pipes_t; -        out_pipes_t out_pipes; -        typedef yarray_t <reader_t> in_pipes_t; -        in_pipes_t in_pipes; - -        //  Number of active pipes. -        size_t active; - -        //  Req_t load-balances the requests - 'current' points to the session -        //  that's processing the request at the moment. -        out_pipes_t::size_type current; -          //  If true, request was already sent and reply wasn't received yet or          //  was raceived partially.          bool receiving_reply; -        //  True, if read can be attempted from the reply pipe. -        bool reply_pipe_active; - -        //  True, if message processed at the moment (either sent or received) -        //  is processed only partially. -        bool more; - -        //  Pipe we are awaiting the reply from. -        reader_t *reply_pipe; +        //  If true, we are starting to send/recv a message. The first part +        //  of the message must be empty message part (backtrace stack bottom). +        bool message_begins;          req_t (const req_t&);          void operator = (const req_t&);  | 
