diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/pipe.cpp | 147 | ||||
| -rw-r--r-- | src/pipe.hpp | 29 | ||||
| -rw-r--r-- | src/session.cpp | 161 | ||||
| -rw-r--r-- | src/session.hpp | 27 | ||||
| -rw-r--r-- | src/socket_base.cpp | 4 | 
5 files changed, 208 insertions, 160 deletions
diff --git a/src/pipe.cpp b/src/pipe.cpp index 3d0c0a6..73e5aae 100644 --- a/src/pipe.cpp +++ b/src/pipe.cpp @@ -62,9 +62,7 @@ zmq::pipe_t::pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_,      peers_msgs_read (0),      peer (NULL),      sink (NULL), -    terminating (false), -    term_recvd (false), -    delimited (false), +    state (active),      delay (delay_)  {  } @@ -89,7 +87,7 @@ void zmq::pipe_t::set_event_sink (i_pipe_events *sink_)  bool zmq::pipe_t::check_read ()  { -    if (unlikely (!in_active)) +    if (unlikely (!in_active || (state != active && state != pending)))          return false;      //  Check if there's an item in the pipe. @@ -104,13 +102,7 @@ bool zmq::pipe_t::check_read ()          msg_t msg;          bool ok = inpipe->read (&msg);          zmq_assert (ok); -        delimited = true; - -        //  If pipe_term was already received but wasn't processed because -        //  of pending messages, we can ack it now. -        if (term_recvd) -            send_pipe_term_ack (peer); - +        delimit ();          return false;      } @@ -119,7 +111,7 @@ bool zmq::pipe_t::check_read ()  bool zmq::pipe_t::read (msg_t *msg_)  { -    if (unlikely (!in_active)) +    if (unlikely (!in_active || (state != active && state != pending)))          return false;      if (!inpipe->read (msg_)) { @@ -129,13 +121,7 @@ bool zmq::pipe_t::read (msg_t *msg_)      //  If delimiter was read, start termination process of the pipe.      if (msg_->is_delimiter ()) { -        delimited = true; - -        //  If pipe_term was already received but wasn't processed because -        //  of pending messages, we can ack it now. -        if (term_recvd) -            send_pipe_term_ack (peer); - +        delimit ();          return false;      } @@ -150,7 +136,7 @@ bool zmq::pipe_t::read (msg_t *msg_)  bool zmq::pipe_t::check_write (msg_t *msg_)  { -    if (unlikely (!out_active)) +    if (unlikely (!out_active || state != active))          return false;      bool full = hwm > 0 && msgs_written - peers_msgs_read == uint64_t (hwm); @@ -188,13 +174,21 @@ void zmq::pipe_t::rollback ()  void zmq::pipe_t::flush ()  { +    //  If terminate() was already called do nothing. +    if (state == terminated && state == double_terminated) +        return; + +    //  The peer does not exist anymore at this point. +    if (state == terminating) +        return; +      if (!outpipe->flush ())          send_activate_read (peer);  }  void zmq::pipe_t::process_activate_read ()  { -    if (!in_active && !terminating) { +    if (!in_active && (state == active || state == pending)) {          in_active = true;          sink->read_activated (this);      } @@ -205,7 +199,7 @@ void zmq::pipe_t::process_activate_write (uint64_t msgs_read_)      //  Remember the peers's message sequence number.      peers_msgs_read = msgs_read_; -    if (!out_active && !terminating) { +    if (!out_active && state == active) {          out_active = true;          sink->write_activated (this);      } @@ -213,16 +207,41 @@ void zmq::pipe_t::process_activate_write (uint64_t msgs_read_)  void zmq::pipe_t::process_pipe_term ()  { -    term_recvd = true; - -    //  We can proceed with the termination if one of the following is true: -    //  1. User asked this side of pipe to terminate already. -    //  2. Waiting for pending messages in not required. -    //  3. Delimiter was already received. -    if (terminating || !delay || delimited) { -        terminating = true; +    //  This is the simple case of peer-induced termination. If there are no +    //  more pending messages to read, or if the pipe was configured to drop +    //  pending messages, we can move directly to the terminating state. +    //  Otherwise we'll hang up in pending state till all the pending messages +    //  are sent. +    if (state == active) { +        if (!delay) { +            state = terminating; +            send_pipe_term_ack (peer); +        } +        else { +            state = pending; +        } +        return; +    } + +    //  Delimiter happened to arrive before the term command. Now we have the +    //  term command as well, so we can move straight to terminating state. +    if (state == delimited) { +        state = terminating; +        send_pipe_term_ack (peer); +        return; +    } + +    //  This is the case where both ends of the pipe are closed in parallel. +    //  We simply reply to the request by ack and continue waiting for our +    //  own ack. +    if (state == terminated) { +        state = double_terminated;          send_pipe_term_ack (peer); +        return;      } + +    //  pipe_term is invalid in other states. +    zmq_assert (false);  }  void zmq::pipe_t::process_pipe_term_ack () @@ -231,10 +250,16 @@ void zmq::pipe_t::process_pipe_term_ack ()      zmq_assert (sink);      sink->terminated (this); -    //  If the peer haven't asked for the termination itself, we have to -    //  ack the ack, so that it can deallocate properly. -    if (!term_recvd) +    //  In terminating and double_terminated states there's nothing to do. +    //  Simply deallocate the pipe. In terminated state we have to ack the +    //  peer before deallocating this side of the pipe. All the other states +    //  are invalid. +    if (state == terminating) ; +    else if (state == double_terminated); +    else if (state == terminated)          send_pipe_term_ack (peer); +    else +        zmq_assert (false);      //  We'll deallocate the inbound pipe, the peer will deallocate the outbound      //  pipe (which is an inbound pipe from its point of view). @@ -254,10 +279,40 @@ void zmq::pipe_t::process_pipe_term_ack ()  void zmq::pipe_t::terminate ()  { -    //  Prevent double termination. -    if (terminating) +    //  If terminate was already called, we can ignore the duplicit invocation. +    if (state == terminated || state == double_terminated)          return; -    terminating = true; + +    //  If the pipe is in the final phase of async termination, it's going to +    //  closed anyway. No need to do anything special here. +    else if (state == terminating) +        return; + +    //  The simple sync termination case. Ask the peer to terminate and wait +    //  for the ack. +    else if (state == active) { +        send_pipe_term (peer); +        state = terminated; +    } + +    //  There are still pending messages available, but the user calls +    //  'terminate'. We can act as if all the pending messages were read. +    else if (state == pending) { +        send_pipe_term_ack (peer); +        state = terminating; +    } + +    //  We've already got delimiter, but not term command yet. We can ignore +    //  the delimiter and ack synchronously terminate as if we were in +    //  active state.     +    else if (state == delimited) { +        send_pipe_term (peer); +        state = terminated;     +    } + +    //  There are no other states. +    else +        zmq_assert (false);      //  Stop inbound and outbound flow of messages.      in_active = false; @@ -272,9 +327,6 @@ void zmq::pipe_t::terminate ()      msg.init_delimiter ();      outpipe->write (msg, false);      flush (); - -    //  Start the termination handshaking. -    send_pipe_term (peer);  }  bool zmq::pipe_t::is_delimiter (msg_t &msg_) @@ -309,3 +361,20 @@ int zmq::pipe_t::compute_lwm (int hwm_)      return result;  } + +void zmq::pipe_t::delimit () +{ +    if (state == active) { +        state = delimited; +        return; +    } + +    if (state == pending) { +        send_pipe_term_ack (peer); +        state = terminating; +        return; +    } + +    //  Delimiter in any other state is invalid. +    zmq_assert (false); +} diff --git a/src/pipe.hpp b/src/pipe.hpp index f821bdd..3087ab8 100644 --- a/src/pipe.hpp +++ b/src/pipe.hpp @@ -99,6 +99,9 @@ namespace zmq          void process_pipe_term ();          void process_pipe_term_ack (); +        //  Handler for delimiter read from the pipe. +        void delimit (); +          //  Type of the underlying lock-free pipe.          typedef ypipe_t <msg_t, message_pipe_granularity> upipe_t; @@ -142,15 +145,23 @@ namespace zmq          //  Sink to send events to.          i_pipe_events *sink; -        //  True is 'terminate' method was called or termination request -        //  was received from the peer. -        bool terminating; - -        //  True is we've already got pipe_term command from the peer. -        bool term_recvd; - -        //  True if delimiter was already received from the peer. -        bool delimited; +        //  State of the pipe endpoint. Active is common state before any +        //  termination begins. Delimited means that delimiter was read from +        //  pipe before term command was received. Pending means that term +        //  command was already received from the peer but there are still +        //  pending messages to read. Terminating means that all pending +        //  messages were already read and all we are waiting for is ack from +        //  the peer. Terminated means that 'terminate' was explicitly called +        //  by the user. Double_terminated means that user called 'terminate' +        //  and then we've got term command from the peer as well. +        enum { +            active, +            delimited, +            pending, +            terminating, +            terminated, +            double_terminated +        } state;          //  If true, we receive all the pending inbound messages before          //  terminating. If false, we terminate immediately when the peer diff --git a/src/session.cpp b/src/session.cpp index 5ef21c7..bff452e 100644 --- a/src/session.cpp +++ b/src/session.cpp @@ -31,47 +31,35 @@ zmq::session_t::session_t (class io_thread_t *io_thread_,      io_object_t (io_thread_),      pipe (NULL),      incomplete_in (false), +    terminating (false),      engine (NULL),      socket (socket_),      io_thread (io_thread_), -    pipe_attached (false), -    delimiter_processed (false), -    force_terminate (false), -    has_linger_timer (false), -    state (active) -{     +    has_linger_timer (false) +{  }  zmq::session_t::~session_t ()  {      zmq_assert (!pipe); -    if (engine) -        engine->terminate (); -} - -void zmq::session_t::proceed_with_term () -{ -    if (state == terminating) -        return; - -    zmq_assert (state == pending); -    state = terminating; -      //  If there's still a pending linger timer, remove it.      if (has_linger_timer) {          cancel_timer (linger_timer_id);          has_linger_timer = false;      } -    if (pipe) { -        register_term_acks (1); -        pipe->terminate (); -    } +    //  Close the engine. +    if (engine) +        engine->terminate (); +} -    //  The session has already waited for the linger period. We don't want -    //  the child objects to linger any more thus linger is set to zero. -    own_t::process_term (0); +void zmq::session_t::attach_pipe (pipe_t *pipe_) +{ +    zmq_assert (!pipe); +    zmq_assert (pipe_); +    pipe = pipe_; +    pipe->set_event_sink (this);  }  bool zmq::session_t::read (msg_t *msg_) @@ -127,37 +115,13 @@ void zmq::session_t::clean_pipes ()      }  } -void zmq::session_t::attach_pipe (pipe_t *pipe_, const blob_t &peer_identity_) -{ -    zmq_assert (!pipe_attached); -    pipe_attached = true; -     -    if (pipe_) { -        zmq_assert (!pipe); -        pipe = pipe_; -        pipe->set_event_sink (this); -    } - -    //  If we are already terminating, terminate the pipes straight away. -    if (state == terminating) { -        if (pipe) { -            pipe->terminate (); -            register_term_acks (1); -        } -    } -} -  void zmq::session_t::terminated (pipe_t *pipe_)  { +    //  Drop the reference to the deallocated pipe.      zmq_assert (pipe == pipe_); - -    // If we are in process of being closed, but still waiting for all -    // pending messeges being sent, we can terminate here. -    if (state == pending) -        proceed_with_term (); -      pipe = NULL; -    if (state == terminating) + +    if (terminating)          unregister_term_ack ();  } @@ -189,7 +153,7 @@ void zmq::session_t::process_attach (i_engine *engine_,      //  If we are already terminating, we destroy the engine straight away.      //  Note that we don't have to unplug it before deleting as it's not      //  yet plugged to the session. -    if (state == terminating) { +    if (terminating) {          if (engine_)              delete engine_;          return; @@ -209,12 +173,8 @@ void zmq::session_t::process_attach (i_engine *engine_,          return;      } -    //  Check whether the required pipe already exists and create it -    //  if it does not. -    if (!pipe_attached) { -        zmq_assert (!pipe); -        pipe_attached = true; - +    //  Create the pipe if it does not exist yet. +    if (!pipe) {          object_t *parents [2] = {this, socket};          pipe_t *pipes [2] = {NULL, NULL};          int hwms [2] = {options.rcvhwm, options.sndhwm}; @@ -226,6 +186,7 @@ void zmq::session_t::process_attach (i_engine *engine_,          pipes [0]->set_event_sink (this);          //  Remember the local end of the pipe. +        zmq_assert (!pipe);          pipe = pipes [0];          //  Ask socket to plug into the remote end of the pipe. @@ -249,43 +210,45 @@ void zmq::session_t::detach ()      //  Send the event to the derived class.      detached (); -    //  Just in case there's only a delimiter in the inbound pipe. +    //  Just in case there's only a delimiter in the pipe.      if (pipe)          pipe->check_read ();  }  void zmq::session_t::process_term (int linger_)  { -    zmq_assert (state == active); -    state = pending; - -    //  If linger is set to zero, we can terminate the session straight away -    //  not waiting for the pending messages to be sent. -    if (linger_ == 0) { -        proceed_with_term (); -        return; +    //  If termination is already underway, do nothing. +    if (!terminating) { + +        terminating = true; + +		//  If the termination of the pipe happens before the term command is +		//  delivered there's nothing much to do. We can proceed with the +		//  stadard termination immediately. +		if (pipe) { + +			//  We're going to wait till the pipe terminates. +			register_term_acks (1); + +			//  If linger is set to zero, we can ask pipe to terminate without +			//  waiting for pending messages to be read. +			if (linger_ == 0) +				pipe->terminate (); + +			//  If there's finite linger value, set up a timer. +			if (linger_ > 0) { +			   zmq_assert (!has_linger_timer); +			   add_timer (linger_, linger_timer_id); +			   has_linger_timer = true; +			} + +			//  In case there's no engine and there's only delimiter in the pipe it +			//  wouldn't be ever read. Thus we check for it explicitly. +			pipe->check_read (); +		}      } -    //  If there's finite linger value, set up a timer. -    if (linger_ > 0) { -       zmq_assert (!has_linger_timer); -       add_timer (linger_, linger_timer_id); -       has_linger_timer = true; -    } - -    //  If there's no engine and there's only delimiter in the pipe it wouldn't -    //  be ever read. Thus we check for it explicitly. -    if (pipe) -        pipe->check_read (); - -    //  If there's no in pipe, there are no pending messages to send. -    //  We can proceed with the shutdown straight away. Also, if there is -    //  pipe, but the delimiter was already processed, we can terminate -    //  immediately. Alternatively, if the derived session type have -    //  called 'terminate' we'll finish straight away. -    if (delimiter_processed || force_terminate || -          (!options.immediate_connect && !pipe)) -        proceed_with_term (); +    own_t::process_term (0);  }  void zmq::session_t::timer_event (int id_) @@ -294,7 +257,10 @@ void zmq::session_t::timer_event (int id_)      //  there are still pending messages to be sent.      zmq_assert (id_ == linger_timer_id);      has_linger_timer = false; -    proceed_with_term (); + +    //  Ask pipe to terminate even though there may be pending messages in it. +    zmq_assert (pipe); +    pipe->terminate ();  }  bool zmq::session_t::has_engine () @@ -314,6 +280,19 @@ void zmq::session_t::unregister_session (const blob_t &name_)  void zmq::session_t::terminate ()  { -    force_terminate = true; -    own_t::terminate (); +    //  If termination process is already underway, do nothing. +    if (!terminating) { +		terminating = true; +		 +		//  If the pipe was already terminated, there's nothing much to do. +		//  If it wasn't, we'll ask it to terminate. +		if (pipe) { + +			register_term_acks (1); +			pipe->terminate (); +		} +    } + +	own_t::terminate ();  } + diff --git a/src/session.hpp b/src/session.hpp index 4a12d68..f1564d8 100644 --- a/src/session.hpp +++ b/src/session.hpp @@ -41,6 +41,9 @@ namespace zmq          session_t (class io_thread_t *io_thread_,              class socket_base_t *socket_, const options_t &options_); +        //  To be used once only, when creating the session. +        void attach_pipe (class pipe_t *pipe_); +          //  i_inout interface implementation. Note that detach method is not          //  implemented by generic session. Different session types may handle          //  engine disconnection in different ways. @@ -49,8 +52,6 @@ namespace zmq          void flush ();          void detach (); -        void attach_pipe (class pipe_t *pipe_, const blob_t &peer_identity_); -          //  i_pipe_events interface implementation.          void read_activated (class pipe_t *pipe_);          void write_activated (class pipe_t *pipe_); @@ -59,7 +60,7 @@ namespace zmq      protected:          //  This function allows to shut down the session even though -        //  there are pending messages in the inbound pipe. +        //  there are messages pending.          void terminate ();          //  Two events for the derived session type. Attached is triggered @@ -104,6 +105,10 @@ namespace zmq          //  is still in the in pipe.          bool incomplete_in; +        //  If true the termination process is already underway, ie. term ack +        //  for the pipe was already registered etc. +        bool terminating; +          //  The protocol I/O engine connected to the session.          struct i_engine *engine; @@ -114,28 +119,12 @@ namespace zmq          //  the engines into the same thread.          class io_thread_t *io_thread; -        //  If true, pipe was already attached to this session. -        bool pipe_attached; - -        //  If true, delimiter was already read from the inbound pipe. -        bool delimiter_processed; - -        //  If true, we should terminate the session even though there are -        //  pending messages in the inbound pipe. -        bool force_terminate; -          //  ID of the linger timer          enum {linger_timer_id = 0x20};          //  True is linger timer is running.          bool has_linger_timer; -        enum { -            active, -            pending, -            terminating -        } state; -          session_t (const session_t&);          const session_t &operator = (const session_t&);      }; diff --git a/src/socket_base.cpp b/src/socket_base.cpp index fae55f2..1682c05 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -450,8 +450,8 @@ int zmq::socket_base_t::connect (const char *addr_)          //  Attach local end of the pipe to the socket object.          attach_pipe (pipes [0], blob_t ()); -        //  Attach remote end of the pipe to the session object. -        session->attach_pipe (pipes [1], blob_t ()); +        //  Attach remote end of the pipe to the session object later on. +        session->attach_pipe (pipes [1]);      }      //  Activate the session. Make it a child of this socket.  | 
