diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/fq.cpp | 4 | ||||
-rw-r--r-- | src/fq.hpp | 1 | ||||
-rw-r--r-- | src/pair.cpp | 12 | ||||
-rw-r--r-- | src/pair.hpp | 1 | ||||
-rw-r--r-- | src/pipe.cpp | 7 | ||||
-rw-r--r-- | src/pipe.hpp | 1 | ||||
-rw-r--r-- | src/session.cpp | 111 | ||||
-rw-r--r-- | src/session.hpp | 25 | ||||
-rw-r--r-- | src/xrep.cpp | 4 | ||||
-rw-r--r-- | src/xrep.hpp | 1 |
10 files changed, 99 insertions, 68 deletions
@@ -73,6 +73,10 @@ void zmq::fq_t::terminated (reader_t *pipe_) sink->unregister_term_ack (); } +void zmq::fq_t::delimited (reader_t *pipe_) +{ +} + void zmq::fq_t::terminate () { zmq_assert (!terminating); @@ -45,6 +45,7 @@ namespace zmq // i_reader_events implementation. void activated (reader_t *pipe_); void terminated (reader_t *pipe_); + void delimited (reader_t *pipe_); private: diff --git a/src/pair.cpp b/src/pair.cpp index 89f949d..492ec55 100644 --- a/src/pair.cpp +++ b/src/pair.cpp @@ -42,8 +42,8 @@ zmq::pair_t::~pair_t () zmq_assert (!outpipe); } -void zmq::pair_t::xattach_pipes (class reader_t *inpipe_, - class writer_t *outpipe_, const blob_t &peer_identity_) +void zmq::pair_t::xattach_pipes (reader_t *inpipe_, writer_t *outpipe_, + const blob_t &peer_identity_) { zmq_assert (!inpipe && !outpipe); @@ -62,7 +62,7 @@ void zmq::pair_t::xattach_pipes (class reader_t *inpipe_, } } -void zmq::pair_t::terminated (class reader_t *pipe_) +void zmq::pair_t::terminated (reader_t *pipe_) { zmq_assert (pipe_ == inpipe); inpipe = NULL; @@ -72,7 +72,7 @@ void zmq::pair_t::terminated (class reader_t *pipe_) unregister_term_ack (); } -void zmq::pair_t::terminated (class writer_t *pipe_) +void zmq::pair_t::terminated (writer_t *pipe_) { zmq_assert (pipe_ == outpipe); outpipe = NULL; @@ -82,6 +82,10 @@ void zmq::pair_t::terminated (class writer_t *pipe_) unregister_term_ack (); } +void zmq::pair_t::delimited (reader_t *pipe_) +{ +} + void zmq::pair_t::process_term () { terminating = true; diff --git a/src/pair.hpp b/src/pair.hpp index 65b474e..030fb97 100644 --- a/src/pair.hpp +++ b/src/pair.hpp @@ -47,6 +47,7 @@ namespace zmq // i_reader_events interface implementation. void activated (class reader_t *pipe_); void terminated (class reader_t *pipe_); + void delimited (class reader_t *pipe_); // i_writer_events interface implementation. void activated (class writer_t *pipe_); diff --git a/src/pipe.cpp b/src/pipe.cpp index 4551660..65e9b0b 100644 --- a/src/pipe.cpp +++ b/src/pipe.cpp @@ -89,6 +89,11 @@ bool zmq::reader_t::check_read () // If the next item in the pipe is message delimiter, // initiate its termination. if (pipe->probe (is_delimiter)) { + zmq_msg_t msg; + bool ok = pipe->read (&msg); + zmq_assert (ok); + if (sink) + sink->delimited (this); terminate (); return false; } @@ -109,6 +114,8 @@ bool zmq::reader_t::read (zmq_msg_t *msg_) // If delimiter was read, start termination process of the pipe. unsigned char *offset = 0; if (msg_->content == (void*) (offset + ZMQ_DELIMITER)) { + if (sink) + sink->delimited (this); terminate (); return false; } diff --git a/src/pipe.hpp b/src/pipe.hpp index 1659e2e..bfe6603 100644 --- a/src/pipe.hpp +++ b/src/pipe.hpp @@ -52,6 +52,7 @@ namespace zmq virtual void terminated (class reader_t *pipe_) = 0; virtual void activated (class reader_t *pipe_) = 0; + virtual void delimited (class reader_t *pipe_) = 0; }; class reader_t : public object_t, public array_item_t diff --git a/src/session.cpp b/src/session.cpp index 4c448af..f39cf05 100644 --- a/src/session.cpp +++ b/src/session.cpp @@ -35,9 +35,9 @@ zmq::session_t::session_t (class io_thread_t *io_thread_, engine (NULL), socket (socket_), io_thread (io_thread_), - attach_processed (false), - term_processed (false), - finalised (false) + delimiter_processed (false), + force_terminate (false), + state (active) { } @@ -50,12 +50,24 @@ zmq::session_t::~session_t () engine->terminate (); } -void zmq::session_t::terminate () +void zmq::session_t::proceed_with_term () { - if (in_pipe) + if (state == terminating) + return; + + zmq_assert (state == pending); + state = terminating; + + if (in_pipe) { + register_term_acks (1); in_pipe->terminate (); - if (out_pipe) + } + if (out_pipe) { + register_term_acks (1); out_pipe->terminate (); + } + + own_t::process_term (); } bool zmq::session_t::read (::zmq_msg_t *msg_) @@ -125,46 +137,44 @@ void zmq::session_t::attach_pipes (class reader_t *inpipe_, } // If we are already terminating, terminate the pipes straight away. - if (finalised) { + if (state == terminating) { if (in_pipe) { - register_term_acks (1); in_pipe->terminate (); + register_term_acks (1); } if (out_pipe) { - register_term_acks (1); out_pipe->terminate (); + register_term_acks (1); } - return; } +} - attach_processed = true; - finalise (); +void zmq::session_t::delimited (reader_t *pipe_) +{ + zmq_assert (in_pipe == pipe_); + zmq_assert (!delimiter_processed); + delimiter_processed = true; + + // If we are in process of being closed, but still waiting for all + // pending messeges being sent, we can terminate here. + if (state == pending) + proceed_with_term (); } void zmq::session_t::terminated (reader_t *pipe_) { zmq_assert (in_pipe == pipe_); in_pipe = NULL; - - if (finalised) { + if (state == terminating) unregister_term_ack (); - return; - } - - finalise (); } void zmq::session_t::terminated (writer_t *pipe_) { zmq_assert (out_pipe == pipe_); out_pipe = NULL; - - if (finalised) { + if (state == terminating) unregister_term_ack (); - return; - } - - finalise (); } void zmq::session_t::activated (reader_t *pipe_) @@ -186,27 +196,6 @@ void zmq::session_t::process_plug () { } -void zmq::session_t::finalise () -{ - // There may be delimiter waiting in the inbound pipe, never to be read - // because the connection cannot be established. In order to terminate - // decently in such case, do check_read which will in turn start the pipe - // termination process if there's delimiter in it. - if (in_pipe) - in_pipe->check_read (); - - // If all conditions are met, proceed with termination: - // 1. Owner object already asked us to terminate. - // 2. The pipes were already attached to the session. - // 3. Both pipes have already terminated. Note that inbound pipe - // is terminated after delimiter is read, i.e. all messages - // were already sent to the wire. - if (term_processed && attach_processed && !in_pipe && !out_pipe) { - finalised = true; - own_t::process_term (); - } -} - void zmq::session_t::process_attach (i_engine *engine_, const blob_t &peer_identity_) { @@ -219,7 +208,7 @@ void zmq::session_t::process_attach (i_engine *engine_, } // If we are already terminating, we destroy the engine straight away. - if (finalised) { + if (state == terminating) { delete engine; return; } @@ -229,18 +218,19 @@ void zmq::session_t::process_attach (i_engine *engine_, reader_t *socket_reader = NULL; writer_t *socket_writer = NULL; + // Create the pipes, if required. if (options.requires_in && !out_pipe) { create_pipe (socket, this, options.hwm, options.swap, &socket_reader, &out_pipe); out_pipe->set_event_sink (this); } - if (options.requires_out && !in_pipe) { create_pipe (this, socket, options.hwm, options.swap, &in_pipe, &socket_writer); in_pipe->set_event_sink (this); } + // Bind the pipes to the socket object. if (socket_reader || socket_writer) send_bind (socket, socket_reader, socket_writer, peer_identity_); @@ -250,8 +240,6 @@ void zmq::session_t::process_attach (i_engine *engine_, engine = engine_; engine->plug (io_thread, this); - attach_processed = true; - // Trigger the notfication about the attachment. attached (peer_identity_); } @@ -266,11 +254,21 @@ void zmq::session_t::detach () void zmq::session_t::process_term () { - // Here we are pugging into the own_t's termination mechanism. - // The goal is to postpone the termination till all the pending messages - // are sent to the peer. - term_processed = true; - finalise (); + zmq_assert (state == active); + state = pending; + + // If there's no engine and there's only delimiter in the pipe it wouldn't + // be ever read. Thus we check for it explicitly. + if (in_pipe) + in_pipe->check_read (); + + // If there's no in pipe there are no pending messages to send. + // We can proceed with the shutdown straight away. Also, if there is + // inbound pipe, but the delimiter was already processed, we can + // terminate immediately. Alternatively, if the derived session type have + // called 'terminate' we'll finish straight away. + if (!options.requires_out || delimiter_processed || force_terminate) + proceed_with_term (); } bool zmq::session_t::register_session (const blob_t &name_, session_t *session_) @@ -291,3 +289,8 @@ void zmq::session_t::detached () { } +void zmq::session_t::terminate () +{ + force_terminate = true; + own_t::terminate (); +} diff --git a/src/session.hpp b/src/session.hpp index 6e6d2e6..d4b6ad9 100644 --- a/src/session.hpp +++ b/src/session.hpp @@ -54,6 +54,7 @@ namespace zmq // i_reader_events interface implementation. void activated (class reader_t *pipe_); void terminated (class reader_t *pipe_); + void delimited (class reader_t *pipe_); // i_writer_events interface implementation. void activated (class writer_t *pipe_); @@ -61,8 +62,8 @@ namespace zmq protected: - // Forcefully close this session (without sending - // outbound messages to the wire). + // This function allows to shut down the session even though + // there are pending messages in the inbound pipe. void terminate (); // Two events for the derived session type. Attached is triggered @@ -93,9 +94,8 @@ namespace zmq const blob_t &peer_identity_); void process_term (); - // Check whether object is ready for termination. If so proceed - // with closing child objects. - void finalise (); + // Call this function to move on with the delayed process_term. + void proceed_with_term (); // Inbound pipe, i.e. one the session is getting messages from. class reader_t *in_pipe; @@ -117,13 +117,18 @@ namespace zmq // the engines into the same thread. class io_thread_t *io_thread; - // True if pipes were already attached. - bool attach_processed; + // If true, delimiter was already read from the inbound pipe. + bool delimiter_processed; - // True if term command was already processed. - bool term_processed; + // If true, we should terminate the session even though there are + // pending messages in the inbound pipe. + bool force_terminate; - bool finalised; + enum { + active, + pending, + terminating + } state; session_t (const session_t&); void operator = (const session_t&); diff --git a/src/xrep.cpp b/src/xrep.cpp index d45b8aa..9cbd9cb 100644 --- a/src/xrep.cpp +++ b/src/xrep.cpp @@ -135,6 +135,10 @@ void zmq::xrep_t::terminated (writer_t *pipe_) zmq_assert (false); } +void zmq::xrep_t::delimited (reader_t *pipe_) +{ +} + void zmq::xrep_t::activated (reader_t *pipe_) { for (inpipes_t::iterator it = inpipes.begin (); it != inpipes.end (); diff --git a/src/xrep.hpp b/src/xrep.hpp index b16682d..4831aaf 100644 --- a/src/xrep.hpp +++ b/src/xrep.hpp @@ -57,6 +57,7 @@ namespace zmq // i_reader_events interface implementation. void activated (reader_t *pipe_); void terminated (reader_t *pipe_); + void delimited (reader_t *pipe_); // i_writer_events interface implementation. void activated (writer_t *pipe_); |