From 99c5d9283622a0b37ee80f83ff4875c059fc5990 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Thu, 27 Aug 2009 10:54:28 +0200 Subject: pipes added --- src/socket_base.cpp | 227 ++++++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 221 insertions(+), 6 deletions(-) (limited to 'src/socket_base.cpp') diff --git a/src/socket_base.cpp b/src/socket_base.cpp index fb7bdcf..68fc82b 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -27,18 +27,23 @@ #include "err.hpp" #include "zmq_listener.hpp" #include "zmq_connecter.hpp" +#include "msg_content.hpp" #include "io_thread.hpp" #include "session.hpp" #include "config.hpp" #include "owned.hpp" #include "uuid.hpp" +#include "pipe.hpp" zmq::socket_base_t::socket_base_t (app_thread_t *parent_) : object_t (parent_), + current (0), + active (0), pending_term_acks (0), + ticks (0), app_thread (parent_), shutting_down (false) -{ +{ } zmq::socket_base_t::~socket_base_t () @@ -65,7 +70,7 @@ zmq::socket_base_t::~socket_base_t () // Process commands till we get all the termination acknowledgements. while (pending_term_acks) - app_thread->process_commands (true); + app_thread->process_commands (true, false); } // Check whether there are no session leaks. @@ -150,8 +155,28 @@ int zmq::socket_base_t::connect (const char *addr_) // Create the session. io_thread_t *io_thread = choose_io_thread (options.affinity); - session_t *session = new session_t (io_thread, this, session_name.c_str ()); + session_t *session = new session_t (io_thread, this, session_name.c_str (), + options); zmq_assert (session); + + // Create inbound pipe. + pipe_t *in_pipe = new pipe_t (this, session, options.hwm, options.lwm); + zmq_assert (in_pipe); + in_pipe->reader.set_endpoint (this); + session->set_outbound_pipe (&in_pipe->writer); + in_pipes.push_back (std::make_pair (&in_pipe->reader, session)); + in_pipes.back ().first->set_index (active); + in_pipes [active].first->set_index (in_pipes.size () - 1); + std::swap (in_pipes.back (), in_pipes [active]); + active++; + + // Create outbound pipe. + pipe_t *out_pipe = new pipe_t (session, this, options.hwm, options.lwm); + zmq_assert (out_pipe); + session->set_inbound_pipe (&out_pipe->reader); + out_pipes.push_back (std::make_pair (&out_pipe->writer, session)); + + // Activate the session. send_plug (session); send_own (this, session); @@ -173,17 +198,79 @@ int zmq::socket_base_t::connect (const char *addr_) int zmq::socket_base_t::send (::zmq_msg_t *msg_, int flags_) { - zmq_assert (false); + // Process pending commands, if any. + app_thread->process_commands (false, true); + + // Try to send the message. + bool sent = distribute (msg_, !(flags_ & ZMQ_NOFLUSH)); + + if (!(flags_ & ZMQ_NOBLOCK)) { + + // Oops, we couldn't send the message. Wait for the next + // command, process it and try to send the message again. + while (!sent) { + app_thread->process_commands (true, false); + sent = distribute (msg_, !(flags_ & ZMQ_NOFLUSH)); + } + } + else if (!sent) { + errno = EAGAIN; + return -1; + } + + return 0; } int zmq::socket_base_t::flush () { - zmq_assert (false); + for (out_pipes_t::iterator it = out_pipes.begin (); it != out_pipes.end (); + it++) + it->first->flush (); + + return 0; } int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_) { - zmq_assert (false); + // If the message cannot be fetched immediately, there are two scenarios. + // For non-blocking recv, commands are processed in case there's a message + // already waiting we don't know about. If it's not, return EAGAIN. + // In blocking scenario, commands are processed over and over again until + // we are able to fetch a message. + bool fetched = fetch (msg_); + if (!fetched) { + if (flags_ & ZMQ_NOBLOCK) { + app_thread->process_commands (false, false); + fetched = fetch (msg_); + } + else { + while (!fetched) { + app_thread->process_commands (true, false); + ticks = 0; + fetched = fetch (msg_); + } + } + } + + // Once every inbound_poll_rate messages check for signals and process + // incoming commands. This happens only if we are not polling altogether + // because there are messages available all the time. If poll occurs, + // ticks is set to zero and thus we avoid this code. + // + // Note that 'recv' uses different command throttling algorithm (the one + // described above) from the one used by 'send'. This is because counting + // ticks is more efficient than doing rdtsc all the time. + if (++ticks == inbound_poll_rate) { + app_thread->process_commands (false, false); + ticks = 0; + } + + if (!fetched) { + errno = EAGAIN; + return -1; + } + + return 0; } int zmq::socket_base_t::close () @@ -229,11 +316,35 @@ zmq::session_t *zmq::socket_base_t::find_session (const char *name_) return it->second; } +void zmq::socket_base_t::revive (reader_t *pipe_) +{ + // Move the pipe to the list of active pipes. + in_pipes_t::size_type index = (in_pipes_t::size_type) pipe_->get_index (); + in_pipes [index].first->set_index (active); + in_pipes [active].first->set_index (index); + std::swap (in_pipes [index], in_pipes [active]); + active++; +} + void zmq::socket_base_t::process_own (owned_t *object_) { io_objects.insert (object_); } +void zmq::socket_base_t::process_bind (owned_t *session_, + reader_t *in_pipe_, writer_t *out_pipe_) +{ + zmq_assert (in_pipe_); + in_pipe_->set_endpoint (this); + in_pipes.push_back (std::make_pair (in_pipe_, session_)); + in_pipes.back ().first->set_index (active); + in_pipes [active].first->set_index (in_pipes.size () - 1); + std::swap (in_pipes.back (), in_pipes [active]); + active++; + zmq_assert (out_pipe_); + out_pipes.push_back (std::make_pair (out_pipe_, session_)); +} + void zmq::socket_base_t::process_term_req (owned_t *object_) { // When shutting down we can ignore termination requests from owned @@ -260,3 +371,107 @@ void zmq::socket_base_t::process_term_ack () zmq_assert (pending_term_acks); pending_term_acks--; } + +bool zmq::socket_base_t::distribute (zmq_msg_t *msg_, bool flush_) +{ + int pipes_count = out_pipes.size (); + + // If there are no pipes available, simply drop the message. + if (pipes_count == 0) { + int rc = zmq_msg_close (msg_); + zmq_assert (rc == 0); + rc = zmq_msg_init (msg_); + zmq_assert (rc == 0); + return true; + } + + // First check whether all pipes are available for writing. + for (out_pipes_t::iterator it = out_pipes.begin (); it != out_pipes.end (); + it++) + if (!it->first->check_write (zmq_msg_size (msg_))) + return false; + + msg_content_t *content = (msg_content_t*) msg_->content; + + // For VSMs the copying is straighforward. + if (content == (msg_content_t*) ZMQ_VSM) { + for (out_pipes_t::iterator it = out_pipes.begin (); + it != out_pipes.end (); it++) { + it->first->write (msg_); + if (flush_) + it->first->flush (); + } + int rc = zmq_msg_init (msg_); + zmq_assert (rc == 0); + return true; + } + + // Optimisation for the case when there's only a single pipe + // to send the message to - no refcount adjustment i.e. no atomic + // operations are needed. + if (pipes_count == 1) { + out_pipes.begin ()->first->write (msg_); + if (flush_) + out_pipes.begin ()->first->flush (); + int rc = zmq_msg_init (msg_); + zmq_assert (rc == 0); + return true; + } + + // There are at least 2 destinations for the message. That means we have + // to deal with reference counting. First add N-1 references to + // the content (we are holding one reference anyway, that's why -1). + if (msg_->shared) + content->refcnt.add (pipes_count - 1); + else { + content->refcnt.set (pipes_count); + msg_->shared = true; + } + + // Push the message to all destinations. + for (out_pipes_t::iterator it = out_pipes.begin (); it != out_pipes.end (); + it++) { + it->first->write (msg_); + if (flush_) + it->first->flush (); + } + + // Detach the original message from the data buffer. + int rc = zmq_msg_init (msg_); + zmq_assert (rc == 0); + + return true; +} + +bool zmq::socket_base_t::fetch (zmq_msg_t *msg_) +{ + // Deallocate old content of the message. + zmq_msg_close (msg_); + + // Round-robin over the pipes to get next message. + for (int count = active; count != 0; count--) { + + bool fetched = in_pipes [current].first->read (msg_); + + // If there's no message in the pipe, move it to the list of + // non-active pipes. + if (!fetched) { + in_pipes [current].first->set_index (active - 1); + in_pipes [active - 1].first->set_index (current); + std::swap (in_pipes [current], in_pipes [active - 1]); + active--; + } + + current ++; + if (current >= active) + current = 0; + + if (fetched) + return true; + } + + // No message is available. Initialise the output parameter + // to be a 0-byte message. + zmq_msg_init (msg_); + return false; +} -- cgit v1.2.3