diff options
| -rw-r--r-- | src/pipe.cpp | 3 | ||||
| -rw-r--r-- | src/session.cpp | 110 | ||||
| -rw-r--r-- | src/session.hpp | 10 | ||||
| -rw-r--r-- | src/tcp_socket.cpp | 2 | 
4 files changed, 57 insertions, 68 deletions
diff --git a/src/pipe.cpp b/src/pipe.cpp index 73e5aae..48fc3e5 100644 --- a/src/pipe.cpp +++ b/src/pipe.cpp @@ -216,11 +216,12 @@ void zmq::pipe_t::process_pipe_term ()          if (!delay) {              state = terminating;              send_pipe_term_ack (peer); +            return;          }          else {              state = pending; +            return;          } -        return;      }      //  Delimiter happened to arrive before the term command. Now we have the diff --git a/src/session.cpp b/src/session.cpp index bff452e..5601402 100644 --- a/src/session.cpp +++ b/src/session.cpp @@ -31,7 +31,7 @@ zmq::session_t::session_t (class io_thread_t *io_thread_,      io_object_t (io_thread_),      pipe (NULL),      incomplete_in (false), -    terminating (false), +    pending (false),      engine (NULL),      socket (socket_),      io_thread (io_thread_), @@ -121,8 +121,11 @@ void zmq::session_t::terminated (pipe_t *pipe_)      zmq_assert (pipe == pipe_);      pipe = NULL; -    if (terminating) -        unregister_term_ack (); +    //  If we are waiting for pending messages to be sent, at this point +    //  we are sure that there will be no more messages and we can proceed +    //  with termination safely. +    if (pending) +        proceed_with_term ();  }  void zmq::session_t::read_activated (pipe_t *pipe_) @@ -150,15 +153,6 @@ void zmq::session_t::process_plug ()  void zmq::session_t::process_attach (i_engine *engine_,      const blob_t &peer_identity_)  { -    //  If we are already terminating, we destroy the engine straight away. -    //  Note that we don't have to unplug it before deleting as it's not -    //  yet plugged to the session. -    if (terminating) { -        if (engine_) -            delete engine_; -        return; -    } -      //  If some other object (e.g. init) notifies us that the connection failed      //  without creating an engine we need to start the reconnection process.      if (!engine_) { @@ -217,37 +211,52 @@ void zmq::session_t::detach ()  void zmq::session_t::process_term (int linger_)  { -    //  If termination is already underway, do nothing. -    if (!terminating) { - -        terminating = true; - -		//  If the termination of the pipe happens before the term command is -		//  delivered there's nothing much to do. We can proceed with the -		//  stadard termination immediately. -		if (pipe) { - -			//  We're going to wait till the pipe terminates. -			register_term_acks (1); - -			//  If linger is set to zero, we can ask pipe to terminate without -			//  waiting for pending messages to be read. -			if (linger_ == 0) -				pipe->terminate (); - -			//  If there's finite linger value, set up a timer. -			if (linger_ > 0) { -			   zmq_assert (!has_linger_timer); -			   add_timer (linger_, linger_timer_id); -			   has_linger_timer = true; -			} - -			//  In case there's no engine and there's only delimiter in the pipe it -			//  wouldn't be ever read. Thus we check for it explicitly. -			pipe->check_read (); -		} +    zmq_assert (!pending); + +    //  If the termination of the pipe happens before the term command is +    //  delivered there's nothing much to do. We can proceed with the +    //  stadard termination immediately. +    if (!pipe) { +        proceed_with_term (); +        return; +    } + +    //  If linger is set to zero, we can ask pipe to terminate without +    //  waiting for pending messages to be read. +    if (linger_ == 0) { +        proceed_with_term (); +        return; +    } + +    pending = true; + +    //  If there's finite linger value, delay the termination. +    //  If linger is infinite (negative) we don't even have to set +    //  the timer. +    if (linger_ > 0) { +        zmq_assert (!has_linger_timer); +        add_timer (linger_, linger_timer_id); +        has_linger_timer = true;      } +    //  In case there's no engine and there's only delimiter in the +    //  pipe it wouldn't be ever read. Thus we check for it explicitly. +    pipe->check_read (); +} + +void zmq::session_t::proceed_with_term () +{ +    //  The pending phase have just ended. +    pending = false; + +    //  If there's pipe attached to the session, we have to wait till it +    //  terminates. +    if (pipe) { +        register_term_acks (1); +        pipe->terminate (); +    } + +    //  Continue with standard termination.      own_t::process_term (0);  } @@ -260,7 +269,7 @@ void zmq::session_t::timer_event (int id_)      //  Ask pipe to terminate even though there may be pending messages in it.      zmq_assert (pipe); -    pipe->terminate (); +    proceed_with_term ();  }  bool zmq::session_t::has_engine () @@ -278,21 +287,4 @@ void zmq::session_t::unregister_session (const blob_t &name_)      socket->unregister_session (name_);  } -void zmq::session_t::terminate () -{ -    //  If termination process is already underway, do nothing. -    if (!terminating) { -		terminating = true; -		 -		//  If the pipe was already terminated, there's nothing much to do. -		//  If it wasn't, we'll ask it to terminate. -		if (pipe) { - -			register_term_acks (1); -			pipe->terminate (); -		} -    } - -	own_t::terminate (); -} diff --git a/src/session.hpp b/src/session.hpp index f1564d8..8bca735 100644 --- a/src/session.hpp +++ b/src/session.hpp @@ -59,10 +59,6 @@ namespace zmq      protected: -        //  This function allows to shut down the session even though -        //  there are messages pending. -        void terminate (); -          //  Two events for the derived session type. Attached is triggered          //  when session is attached to a peer. The function can reject the new          //  peer by returning false. Detached is triggered at the beginning of @@ -105,9 +101,9 @@ namespace zmq          //  is still in the in pipe.          bool incomplete_in; -        //  If true the termination process is already underway, ie. term ack -        //  for the pipe was already registered etc. -        bool terminating; +        //  True if termination have been suspended to push the pending +        //  messages to the network. +        bool pending;          //  The protocol I/O engine connected to the session.          struct i_engine *engine; diff --git a/src/tcp_socket.cpp b/src/tcp_socket.cpp index 2257e4f..3c9b1fd 100644 --- a/src/tcp_socket.cpp +++ b/src/tcp_socket.cpp @@ -213,7 +213,7 @@ int zmq::tcp_socket_t::read (void *data_, size_t size_)      errno_assert (nbytes != -1); -    //  Orderly shutdown by the other peer. +    //  Orderly shutdown by the peer.      if (nbytes == 0)          return -1;  | 
