diff options
author | Martin Sustrik <sustrik@fastmq.commkdir> | 2009-08-27 10:54:28 +0200 |
---|---|---|
committer | Martin Sustrik <sustrik@fastmq.commkdir> | 2009-08-27 10:54:28 +0200 |
commit | 99c5d9283622a0b37ee80f83ff4875c059fc5990 (patch) | |
tree | 3460ec503898d2184dc807e47eea679d24d96d5c /src | |
parent | aacdb7a454686bfac93164d0e67e785658d48a3c (diff) |
pipes added
Diffstat (limited to 'src')
-rw-r--r-- | src/Makefile.am | 2 | ||||
-rw-r--r-- | src/app_thread.cpp | 26 | ||||
-rw-r--r-- | src/app_thread.hpp | 4 | ||||
-rw-r--r-- | src/command.hpp | 11 | ||||
-rw-r--r-- | src/config.hpp | 8 | ||||
-rw-r--r-- | src/i_endpoint.hpp | 33 | ||||
-rw-r--r-- | src/object.cpp | 30 | ||||
-rw-r--r-- | src/object.hpp | 9 | ||||
-rw-r--r-- | src/pipe.cpp | 112 | ||||
-rw-r--r-- | src/pipe.hpp | 102 | ||||
-rw-r--r-- | src/session.cpp | 56 | ||||
-rw-r--r-- | src/session.hpp | 24 | ||||
-rw-r--r-- | src/socket_base.cpp | 227 | ||||
-rw-r--r-- | src/socket_base.hpp | 42 | ||||
-rw-r--r-- | src/ypipe.hpp | 22 | ||||
-rw-r--r-- | src/yqueue.hpp | 2 | ||||
-rw-r--r-- | src/zmq_encoder.cpp | 4 | ||||
-rw-r--r-- | src/zmq_engine.cpp | 7 | ||||
-rw-r--r-- | src/zmq_engine.hpp | 4 | ||||
-rw-r--r-- | src/zmq_listener_init.cpp | 3 |
20 files changed, 679 insertions, 49 deletions
diff --git a/src/Makefile.am b/src/Makefile.am index 396e3a3..b6a4540 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -19,6 +19,7 @@ libzmq_la_SOURCES = \ io_object.hpp \ io_thread.hpp \ ip.hpp \ + i_endpoint.hpp \ i_poller.hpp \ i_poll_events.hpp \ i_signaler.hpp \ @@ -66,6 +67,7 @@ libzmq_la_SOURCES = \ object.cpp \ options.cpp \ owned.cpp \ + pipe.cpp \ poll.cpp \ select.cpp \ session.cpp \ diff --git a/src/app_thread.cpp b/src/app_thread.cpp index 74ba357..db73ec1 100644 --- a/src/app_thread.cpp +++ b/src/app_thread.cpp @@ -77,7 +77,7 @@ bool zmq::app_thread_t::make_current () return true; } -void zmq::app_thread_t::process_commands (bool block_) +void zmq::app_thread_t::process_commands (bool block_, bool throttle_) { ypollset_t::signals_t signals; if (block_) @@ -91,24 +91,26 @@ void zmq::app_thread_t::process_commands (bool block_) // depending on CPU speed: It's ~1ms on 3GHz CPU, ~2ms on 1.5GHz CPU // etc. The optimisation makes sense only on platforms where getting // a timestamp is a very cheap operation (tens of nanoseconds). + if (throttle_) { - // Get timestamp counter. + // Get timestamp counter. #if defined __GNUC__ - uint32_t low; - uint32_t high; - __asm__ volatile ("rdtsc" : "=a" (low), "=d" (high)); - uint64_t current_time = (uint64_t) high << 32 | low; + uint32_t low; + uint32_t high; + __asm__ volatile ("rdtsc" : "=a" (low), "=d" (high)); + uint64_t current_time = (uint64_t) high << 32 | low; #elif defined _MSC_VER - uint64_t current_time = __rdtsc (); + uint64_t current_time = __rdtsc (); #else #error #endif - // Check whether certain time have elapsed since last command - // processing. - if (current_time - last_processing_time <= max_command_delay) - return; - last_processing_time = current_time; + // Check whether certain time have elapsed since last command + // processing. + if (current_time - last_processing_time <= max_command_delay) + return; + last_processing_time = current_time; + } #endif // Check whether there are any commands pending for this thread. diff --git a/src/app_thread.hpp b/src/app_thread.hpp index e7bbf70..e45b1b2 100644 --- a/src/app_thread.hpp +++ b/src/app_thread.hpp @@ -53,7 +53,9 @@ namespace zmq // Processes commands sent to this thread (if any). If 'block' is // set to true, returns only after at least one command was processed. - void process_commands (bool block_); + // If throttle argument is true, commands are processed at most once + // in a predefined time period. + void process_commands (bool block_, bool throttle_); // Create a socket of a specified type. class socket_base_t *create_socket (int type_); diff --git a/src/command.hpp b/src/command.hpp index 41c7d6c..d3bad79 100644 --- a/src/command.hpp +++ b/src/command.hpp @@ -39,6 +39,7 @@ namespace zmq own, attach, bind, + revive, term_req, term, term_ack @@ -65,10 +66,18 @@ namespace zmq class zmq_engine_t *engine; } attach; - // Sent between objects to establish pipe(s) between them. + // Sent from session to socket to establish pipe(s) between them. struct { + class owned_t *session; + class reader_t *in_pipe; + class writer_t *out_pipe; } bind; + // Sent by pipe writer to inform dormant pipe reader that there + // are messages in the pipe. + struct { + } revive; + // Sent by I/O object ot the socket to request the shutdown of // the I/O object. struct { diff --git a/src/config.hpp b/src/config.hpp index 88b93d7..17e67b9 100644 --- a/src/config.hpp +++ b/src/config.hpp @@ -38,6 +38,14 @@ namespace zmq // footprint of dispatcher. command_pipe_granularity = 4, + // Determines how often does socket poll for new commands when it + // still has unprocessed messages to handle. Thus, if it is set to 100, + // socket will process 100 inbound messages before doing the poll. + // If there are no unprocessed messages available, poll is done + // immediately. Decreasing the value trades overall latency for more + // real-time behaviour (less latency peaks). + inbound_poll_rate = 100, + // Maximal batching size for engines with receiving functionality. // So, if there are 10 messages that fit into the batch size, all of // them may be read by a single 'recv' system call, thus avoiding diff --git a/src/i_endpoint.hpp b/src/i_endpoint.hpp new file mode 100644 index 0000000..bb7409e --- /dev/null +++ b/src/i_endpoint.hpp @@ -0,0 +1,33 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#ifndef __ZMQ_I_ENDPOINT_HPP_INCLUDED__ +#define __ZMQ_I_ENDPOINT_HPP_INCLUDED__ + +namespace zmq +{ + + struct i_endpoint + { + virtual void revive (class reader_t *pipe_) = 0; + }; + +} + +#endif diff --git a/src/object.cpp b/src/object.cpp index 0a25750..4d54ebf 100644 --- a/src/object.cpp +++ b/src/object.cpp @@ -20,6 +20,7 @@ #include "object.hpp" #include "dispatcher.hpp" #include "err.hpp" +#include "pipe.hpp" #include "io_thread.hpp" #include "simple_semaphore.hpp" #include "owned.hpp" @@ -57,6 +58,10 @@ void zmq::object_t::process_command (command_t &cmd_) { switch (cmd_.type) { + case command_t::revive: + process_revive (); + break; + case command_t::stop: process_stop (); break; @@ -74,7 +79,8 @@ void zmq::object_t::process_command (command_t &cmd_) return; case command_t::bind: - process_bind (); + process_bind (cmd_.args.bind.session, + cmd_.args.bind.in_pipe, cmd_.args.bind.out_pipe); return; case command_t::term_req: @@ -140,11 +146,23 @@ void zmq::object_t::send_attach (session_t *destination_, zmq_engine_t *engine_) send_command (cmd); } -void zmq::object_t::send_bind (object_t *destination_) +void zmq::object_t::send_bind (object_t *destination_, owned_t *session_, + reader_t *in_pipe_, writer_t *out_pipe_) { command_t cmd; cmd.destination = destination_; cmd.type = command_t::bind; + cmd.args.bind.session = session_; + cmd.args.bind.in_pipe = in_pipe_; + cmd.args.bind.out_pipe = out_pipe_; + send_command (cmd); +} + +void zmq::object_t::send_revive (object_t *destination_) +{ + command_t cmd; + cmd.destination = destination_; + cmd.type = command_t::revive; send_command (cmd); } @@ -194,7 +212,13 @@ void zmq::object_t::process_attach (zmq_engine_t *engine_) zmq_assert (false); } -void zmq::object_t::process_bind () +void zmq::object_t::process_bind (owned_t *session_, + reader_t *in_pipe_, writer_t *out_pipe_) +{ + zmq_assert (false); +} + +void zmq::object_t::process_revive () { zmq_assert (false); } diff --git a/src/object.hpp b/src/object.hpp index 31c8c40..0dbac24 100644 --- a/src/object.hpp +++ b/src/object.hpp @@ -24,7 +24,6 @@ namespace zmq { - // Base class for all objects that participate in inter-thread // communication. @@ -58,7 +57,9 @@ namespace zmq class owned_t *object_); void send_attach (class session_t *destination_, class zmq_engine_t *engine_); - void send_bind (object_t *destination_); + void send_bind (object_t *destination_, class owned_t *session_, + class reader_t *in_pipe_, class writer_t *out_pipe_); + void send_revive (class object_t *destination_); void send_term_req (class socket_base_t *destination_, class owned_t *object_); void send_term (class owned_t *destination_); @@ -70,7 +71,9 @@ namespace zmq virtual void process_plug (); virtual void process_own (class owned_t *object_); virtual void process_attach (class zmq_engine_t *engine_); - virtual void process_bind (); + virtual void process_bind (class owned_t *session_, + class reader_t *in_pipe_, class writer_t *out_pipe_); + virtual void process_revive (); virtual void process_term_req (class owned_t *object_); virtual void process_term (); virtual void process_term_ack (); diff --git a/src/pipe.cpp b/src/pipe.cpp new file mode 100644 index 0000000..5016631 --- /dev/null +++ b/src/pipe.cpp @@ -0,0 +1,112 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include <pthread.h> + +#include "pipe.hpp" + +zmq::reader_t::reader_t (object_t *parent_, pipe_t *pipe_, + uint64_t hwm_, uint64_t lwm_) : + object_t (parent_), + pipe (pipe_), + peer (&pipe_->writer), + hwm (hwm_), + lwm (lwm_), + index (-1), + endpoint (NULL) +{ +} + +zmq::reader_t::~reader_t () +{ +} + +bool zmq::reader_t::read (zmq_msg_t *msg_) +{ + return pipe->read (msg_); + + // TODO: Adjust the size of the pipe. +} + +void zmq::reader_t::set_endpoint (i_endpoint *endpoint_) +{ + endpoint = endpoint_; +} + +void zmq::reader_t::set_index (int index_) +{ + index = index_; +} + +int zmq::reader_t::get_index () +{ + return index; +} + +void zmq::reader_t::process_revive () +{ + endpoint->revive (this); +} + +zmq::writer_t::writer_t (object_t *parent_, pipe_t *pipe_, + uint64_t hwm_, uint64_t lwm_) : + object_t (parent_), + pipe (pipe_), + peer (&pipe_->reader), + hwm (hwm_), + lwm (lwm_) +{ +} + +zmq::writer_t::~writer_t () +{ +} + +bool zmq::writer_t::check_write (uint64_t size_) +{ + // TODO: Check whether hwm is exceeded. + + return true; +} + +bool zmq::writer_t::write (struct zmq_msg_t *msg_) +{ + pipe->write (*msg_); + return true; + + // TODO: Adjust size of the pipe. +} + +void zmq::writer_t::flush () +{ + if (!pipe->flush ()) + send_revive (peer); +} + +zmq::pipe_t::pipe_t (object_t *reader_parent_, object_t *writer_parent_, + uint64_t hwm_, uint64_t lwm_) : + reader (reader_parent_, this, hwm_, lwm_), + writer (writer_parent_, this, hwm_, lwm_) +{ +} + +zmq::pipe_t::~pipe_t () +{ +} + diff --git a/src/pipe.hpp b/src/pipe.hpp index 28e4b4d..d48fc47 100644 --- a/src/pipe.hpp +++ b/src/pipe.hpp @@ -22,15 +22,117 @@ #include "../include/zmq.h" +#include "stdint.hpp" +#include "i_endpoint.hpp" #include "ypipe.hpp" #include "config.hpp" +#include "object.hpp" namespace zmq { + class reader_t : public object_t + { + public: + + reader_t (class object_t *parent_, class pipe_t *pipe_, + uint64_t hwm_, uint64_t lwm_); + ~reader_t (); + + // Reads a message to the underlying pipe. + bool read (struct zmq_msg_t *msg_); + + // Mnaipulation of index of the pipe. + void set_endpoint (i_endpoint *endpoint_); + void set_index (int index_); + int get_index (); + + private: + + // Command handlers. + void process_revive (); + + // The underlying pipe. + class pipe_t *pipe; + + // Pipe writer associated with the other side of the pipe. + class object_t *peer; + + // High and low watermarks for in-memory storage (in bytes). + uint64_t hwm; + uint64_t lwm; + + // Positions of head and tail of the pipe (in bytes). + uint64_t head; + uint64_t tail; + uint64_t last_sent_head; + + // Index of the pipe in the socket's list of inbound pipes. + int index; + + // Endpoint (either session or socket) the pipe is attached to. + i_endpoint *endpoint; + + reader_t (const reader_t&); + void operator = (const reader_t&); + }; + + class writer_t : public object_t + { + public: + + writer_t (class object_t *parent_, class pipe_t *pipe_, + uint64_t hwm_, uint64_t lwm_); + ~writer_t (); + + // Checks whether message with specified size can be written to the + // pipe. If writing the message would cause high watermark to be + // exceeded, the function returns false. + bool check_write (uint64_t size_); + + // Writes a message to the underlying pipe. Returns false if the + // message cannot be written because high watermark was reached. + bool write (struct zmq_msg_t *msg_); + + // Flush the messages downsteam. + void flush (); + + private: + + // The underlying pipe. + class pipe_t *pipe; + + // Pipe reader associated with the other side of the pipe. + class object_t *peer; + + // High and low watermarks for in-memory storage (in bytes). + uint64_t hwm; + uint64_t lwm; + + // Positions of head and tail of the pipe (in bytes). + uint64_t head; + uint64_t tail; + + writer_t (const writer_t&); + void operator = (const writer_t&); + }; + // Message pipe. class pipe_t : public ypipe_t <zmq_msg_t, false, message_pipe_granularity> { + public: + + pipe_t (object_t *reader_parent_, object_t *writer_parent_, + uint64_t hwm_, uint64_t lwm_); + ~pipe_t (); + + reader_t reader; + writer_t writer; + + private: + + pipe_t (const pipe_t&); + void operator = (const pipe_t&); }; } diff --git a/src/session.cpp b/src/session.cpp index fc1f858..115fb85 100644 --- a/src/session.cpp +++ b/src/session.cpp @@ -20,12 +20,17 @@ #include "session.hpp" #include "zmq_engine.hpp" #include "err.hpp" +#include "pipe.hpp" zmq::session_t::session_t (object_t *parent_, socket_base_t *owner_, - const char *name_) : + const char *name_, const options_t &options_) : owned_t (parent_, owner_), + in_pipe (NULL), + active (false), + out_pipe (NULL), engine (NULL), - name (name_) + name (name_), + options (options_) { } @@ -33,18 +38,48 @@ zmq::session_t::~session_t () { } +void zmq::session_t::set_inbound_pipe (reader_t *pipe_) +{ + zmq_assert (!in_pipe); + in_pipe = pipe_; + active = true; + in_pipe->set_endpoint (this); +} +void zmq::session_t::set_outbound_pipe (writer_t *pipe_) +{ + zmq_assert (!out_pipe); + out_pipe = pipe_; +} + + bool zmq::session_t::read (::zmq_msg_t *msg_) { - return false; + if (!active) + return false; + + bool fetched = in_pipe->read (msg_); + if (!fetched) + active = false; + + return fetched; } bool zmq::session_t::write (::zmq_msg_t *msg_) { - return false; + return out_pipe->write (msg_); } void zmq::session_t::flush () { + out_pipe->flush (); +} + +void zmq::session_t::revive (reader_t *pipe_) +{ + zmq_assert (in_pipe == pipe_); + active = true; + if (engine) + engine->revive (); } void zmq::session_t::process_plug () @@ -56,6 +91,19 @@ void zmq::session_t::process_plug () // We should syslog it and drop the session. TODO zmq_assert (ok); + // If session is created by 'connect' function, it has the pipes set + // already. Otherwise, it's being created by the listener and the pipes + // are yet to be created. + if (!in_pipe && !out_pipe) { + pipe_t *inbound = new pipe_t (this, owner, options.hwm, options.lwm); + zmq_assert (inbound); + in_pipe = &inbound->reader; + pipe_t *outbound = new pipe_t (owner, this, options.hwm, options.lwm); + zmq_assert (outbound); + out_pipe = &outbound->writer; + send_bind (owner, this, &outbound->reader, &inbound->writer); + } + owned_t::process_plug (); } diff --git a/src/session.hpp b/src/session.hpp index 6d6bcf7..b79fb4b 100644 --- a/src/session.hpp +++ b/src/session.hpp @@ -23,17 +23,22 @@ #include <string> #include "i_inout.hpp" +#include "i_endpoint.hpp" #include "owned.hpp" #include "options.hpp" namespace zmq { - class session_t : public owned_t, public i_inout + class session_t : public owned_t, public i_inout, public i_endpoint { public: - session_t (object_t *parent_, socket_base_t *owner_, const char *name_); + session_t (object_t *parent_, socket_base_t *owner_, const char *name_, + const options_t &options_); + + void set_inbound_pipe (class reader_t *pipe_); + void set_outbound_pipe (class writer_t *pipe_); private: @@ -44,17 +49,32 @@ namespace zmq bool write (::zmq_msg_t *msg_); void flush (); + // i_endpoint interface implementation. + void revive (class reader_t *pipe_); + // Handlers for incoming commands. void process_plug (); void process_unplug (); void process_attach (class zmq_engine_t *engine_); + // Inbound pipe, i.e. one the session is getting messages from. + class reader_t *in_pipe; + + // If true, in_pipe is active. Otherwise there are no messages to get. + bool active; + + // Outbound pipe, i.e. one the socket is sending messages to. + class writer_t *out_pipe; + class zmq_engine_t *engine; // The name of the session. One that is used to register it with // socket-level repository of sessions. std::string name; + // Inherited socket options. + options_t options; + session_t (const session_t&); void operator = (const session_t&); }; diff --git a/src/socket_base.cpp b/src/socket_base.cpp index fb7bdcf..68fc82b 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -27,18 +27,23 @@ #include "err.hpp" #include "zmq_listener.hpp" #include "zmq_connecter.hpp" +#include "msg_content.hpp" #include "io_thread.hpp" #include "session.hpp" #include "config.hpp" #include "owned.hpp" #include "uuid.hpp" +#include "pipe.hpp" zmq::socket_base_t::socket_base_t (app_thread_t *parent_) : object_t (parent_), + current (0), + active (0), pending_term_acks (0), + ticks (0), app_thread (parent_), shutting_down (false) -{ +{ } zmq::socket_base_t::~socket_base_t () @@ -65,7 +70,7 @@ zmq::socket_base_t::~socket_base_t () // Process commands till we get all the termination acknowledgements. while (pending_term_acks) - app_thread->process_commands (true); + app_thread->process_commands (true, false); } // Check whether there are no session leaks. @@ -150,8 +155,28 @@ int zmq::socket_base_t::connect (const char *addr_) // Create the session. io_thread_t *io_thread = choose_io_thread (options.affinity); - session_t *session = new session_t (io_thread, this, session_name.c_str ()); + session_t *session = new session_t (io_thread, this, session_name.c_str (), + options); zmq_assert (session); + + // Create inbound pipe. + pipe_t *in_pipe = new pipe_t (this, session, options.hwm, options.lwm); + zmq_assert (in_pipe); + in_pipe->reader.set_endpoint (this); + session->set_outbound_pipe (&in_pipe->writer); + in_pipes.push_back (std::make_pair (&in_pipe->reader, session)); + in_pipes.back ().first->set_index (active); + in_pipes [active].first->set_index (in_pipes.size () - 1); + std::swap (in_pipes.back (), in_pipes [active]); + active++; + + // Create outbound pipe. + pipe_t *out_pipe = new pipe_t (session, this, options.hwm, options.lwm); + zmq_assert (out_pipe); + session->set_inbound_pipe (&out_pipe->reader); + out_pipes.push_back (std::make_pair (&out_pipe->writer, session)); + + // Activate the session. send_plug (session); send_own (this, session); @@ -173,17 +198,79 @@ int zmq::socket_base_t::connect (const char *addr_) int zmq::socket_base_t::send (::zmq_msg_t *msg_, int flags_) { - zmq_assert (false); + // Process pending commands, if any. + app_thread->process_commands (false, true); + + // Try to send the message. + bool sent = distribute (msg_, !(flags_ & ZMQ_NOFLUSH)); + + if (!(flags_ & ZMQ_NOBLOCK)) { + + // Oops, we couldn't send the message. Wait for the next + // command, process it and try to send the message again. + while (!sent) { + app_thread->process_commands (true, false); + sent = distribute (msg_, !(flags_ & ZMQ_NOFLUSH)); + } + } + else if (!sent) { + errno = EAGAIN; + return -1; + } + + return 0; } int zmq::socket_base_t::flush () { - zmq_assert (false); + for (out_pipes_t::iterator it = out_pipes.begin (); it != out_pipes.end (); + it++) + it->first->flush (); + + return 0; } int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_) { - zmq_assert (false); + // If the message cannot be fetched immediately, there are two scenarios. + // For non-blocking recv, commands are processed in case there's a message + // already waiting we don't know about. If it's not, return EAGAIN. + // In blocking scenario, commands are processed over and over again until + // we are able to fetch a message. + bool fetched = fetch (msg_); + if (!fetched) { + if (flags_ & ZMQ_NOBLOCK) { + app_thread->process_commands (false, false); + fetched = fetch (msg_); + } + else { + while (!fetched) { + app_thread->process_commands (true, false); + ticks = 0; + fetched = fetch (msg_); + } + } + } + + // Once every inbound_poll_rate messages check for signals and process + // incoming commands. This happens only if we are not polling altogether + // because there are messages available all the time. If poll occurs, + // ticks is set to zero and thus we avoid this code. + // + // Note that 'recv' uses different command throttling algorithm (the one + // described above) from the one used by 'send'. This is because counting + // ticks is more efficient than doing rdtsc all the time. + if (++ticks == inbound_poll_rate) { + app_thread->process_commands (false, false); + ticks = 0; + } + + if (!fetched) { + errno = EAGAIN; + return -1; + } + + return 0; } int zmq::socket_base_t::close () @@ -229,11 +316,35 @@ zmq::session_t *zmq::socket_base_t::find_session (const char *name_) return it->second; } +void zmq::socket_base_t::revive (reader_t *pipe_) +{ + // Move the pipe to the list of active pipes. + in_pipes_t::size_type index = (in_pipes_t::size_type) pipe_->get_index (); + in_pipes [index].first->set_index (active); + in_pipes [active].first->set_index (index); + std::swap (in_pipes [index], in_pipes [active]); + active++; +} + void zmq::socket_base_t::process_own (owned_t *object_) { io_objects.insert (object_); } +void zmq::socket_base_t::process_bind (owned_t *session_, + reader_t *in_pipe_, writer_t *out_pipe_) +{ + zmq_assert (in_pipe_); + in_pipe_->set_endpoint (this); + in_pipes.push_back (std::make_pair (in_pipe_, session_)); + in_pipes.back ().first->set_index (active); + in_pipes [active].first->set_index (in_pipes.size () - 1); + std::swap (in_pipes.back (), in_pipes [active]); + active++; + zmq_assert (out_pipe_); + out_pipes.push_back (std::make_pair (out_pipe_, session_)); +} + void zmq::socket_base_t::process_term_req (owned_t *object_) { // When shutting down we can ignore termination requests from owned @@ -260,3 +371,107 @@ void zmq::socket_base_t::process_term_ack () zmq_assert (pending_term_acks); pending_term_acks--; } + +bool zmq::socket_base_t::distribute (zmq_msg_t *msg_, bool flush_) +{ + int pipes_count = out_pipes.size (); + + // If there are no pipes available, simply drop the message. + if (pipes_count == 0) { + int rc = zmq_msg_close (msg_); + zmq_assert (rc == 0); + rc = zmq_msg_init (msg_); + zmq_assert (rc == 0); + return true; + } + + // First check whether all pipes are available for writing. + for (out_pipes_t::iterator it = out_pipes.begin (); it != out_pipes.end (); + it++) + if (!it->first->check_write (zmq_msg_size (msg_))) + return false; + + msg_content_t *content = (msg_content_t*) msg_->content; + + // For VSMs the copying is straighforward. + if (content == (msg_content_t*) ZMQ_VSM) { + for (out_pipes_t::iterator it = out_pipes.begin (); + it != out_pipes.end (); it++) { + it->first->write (msg_); + if (flush_) + it->first->flush (); + } + int rc = zmq_msg_init (msg_); + zmq_assert (rc == 0); + return true; + } + + // Optimisation for the case when there's only a single pipe + // to send the message to - no refcount adjustment i.e. no atomic + // operations are needed. + if (pipes_count == 1) { + out_pipes.begin ()->first->write (msg_); + if (flush_) + out_pipes.begin ()->first->flush (); + int rc = zmq_msg_init (msg_); + zmq_assert (rc == 0); + return true; + } + + // There are at least 2 destinations for the message. That means we have + // to deal with reference counting. First add N-1 references to + // the content (we are holding one reference anyway, that's why -1). + if (msg_->shared) + content->refcnt.add (pipes_count - 1); + else { + content->refcnt.set (pipes_count); + msg_->shared = true; + } + + // Push the message to all destinations. + for (out_pipes_t::iterator it = out_pipes.begin (); it != out_pipes.end (); + it++) { + it->first->write (msg_); + if (flush_) + it->first->flush (); + } + + // Detach the original message from the data buffer. + int rc = zmq_msg_init (msg_); + zmq_assert (rc == 0); + + return true; +} + +bool zmq::socket_base_t::fetch (zmq_msg_t *msg_) +{ + // Deallocate old content of the message. + zmq_msg_close (msg_); + + // Round-robin over the pipes to get next message. + for (int count = active; count != 0; count--) { + + bool fetched = in_pipes [current].first->read (msg_); + + // If there's no message in the pipe, move it to the list of + // non-active pipes. + if (!fetched) { + in_pipes [current].first->set_index (active - 1); + in_pipes [active - 1].first->set_index (current); + std::swap (in_pipes [current], in_pipes [active - 1]); + active--; + } + + current ++; + if (current >= active) + current = 0; + + if (fetched) + return true; + } + + // No message is available. Initialise the output parameter + // to be a 0-byte message. + zmq_msg_init (msg_); + return false; +} diff --git a/src/socket_base.hpp b/src/socket_base.hpp index 20ac4e2..1f04bda 100644 --- a/src/socket_base.hpp +++ b/src/socket_base.hpp @@ -22,8 +22,11 @@ #include <set> #include <map> +#include <vector> #include <string> +#include <utility> +#include "i_endpoint.hpp" #include "object.hpp" #include "mutex.hpp" #include "options.hpp" @@ -32,7 +35,7 @@ namespace zmq { - class socket_base_t : public object_t + class socket_base_t : public object_t, public i_endpoint { public: @@ -57,22 +60,59 @@ namespace zmq bool unregister_session (const char *name_); class session_t *find_session (const char *name_); + // i_endpoint interface implementation. + void revive (class reader_t *pipe_); + private: // Handlers for incoming commands. void process_own (class owned_t *object_); + void process_bind (class owned_t *session_, + class reader_t *in_pipe_, class writer_t *out_pipe_); void process_term_req (class owned_t *object_); void process_term_ack (); + // Attempts to distribute the message to all the outbound pipes. + // Returns false if not possible because of pipe overflow. + bool distribute (struct zmq_msg_t *msg_, bool flush_); + + // Gets a message from one of the inbound pipes. Implementation of + // fair queueing. + bool fetch (struct zmq_msg_t *msg_); + // List of all I/O objects owned by this socket. The socket is // responsible for deallocating them before it quits. typedef std::set <class owned_t*> io_objects_t; io_objects_t io_objects; + // Inbound pipes, i.e. those the socket is getting messages from. + // The second member in the pair indicates the object on the other + // side of the pipe. + typedef std::vector <std::pair <class reader_t*, owned_t*> > + in_pipes_t; + in_pipes_t in_pipes; + + // Index of the next inbound pipe to read messages from. + in_pipes_t::size_type current; + + // Number of active inbound pipes. Active pipes are stored in the + // initial section of the in_pipes array. + in_pipes_t::size_type active; + + // Outbound pipes, i.e. those the socket is sending messages to. + // The second member in the pair indicates the object on the other + // side of the pipe. + typedef std::vector <std::pair <class writer_t*, owned_t*> > + out_pipes_t; + out_pipes_t out_pipes; + // Number of I/O objects that were already asked to terminate // but haven't acknowledged it yet. int pending_term_acks; + // Number of messages received since last command processing. + int ticks; + // Application thread the socket lives in. class app_thread_t *app_thread; diff --git a/src/ypipe.hpp b/src/ypipe.hpp index 01b4137..6c51b63 100644 --- a/src/ypipe.hpp +++ b/src/ypipe.hpp @@ -43,9 +43,9 @@ namespace zmq { public: - // Initialises the pipe. If 'dead' is set to true, the pipe is - // created in dead state. - inline ypipe_t (bool dead_ = true) : + // Initialises the pipe. In D scenario it is created in dead state. + // Otherwise it's alive. + inline ypipe_t () : stop (false) { // Insert terminator element into the queue. @@ -54,7 +54,7 @@ namespace zmq // Let all the pointers to point to the terminator // (unless pipe is dead, in which case c is set to NULL). r = w = &queue.back (); - c.set (dead_ ? NULL : &queue.back ()); + c.set (D ? NULL : &queue.back ()); } // Following function (write) deliberately copies uninitialised data @@ -110,7 +110,7 @@ namespace zmq // available. inline bool read (T *value_) { - // Was the value was prefetched already? If so, return it. + // Was the value prefetched already? If so, return it. if (&queue.front () != r) { *value_ = queue.front (); queue.pop (); @@ -159,14 +159,14 @@ namespace zmq // If there are no elements prefetched, exit. // During pipe's lifetime r should never be NULL, however, - // during pipe shutdown when retrieving messages from it - // to deallocate them, this can happen. + // it can happen during pipe shutdown when messages + // are being deallocated. if (&queue.front () == r || !r) return false; } - // There was at least one value prefetched - - // return it to the caller. + // There was at least one value prefetched. + // Return it to the caller. *value_ = queue.front (); queue.pop (); return true; @@ -188,8 +188,8 @@ namespace zmq // exclusively by reader thread. T *r; - // The single contention point of contention between writer and - // reader thread. Points past the last flushed item. If it is NULL, + // The single point of contention between writer and reader thread. + // Points past the last flushed item. If it is NULL, // reader is asleep. This pointer should be always accessed using // atomic operations. atomic_ptr_t <T> c; diff --git a/src/yqueue.hpp b/src/yqueue.hpp index 0686f07..f20ac4c 100644 --- a/src/yqueue.hpp +++ b/src/yqueue.hpp @@ -88,7 +88,7 @@ namespace zmq back_chunk = end_chunk; back_pos = end_pos; - if (++ end_pos != N) + if (++end_pos != N) return; end_chunk->next = new chunk_t; diff --git a/src/zmq_encoder.cpp b/src/zmq_encoder.cpp index 124d77b..39b7192 100644 --- a/src/zmq_encoder.cpp +++ b/src/zmq_encoder.cpp @@ -54,9 +54,9 @@ bool zmq::zmq_encoder_t::message_ready () // Note that new state is set only if write is successful. That way // unsuccessful write will cause retry on the next state machine // invocation. - if (!source->read (&in_progress)) { + if (!source->read (&in_progress)) return false; - } + size_t size = zmq_msg_size (&in_progress); // For messages less than 255 bytes long, write one byte of message size. diff --git a/src/zmq_engine.cpp b/src/zmq_engine.cpp index d8b8cfc..cd7ad7e 100644 --- a/src/zmq_engine.cpp +++ b/src/zmq_engine.cpp @@ -129,7 +129,12 @@ void zmq::zmq_engine_t::out_event () } } +void zmq::zmq_engine_t::revive () +{ + set_pollout (handle); +} + void zmq::zmq_engine_t::error () { -// zmq_assert (false); + zmq_assert (false); } diff --git a/src/zmq_engine.hpp b/src/zmq_engine.hpp index 38a390d..ba25ded 100644 --- a/src/zmq_engine.hpp +++ b/src/zmq_engine.hpp @@ -42,6 +42,10 @@ namespace zmq void in_event (); void out_event (); + // This method is called by the session to signalise that there + // are messages to send available. + void revive (); + private: // Function to handle network disconnections. diff --git a/src/zmq_listener_init.cpp b/src/zmq_listener_init.cpp index bfd79b4..7e2f311 100644 --- a/src/zmq_listener_init.cpp +++ b/src/zmq_listener_init.cpp @@ -59,7 +59,8 @@ bool zmq::zmq_listener_init_t::write (::zmq_msg_t *msg_) session_t *session = owner->find_session (session_name.c_str ()); if (!session) { io_thread_t *io_thread = choose_io_thread (options.affinity); - session = new session_t (io_thread, owner, session_name.c_str ()); + session = new session_t (io_thread, owner, session_name.c_str (), + options); zmq_assert (session); send_plug (session); send_own (owner, session); |