summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@fastmq.commkdir>2009-08-27 10:54:28 +0200
committerMartin Sustrik <sustrik@fastmq.commkdir>2009-08-27 10:54:28 +0200
commit99c5d9283622a0b37ee80f83ff4875c059fc5990 (patch)
tree3460ec503898d2184dc807e47eea679d24d96d5c
parentaacdb7a454686bfac93164d0e67e785658d48a3c (diff)
pipes added
-rw-r--r--include/zmq.h2
-rw-r--r--src/Makefile.am2
-rw-r--r--src/app_thread.cpp26
-rw-r--r--src/app_thread.hpp4
-rw-r--r--src/command.hpp11
-rw-r--r--src/config.hpp8
-rw-r--r--src/i_endpoint.hpp33
-rw-r--r--src/object.cpp30
-rw-r--r--src/object.hpp9
-rw-r--r--src/pipe.cpp112
-rw-r--r--src/pipe.hpp102
-rw-r--r--src/session.cpp56
-rw-r--r--src/session.hpp24
-rw-r--r--src/socket_base.cpp227
-rw-r--r--src/socket_base.hpp42
-rw-r--r--src/ypipe.hpp22
-rw-r--r--src/yqueue.hpp2
-rw-r--r--src/zmq_encoder.cpp4
-rw-r--r--src/zmq_engine.cpp7
-rw-r--r--src/zmq_engine.hpp4
-rw-r--r--src/zmq_listener_init.cpp3
21 files changed, 680 insertions, 50 deletions
diff --git a/include/zmq.h b/include/zmq.h
index 34ce80c..fad51ca 100644
--- a/include/zmq.h
+++ b/include/zmq.h
@@ -67,7 +67,7 @@ extern "C" {
// single accept. There's no message routing or message filtering involved.
#define ZMQ_P2P 0
-// Socket to distribute data. Recv fuction is not implemeted for this socket
+// Socket to distribute data. Recv fuction is not implemented for this socket
// type. Messages are distributed in fanout fashion to all peers.
#define ZMQ_PUB 1
diff --git a/src/Makefile.am b/src/Makefile.am
index 396e3a3..b6a4540 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -19,6 +19,7 @@ libzmq_la_SOURCES = \
io_object.hpp \
io_thread.hpp \
ip.hpp \
+ i_endpoint.hpp \
i_poller.hpp \
i_poll_events.hpp \
i_signaler.hpp \
@@ -66,6 +67,7 @@ libzmq_la_SOURCES = \
object.cpp \
options.cpp \
owned.cpp \
+ pipe.cpp \
poll.cpp \
select.cpp \
session.cpp \
diff --git a/src/app_thread.cpp b/src/app_thread.cpp
index 74ba357..db73ec1 100644
--- a/src/app_thread.cpp
+++ b/src/app_thread.cpp
@@ -77,7 +77,7 @@ bool zmq::app_thread_t::make_current ()
return true;
}
-void zmq::app_thread_t::process_commands (bool block_)
+void zmq::app_thread_t::process_commands (bool block_, bool throttle_)
{
ypollset_t::signals_t signals;
if (block_)
@@ -91,24 +91,26 @@ void zmq::app_thread_t::process_commands (bool block_)
// depending on CPU speed: It's ~1ms on 3GHz CPU, ~2ms on 1.5GHz CPU
// etc. The optimisation makes sense only on platforms where getting
// a timestamp is a very cheap operation (tens of nanoseconds).
+ if (throttle_) {
- // Get timestamp counter.
+ // Get timestamp counter.
#if defined __GNUC__
- uint32_t low;
- uint32_t high;
- __asm__ volatile ("rdtsc" : "=a" (low), "=d" (high));
- uint64_t current_time = (uint64_t) high << 32 | low;
+ uint32_t low;
+ uint32_t high;
+ __asm__ volatile ("rdtsc" : "=a" (low), "=d" (high));
+ uint64_t current_time = (uint64_t) high << 32 | low;
#elif defined _MSC_VER
- uint64_t current_time = __rdtsc ();
+ uint64_t current_time = __rdtsc ();
#else
#error
#endif
- // Check whether certain time have elapsed since last command
- // processing.
- if (current_time - last_processing_time <= max_command_delay)
- return;
- last_processing_time = current_time;
+ // Check whether certain time have elapsed since last command
+ // processing.
+ if (current_time - last_processing_time <= max_command_delay)
+ return;
+ last_processing_time = current_time;
+ }
#endif
// Check whether there are any commands pending for this thread.
diff --git a/src/app_thread.hpp b/src/app_thread.hpp
index e7bbf70..e45b1b2 100644
--- a/src/app_thread.hpp
+++ b/src/app_thread.hpp
@@ -53,7 +53,9 @@ namespace zmq
// Processes commands sent to this thread (if any). If 'block' is
// set to true, returns only after at least one command was processed.
- void process_commands (bool block_);
+ // If throttle argument is true, commands are processed at most once
+ // in a predefined time period.
+ void process_commands (bool block_, bool throttle_);
// Create a socket of a specified type.
class socket_base_t *create_socket (int type_);
diff --git a/src/command.hpp b/src/command.hpp
index 41c7d6c..d3bad79 100644
--- a/src/command.hpp
+++ b/src/command.hpp
@@ -39,6 +39,7 @@ namespace zmq
own,
attach,
bind,
+ revive,
term_req,
term,
term_ack
@@ -65,10 +66,18 @@ namespace zmq
class zmq_engine_t *engine;
} attach;
- // Sent between objects to establish pipe(s) between them.
+ // Sent from session to socket to establish pipe(s) between them.
struct {
+ class owned_t *session;
+ class reader_t *in_pipe;
+ class writer_t *out_pipe;
} bind;
+ // Sent by pipe writer to inform dormant pipe reader that there
+ // are messages in the pipe.
+ struct {
+ } revive;
+
// Sent by I/O object ot the socket to request the shutdown of
// the I/O object.
struct {
diff --git a/src/config.hpp b/src/config.hpp
index 88b93d7..17e67b9 100644
--- a/src/config.hpp
+++ b/src/config.hpp
@@ -38,6 +38,14 @@ namespace zmq
// footprint of dispatcher.
command_pipe_granularity = 4,
+ // Determines how often does socket poll for new commands when it
+ // still has unprocessed messages to handle. Thus, if it is set to 100,
+ // socket will process 100 inbound messages before doing the poll.
+ // If there are no unprocessed messages available, poll is done
+ // immediately. Decreasing the value trades overall latency for more
+ // real-time behaviour (less latency peaks).
+ inbound_poll_rate = 100,
+
// Maximal batching size for engines with receiving functionality.
// So, if there are 10 messages that fit into the batch size, all of
// them may be read by a single 'recv' system call, thus avoiding
diff --git a/src/i_endpoint.hpp b/src/i_endpoint.hpp
new file mode 100644
index 0000000..bb7409e
--- /dev/null
+++ b/src/i_endpoint.hpp
@@ -0,0 +1,33 @@
+/*
+ Copyright (c) 2007-2009 FastMQ Inc.
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the Lesser GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ Lesser GNU General Public License for more details.
+
+ You should have received a copy of the Lesser GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __ZMQ_I_ENDPOINT_HPP_INCLUDED__
+#define __ZMQ_I_ENDPOINT_HPP_INCLUDED__
+
+namespace zmq
+{
+
+ struct i_endpoint
+ {
+ virtual void revive (class reader_t *pipe_) = 0;
+ };
+
+}
+
+#endif
diff --git a/src/object.cpp b/src/object.cpp
index 0a25750..4d54ebf 100644
--- a/src/object.cpp
+++ b/src/object.cpp
@@ -20,6 +20,7 @@
#include "object.hpp"
#include "dispatcher.hpp"
#include "err.hpp"
+#include "pipe.hpp"
#include "io_thread.hpp"
#include "simple_semaphore.hpp"
#include "owned.hpp"
@@ -57,6 +58,10 @@ void zmq::object_t::process_command (command_t &cmd_)
{
switch (cmd_.type) {
+ case command_t::revive:
+ process_revive ();
+ break;
+
case command_t::stop:
process_stop ();
break;
@@ -74,7 +79,8 @@ void zmq::object_t::process_command (command_t &cmd_)
return;
case command_t::bind:
- process_bind ();
+ process_bind (cmd_.args.bind.session,
+ cmd_.args.bind.in_pipe, cmd_.args.bind.out_pipe);
return;
case command_t::term_req:
@@ -140,11 +146,23 @@ void zmq::object_t::send_attach (session_t *destination_, zmq_engine_t *engine_)
send_command (cmd);
}
-void zmq::object_t::send_bind (object_t *destination_)
+void zmq::object_t::send_bind (object_t *destination_, owned_t *session_,
+ reader_t *in_pipe_, writer_t *out_pipe_)
{
command_t cmd;
cmd.destination = destination_;
cmd.type = command_t::bind;
+ cmd.args.bind.session = session_;
+ cmd.args.bind.in_pipe = in_pipe_;
+ cmd.args.bind.out_pipe = out_pipe_;
+ send_command (cmd);
+}
+
+void zmq::object_t::send_revive (object_t *destination_)
+{
+ command_t cmd;
+ cmd.destination = destination_;
+ cmd.type = command_t::revive;
send_command (cmd);
}
@@ -194,7 +212,13 @@ void zmq::object_t::process_attach (zmq_engine_t *engine_)
zmq_assert (false);
}
-void zmq::object_t::process_bind ()
+void zmq::object_t::process_bind (owned_t *session_,
+ reader_t *in_pipe_, writer_t *out_pipe_)
+{
+ zmq_assert (false);
+}
+
+void zmq::object_t::process_revive ()
{
zmq_assert (false);
}
diff --git a/src/object.hpp b/src/object.hpp
index 31c8c40..0dbac24 100644
--- a/src/object.hpp
+++ b/src/object.hpp
@@ -24,7 +24,6 @@
namespace zmq
{
-
// Base class for all objects that participate in inter-thread
// communication.
@@ -58,7 +57,9 @@ namespace zmq
class owned_t *object_);
void send_attach (class session_t *destination_,
class zmq_engine_t *engine_);
- void send_bind (object_t *destination_);
+ void send_bind (object_t *destination_, class owned_t *session_,
+ class reader_t *in_pipe_, class writer_t *out_pipe_);
+ void send_revive (class object_t *destination_);
void send_term_req (class socket_base_t *destination_,
class owned_t *object_);
void send_term (class owned_t *destination_);
@@ -70,7 +71,9 @@ namespace zmq
virtual void process_plug ();
virtual void process_own (class owned_t *object_);
virtual void process_attach (class zmq_engine_t *engine_);
- virtual void process_bind ();
+ virtual void process_bind (class owned_t *session_,
+ class reader_t *in_pipe_, class writer_t *out_pipe_);
+ virtual void process_revive ();
virtual void process_term_req (class owned_t *object_);
virtual void process_term ();
virtual void process_term_ack ();
diff --git a/src/pipe.cpp b/src/pipe.cpp
new file mode 100644
index 0000000..5016631
--- /dev/null
+++ b/src/pipe.cpp
@@ -0,0 +1,112 @@
+/*
+ Copyright (c) 2007-2009 FastMQ Inc.
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the Lesser GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ Lesser GNU General Public License for more details.
+
+ You should have received a copy of the Lesser GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include <pthread.h>
+
+#include "pipe.hpp"
+
+zmq::reader_t::reader_t (object_t *parent_, pipe_t *pipe_,
+ uint64_t hwm_, uint64_t lwm_) :
+ object_t (parent_),
+ pipe (pipe_),
+ peer (&pipe_->writer),
+ hwm (hwm_),
+ lwm (lwm_),
+ index (-1),
+ endpoint (NULL)
+{
+}
+
+zmq::reader_t::~reader_t ()
+{
+}
+
+bool zmq::reader_t::read (zmq_msg_t *msg_)
+{
+ return pipe->read (msg_);
+
+ // TODO: Adjust the size of the pipe.
+}
+
+void zmq::reader_t::set_endpoint (i_endpoint *endpoint_)
+{
+ endpoint = endpoint_;
+}
+
+void zmq::reader_t::set_index (int index_)
+{
+ index = index_;
+}
+
+int zmq::reader_t::get_index ()
+{
+ return index;
+}
+
+void zmq::reader_t::process_revive ()
+{
+ endpoint->revive (this);
+}
+
+zmq::writer_t::writer_t (object_t *parent_, pipe_t *pipe_,
+ uint64_t hwm_, uint64_t lwm_) :
+ object_t (parent_),
+ pipe (pipe_),
+ peer (&pipe_->reader),
+ hwm (hwm_),
+ lwm (lwm_)
+{
+}
+
+zmq::writer_t::~writer_t ()
+{
+}
+
+bool zmq::writer_t::check_write (uint64_t size_)
+{
+ // TODO: Check whether hwm is exceeded.
+
+ return true;
+}
+
+bool zmq::writer_t::write (struct zmq_msg_t *msg_)
+{
+ pipe->write (*msg_);
+ return true;
+
+ // TODO: Adjust size of the pipe.
+}
+
+void zmq::writer_t::flush ()
+{
+ if (!pipe->flush ())
+ send_revive (peer);
+}
+
+zmq::pipe_t::pipe_t (object_t *reader_parent_, object_t *writer_parent_,
+ uint64_t hwm_, uint64_t lwm_) :
+ reader (reader_parent_, this, hwm_, lwm_),
+ writer (writer_parent_, this, hwm_, lwm_)
+{
+}
+
+zmq::pipe_t::~pipe_t ()
+{
+}
+
diff --git a/src/pipe.hpp b/src/pipe.hpp
index 28e4b4d..d48fc47 100644
--- a/src/pipe.hpp
+++ b/src/pipe.hpp
@@ -22,15 +22,117 @@
#include "../include/zmq.h"
+#include "stdint.hpp"
+#include "i_endpoint.hpp"
#include "ypipe.hpp"
#include "config.hpp"
+#include "object.hpp"
namespace zmq
{
+ class reader_t : public object_t
+ {
+ public:
+
+ reader_t (class object_t *parent_, class pipe_t *pipe_,
+ uint64_t hwm_, uint64_t lwm_);
+ ~reader_t ();
+
+ // Reads a message to the underlying pipe.
+ bool read (struct zmq_msg_t *msg_);
+
+ // Mnaipulation of index of the pipe.
+ void set_endpoint (i_endpoint *endpoint_);
+ void set_index (int index_);
+ int get_index ();
+
+ private:
+
+ // Command handlers.
+ void process_revive ();
+
+ // The underlying pipe.
+ class pipe_t *pipe;
+
+ // Pipe writer associated with the other side of the pipe.
+ class object_t *peer;
+
+ // High and low watermarks for in-memory storage (in bytes).
+ uint64_t hwm;
+ uint64_t lwm;
+
+ // Positions of head and tail of the pipe (in bytes).
+ uint64_t head;
+ uint64_t tail;
+ uint64_t last_sent_head;
+
+ // Index of the pipe in the socket's list of inbound pipes.
+ int index;
+
+ // Endpoint (either session or socket) the pipe is attached to.
+ i_endpoint *endpoint;
+
+ reader_t (const reader_t&);
+ void operator = (const reader_t&);
+ };
+
+ class writer_t : public object_t
+ {
+ public:
+
+ writer_t (class object_t *parent_, class pipe_t *pipe_,
+ uint64_t hwm_, uint64_t lwm_);
+ ~writer_t ();
+
+ // Checks whether message with specified size can be written to the
+ // pipe. If writing the message would cause high watermark to be
+ // exceeded, the function returns false.
+ bool check_write (uint64_t size_);
+
+ // Writes a message to the underlying pipe. Returns false if the
+ // message cannot be written because high watermark was reached.
+ bool write (struct zmq_msg_t *msg_);
+
+ // Flush the messages downsteam.
+ void flush ();
+
+ private:
+
+ // The underlying pipe.
+ class pipe_t *pipe;
+
+ // Pipe reader associated with the other side of the pipe.
+ class object_t *peer;
+
+ // High and low watermarks for in-memory storage (in bytes).
+ uint64_t hwm;
+ uint64_t lwm;
+
+ // Positions of head and tail of the pipe (in bytes).
+ uint64_t head;
+ uint64_t tail;
+
+ writer_t (const writer_t&);
+ void operator = (const writer_t&);
+ };
+
// Message pipe.
class pipe_t : public ypipe_t <zmq_msg_t, false, message_pipe_granularity>
{
+ public:
+
+ pipe_t (object_t *reader_parent_, object_t *writer_parent_,
+ uint64_t hwm_, uint64_t lwm_);
+ ~pipe_t ();
+
+ reader_t reader;
+ writer_t writer;
+
+ private:
+
+ pipe_t (const pipe_t&);
+ void operator = (const pipe_t&);
};
}
diff --git a/src/session.cpp b/src/session.cpp
index fc1f858..115fb85 100644
--- a/src/session.cpp
+++ b/src/session.cpp
@@ -20,12 +20,17 @@
#include "session.hpp"
#include "zmq_engine.hpp"
#include "err.hpp"
+#include "pipe.hpp"
zmq::session_t::session_t (object_t *parent_, socket_base_t *owner_,
- const char *name_) :
+ const char *name_, const options_t &options_) :
owned_t (parent_, owner_),
+ in_pipe (NULL),
+ active (false),
+ out_pipe (NULL),
engine (NULL),
- name (name_)
+ name (name_),
+ options (options_)
{
}
@@ -33,18 +38,48 @@ zmq::session_t::~session_t ()
{
}
+void zmq::session_t::set_inbound_pipe (reader_t *pipe_)
+{
+ zmq_assert (!in_pipe);
+ in_pipe = pipe_;
+ active = true;
+ in_pipe->set_endpoint (this);
+}
+void zmq::session_t::set_outbound_pipe (writer_t *pipe_)
+{
+ zmq_assert (!out_pipe);
+ out_pipe = pipe_;
+}
+
+
bool zmq::session_t::read (::zmq_msg_t *msg_)
{
- return false;
+ if (!active)
+ return false;
+
+ bool fetched = in_pipe->read (msg_);
+ if (!fetched)
+ active = false;
+
+ return fetched;
}
bool zmq::session_t::write (::zmq_msg_t *msg_)
{
- return false;
+ return out_pipe->write (msg_);
}
void zmq::session_t::flush ()
{
+ out_pipe->flush ();
+}
+
+void zmq::session_t::revive (reader_t *pipe_)
+{
+ zmq_assert (in_pipe == pipe_);
+ active = true;
+ if (engine)
+ engine->revive ();
}
void zmq::session_t::process_plug ()
@@ -56,6 +91,19 @@ void zmq::session_t::process_plug ()
// We should syslog it and drop the session. TODO
zmq_assert (ok);
+ // If session is created by 'connect' function, it has the pipes set
+ // already. Otherwise, it's being created by the listener and the pipes
+ // are yet to be created.
+ if (!in_pipe && !out_pipe) {
+ pipe_t *inbound = new pipe_t (this, owner, options.hwm, options.lwm);
+ zmq_assert (inbound);
+ in_pipe = &inbound->reader;
+ pipe_t *outbound = new pipe_t (owner, this, options.hwm, options.lwm);
+ zmq_assert (outbound);
+ out_pipe = &outbound->writer;
+ send_bind (owner, this, &outbound->reader, &inbound->writer);
+ }
+
owned_t::process_plug ();
}
diff --git a/src/session.hpp b/src/session.hpp
index 6d6bcf7..b79fb4b 100644
--- a/src/session.hpp
+++ b/src/session.hpp
@@ -23,17 +23,22 @@
#include <string>
#include "i_inout.hpp"
+#include "i_endpoint.hpp"
#include "owned.hpp"
#include "options.hpp"
namespace zmq
{
- class session_t : public owned_t, public i_inout
+ class session_t : public owned_t, public i_inout, public i_endpoint
{
public:
- session_t (object_t *parent_, socket_base_t *owner_, const char *name_);
+ session_t (object_t *parent_, socket_base_t *owner_, const char *name_,
+ const options_t &options_);
+
+ void set_inbound_pipe (class reader_t *pipe_);
+ void set_outbound_pipe (class writer_t *pipe_);
private:
@@ -44,17 +49,32 @@ namespace zmq
bool write (::zmq_msg_t *msg_);
void flush ();
+ // i_endpoint interface implementation.
+ void revive (class reader_t *pipe_);
+
// Handlers for incoming commands.
void process_plug ();
void process_unplug ();
void process_attach (class zmq_engine_t *engine_);
+ // Inbound pipe, i.e. one the session is getting messages from.
+ class reader_t *in_pipe;
+
+ // If true, in_pipe is active. Otherwise there are no messages to get.
+ bool active;
+
+ // Outbound pipe, i.e. one the socket is sending messages to.
+ class writer_t *out_pipe;
+
class zmq_engine_t *engine;
// The name of the session. One that is used to register it with
// socket-level repository of sessions.
std::string name;
+ // Inherited socket options.
+ options_t options;
+
session_t (const session_t&);
void operator = (const session_t&);
};
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index fb7bdcf..68fc82b 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -27,18 +27,23 @@
#include "err.hpp"
#include "zmq_listener.hpp"
#include "zmq_connecter.hpp"
+#include "msg_content.hpp"
#include "io_thread.hpp"
#include "session.hpp"
#include "config.hpp"
#include "owned.hpp"
#include "uuid.hpp"
+#include "pipe.hpp"
zmq::socket_base_t::socket_base_t (app_thread_t *parent_) :
object_t (parent_),
+ current (0),
+ active (0),
pending_term_acks (0),
+ ticks (0),
app_thread (parent_),
shutting_down (false)
-{
+{
}
zmq::socket_base_t::~socket_base_t ()
@@ -65,7 +70,7 @@ zmq::socket_base_t::~socket_base_t ()
// Process commands till we get all the termination acknowledgements.
while (pending_term_acks)
- app_thread->process_commands (true);
+ app_thread->process_commands (true, false);
}
// Check whether there are no session leaks.
@@ -150,8 +155,28 @@ int zmq::socket_base_t::connect (const char *addr_)
// Create the session.
io_thread_t *io_thread = choose_io_thread (options.affinity);
- session_t *session = new session_t (io_thread, this, session_name.c_str ());
+ session_t *session = new session_t (io_thread, this, session_name.c_str (),
+ options);
zmq_assert (session);
+
+ // Create inbound pipe.
+ pipe_t *in_pipe = new pipe_t (this, session, options.hwm, options.lwm);
+ zmq_assert (in_pipe);
+ in_pipe->reader.set_endpoint (this);
+ session->set_outbound_pipe (&in_pipe->writer);
+ in_pipes.push_back (std::make_pair (&in_pipe->reader, session));
+ in_pipes.back ().first->set_index (active);
+ in_pipes [active].first->set_index (in_pipes.size () - 1);
+ std::swap (in_pipes.back (), in_pipes [active]);
+ active++;
+
+ // Create outbound pipe.
+ pipe_t *out_pipe = new pipe_t (session, this, options.hwm, options.lwm);
+ zmq_assert (out_pipe);
+ session->set_inbound_pipe (&out_pipe->reader);
+ out_pipes.push_back (std::make_pair (&out_pipe->writer, session));
+
+ // Activate the session.
send_plug (session);
send_own (this, session);
@@ -173,17 +198,79 @@ int zmq::socket_base_t::connect (const char *addr_)
int zmq::socket_base_t::send (::zmq_msg_t *msg_, int flags_)
{
- zmq_assert (false);
+ // Process pending commands, if any.
+ app_thread->process_commands (false, true);
+
+ // Try to send the message.
+ bool sent = distribute (msg_, !(flags_ & ZMQ_NOFLUSH));
+
+ if (!(flags_ & ZMQ_NOBLOCK)) {
+
+ // Oops, we couldn't send the message. Wait for the next
+ // command, process it and try to send the message again.
+ while (!sent) {
+ app_thread->process_commands (true, false);
+ sent = distribute (msg_, !(flags_ & ZMQ_NOFLUSH));
+ }
+ }
+ else if (!sent) {
+ errno = EAGAIN;
+ return -1;
+ }
+
+ return 0;
}
int zmq::socket_base_t::flush ()
{
- zmq_assert (false);
+ for (out_pipes_t::iterator it = out_pipes.begin (); it != out_pipes.end ();
+ it++)
+ it->first->flush ();
+
+ return 0;
}
int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_)
{
- zmq_assert (false);
+ // If the message cannot be fetched immediately, there are two scenarios.
+ // For non-blocking recv, commands are processed in case there's a message
+ // already waiting we don't know about. If it's not, return EAGAIN.
+ // In blocking scenario, commands are processed over and over again until
+ // we are able to fetch a message.
+ bool fetched = fetch (msg_);
+ if (!fetched) {
+ if (flags_ & ZMQ_NOBLOCK) {
+ app_thread->process_commands (false, false);
+ fetched = fetch (msg_);
+ }
+ else {
+ while (!fetched) {
+ app_thread->process_commands (true, false);
+ ticks = 0;
+ fetched = fetch (msg_);
+ }
+ }
+ }
+
+ // Once every inbound_poll_rate messages check for signals and process
+ // incoming commands. This happens only if we are not polling altogether
+ // because there are messages available all the time. If poll occurs,
+ // ticks is set to zero and thus we avoid this code.
+ //
+ // Note that 'recv' uses different command throttling algorithm (the one
+ // described above) from the one used by 'send'. This is because counting
+ // ticks is more efficient than doing rdtsc all the time.
+ if (++ticks == inbound_poll_rate) {
+ app_thread->process_commands (false, false);
+ ticks = 0;
+ }
+
+ if (!fetched) {
+ errno = EAGAIN;
+ return -1;
+ }
+
+ return 0;
}
int zmq::socket_base_t::close ()
@@ -229,11 +316,35 @@ zmq::session_t *zmq::socket_base_t::find_session (const char *name_)
return it->second;
}
+void zmq::socket_base_t::revive (reader_t *pipe_)
+{
+ // Move the pipe to the list of active pipes.
+ in_pipes_t::size_type index = (in_pipes_t::size_type) pipe_->get_index ();
+ in_pipes [index].first->set_index (active);
+ in_pipes [active].first->set_index (index);
+ std::swap (in_pipes [index], in_pipes [active]);
+ active++;
+}
+
void zmq::socket_base_t::process_own (owned_t *object_)
{
io_objects.insert (object_);
}
+void zmq::socket_base_t::process_bind (owned_t *session_,
+ reader_t *in_pipe_, writer_t *out_pipe_)
+{
+ zmq_assert (in_pipe_);
+ in_pipe_->set_endpoint (this);
+ in_pipes.push_back (std::make_pair (in_pipe_, session_));
+ in_pipes.back ().first->set_index (active);
+ in_pipes [active].first->set_index (in_pipes.size () - 1);
+ std::swap (in_pipes.back (), in_pipes [active]);
+ active++;
+ zmq_assert (out_pipe_);
+ out_pipes.push_back (std::make_pair (out_pipe_, session_));
+}
+
void zmq::socket_base_t::process_term_req (owned_t *object_)
{
// When shutting down we can ignore termination requests from owned
@@ -260,3 +371,107 @@ void zmq::socket_base_t::process_term_ack ()
zmq_assert (pending_term_acks);
pending_term_acks--;
}
+
+bool zmq::socket_base_t::distribute (zmq_msg_t *msg_, bool flush_)
+{
+ int pipes_count = out_pipes.size ();
+
+ // If there are no pipes available, simply drop the message.
+ if (pipes_count == 0) {
+ int rc = zmq_msg_close (msg_);
+ zmq_assert (rc == 0);
+ rc = zmq_msg_init (msg_);
+ zmq_assert (rc == 0);
+ return true;
+ }
+
+ // First check whether all pipes are available for writing.
+ for (out_pipes_t::iterator it = out_pipes.begin (); it != out_pipes.end ();
+ it++)
+ if (!it->first->check_write (zmq_msg_size (msg_)))
+ return false;
+
+ msg_content_t *content = (msg_content_t*) msg_->content;
+
+ // For VSMs the copying is straighforward.
+ if (content == (msg_content_t*) ZMQ_VSM) {
+ for (out_pipes_t::iterator it = out_pipes.begin ();
+ it != out_pipes.end (); it++) {
+ it->first->write (msg_);
+ if (flush_)
+ it->first->flush ();
+ }
+ int rc = zmq_msg_init (msg_);
+ zmq_assert (rc == 0);
+ return true;
+ }
+
+ // Optimisation for the case when there's only a single pipe
+ // to send the message to - no refcount adjustment i.e. no atomic
+ // operations are needed.
+ if (pipes_count == 1) {
+ out_pipes.begin ()->first->write (msg_);
+ if (flush_)
+ out_pipes.begin ()->first->flush ();
+ int rc = zmq_msg_init (msg_);
+ zmq_assert (rc == 0);
+ return true;
+ }
+
+ // There are at least 2 destinations for the message. That means we have
+ // to deal with reference counting. First add N-1 references to
+ // the content (we are holding one reference anyway, that's why -1).
+ if (msg_->shared)
+ content->refcnt.add (pipes_count - 1);
+ else {
+ content->refcnt.set (pipes_count);
+ msg_->shared = true;
+ }
+
+ // Push the message to all destinations.
+ for (out_pipes_t::iterator it = out_pipes.begin (); it != out_pipes.end ();
+ it++) {
+ it->first->write (msg_);
+ if (flush_)
+ it->first->flush ();
+ }
+
+ // Detach the original message from the data buffer.
+ int rc = zmq_msg_init (msg_);
+ zmq_assert (rc == 0);
+
+ return true;
+}
+
+bool zmq::socket_base_t::fetch (zmq_msg_t *msg_)
+{
+ // Deallocate old content of the message.
+ zmq_msg_close (msg_);
+
+ // Round-robin over the pipes to get next message.
+ for (int count = active; count != 0; count--) {
+
+ bool fetched = in_pipes [current].first->read (msg_);
+
+ // If there's no message in the pipe, move it to the list of
+ // non-active pipes.
+ if (!fetched) {
+ in_pipes [current].first->set_index (active - 1);
+ in_pipes [active - 1].first->set_index (current);
+ std::swap (in_pipes [current], in_pipes [active - 1]);
+ active--;
+ }
+
+ current ++;
+ if (current >= active)
+ current = 0;
+
+ if (fetched)
+ return true;
+ }
+
+ // No message is available. Initialise the output parameter
+ // to be a 0-byte message.
+ zmq_msg_init (msg_);
+ return false;
+}
diff --git a/src/socket_base.hpp b/src/socket_base.hpp
index 20ac4e2..1f04bda 100644
--- a/src/socket_base.hpp
+++ b/src/socket_base.hpp
@@ -22,8 +22,11 @@
#include <set>
#include <map>
+#include <vector>
#include <string>
+#include <utility>
+#include "i_endpoint.hpp"
#include "object.hpp"
#include "mutex.hpp"
#include "options.hpp"
@@ -32,7 +35,7 @@
namespace zmq
{
- class socket_base_t : public object_t
+ class socket_base_t : public object_t, public i_endpoint
{
public:
@@ -57,22 +60,59 @@ namespace zmq
bool unregister_session (const char *name_);
class session_t *find_session (const char *name_);
+ // i_endpoint interface implementation.
+ void revive (class reader_t *pipe_);
+
private:
// Handlers for incoming commands.
void process_own (class owned_t *object_);
+ void process_bind (class owned_t *session_,
+ class reader_t *in_pipe_, class writer_t *out_pipe_);
void process_term_req (class owned_t *object_);
void process_term_ack ();
+ // Attempts to distribute the message to all the outbound pipes.
+ // Returns false if not possible because of pipe overflow.
+ bool distribute (struct zmq_msg_t *msg_, bool flush_);
+
+ // Gets a message from one of the inbound pipes. Implementation of
+ // fair queueing.
+ bool fetch (struct zmq_msg_t *msg_);
+
// List of all I/O objects owned by this socket. The socket is
// responsible for deallocating them before it quits.
typedef std::set <class owned_t*> io_objects_t;
io_objects_t io_objects;
+ // Inbound pipes, i.e. those the socket is getting messages from.
+ // The second member in the pair indicates the object on the other
+ // side of the pipe.
+ typedef std::vector <std::pair <class reader_t*, owned_t*> >
+ in_pipes_t;
+ in_pipes_t in_pipes;
+
+ // Index of the next inbound pipe to read messages from.
+ in_pipes_t::size_type current;
+
+ // Number of active inbound pipes. Active pipes are stored in the
+ // initial section of the in_pipes array.
+ in_pipes_t::size_type active;
+
+ // Outbound pipes, i.e. those the socket is sending messages to.
+ // The second member in the pair indicates the object on the other
+ // side of the pipe.
+ typedef std::vector <std::pair <class writer_t*, owned_t*> >
+ out_pipes_t;
+ out_pipes_t out_pipes;
+
// Number of I/O objects that were already asked to terminate
// but haven't acknowledged it yet.
int pending_term_acks;
+ // Number of messages received since last command processing.
+ int ticks;
+
// Application thread the socket lives in.
class app_thread_t *app_thread;
diff --git a/src/ypipe.hpp b/src/ypipe.hpp
index 01b4137..6c51b63 100644
--- a/src/ypipe.hpp
+++ b/src/ypipe.hpp
@@ -43,9 +43,9 @@ namespace zmq
{
public:
- // Initialises the pipe. If 'dead' is set to true, the pipe is
- // created in dead state.
- inline ypipe_t (bool dead_ = true) :
+ // Initialises the pipe. In D scenario it is created in dead state.
+ // Otherwise it's alive.
+ inline ypipe_t () :
stop (false)
{
// Insert terminator element into the queue.
@@ -54,7 +54,7 @@ namespace zmq
// Let all the pointers to point to the terminator
// (unless pipe is dead, in which case c is set to NULL).
r = w = &queue.back ();
- c.set (dead_ ? NULL : &queue.back ());
+ c.set (D ? NULL : &queue.back ());
}
// Following function (write) deliberately copies uninitialised data
@@ -110,7 +110,7 @@ namespace zmq
// available.
inline bool read (T *value_)
{
- // Was the value was prefetched already? If so, return it.
+ // Was the value prefetched already? If so, return it.
if (&queue.front () != r) {
*value_ = queue.front ();
queue.pop ();
@@ -159,14 +159,14 @@ namespace zmq
// If there are no elements prefetched, exit.
// During pipe's lifetime r should never be NULL, however,
- // during pipe shutdown when retrieving messages from it
- // to deallocate them, this can happen.
+ // it can happen during pipe shutdown when messages
+ // are being deallocated.
if (&queue.front () == r || !r)
return false;
}
- // There was at least one value prefetched -
- // return it to the caller.
+ // There was at least one value prefetched.
+ // Return it to the caller.
*value_ = queue.front ();
queue.pop ();
return true;
@@ -188,8 +188,8 @@ namespace zmq
// exclusively by reader thread.
T *r;
- // The single contention point of contention between writer and
- // reader thread. Points past the last flushed item. If it is NULL,
+ // The single point of contention between writer and reader thread.
+ // Points past the last flushed item. If it is NULL,
// reader is asleep. This pointer should be always accessed using
// atomic operations.
atomic_ptr_t <T> c;
diff --git a/src/yqueue.hpp b/src/yqueue.hpp
index 0686f07..f20ac4c 100644
--- a/src/yqueue.hpp
+++ b/src/yqueue.hpp
@@ -88,7 +88,7 @@ namespace zmq
back_chunk = end_chunk;
back_pos = end_pos;
- if (++ end_pos != N)
+ if (++end_pos != N)
return;
end_chunk->next = new chunk_t;
diff --git a/src/zmq_encoder.cpp b/src/zmq_encoder.cpp
index 124d77b..39b7192 100644
--- a/src/zmq_encoder.cpp
+++ b/src/zmq_encoder.cpp
@@ -54,9 +54,9 @@ bool zmq::zmq_encoder_t::message_ready ()
// Note that new state is set only if write is successful. That way
// unsuccessful write will cause retry on the next state machine
// invocation.
- if (!source->read (&in_progress)) {
+ if (!source->read (&in_progress))
return false;
- }
+
size_t size = zmq_msg_size (&in_progress);
// For messages less than 255 bytes long, write one byte of message size.
diff --git a/src/zmq_engine.cpp b/src/zmq_engine.cpp
index d8b8cfc..cd7ad7e 100644
--- a/src/zmq_engine.cpp
+++ b/src/zmq_engine.cpp
@@ -129,7 +129,12 @@ void zmq::zmq_engine_t::out_event ()
}
}
+void zmq::zmq_engine_t::revive ()
+{
+ set_pollout (handle);
+}
+
void zmq::zmq_engine_t::error ()
{
-// zmq_assert (false);
+ zmq_assert (false);
}
diff --git a/src/zmq_engine.hpp b/src/zmq_engine.hpp
index 38a390d..ba25ded 100644
--- a/src/zmq_engine.hpp
+++ b/src/zmq_engine.hpp
@@ -42,6 +42,10 @@ namespace zmq
void in_event ();
void out_event ();
+ // This method is called by the session to signalise that there
+ // are messages to send available.
+ void revive ();
+
private:
// Function to handle network disconnections.
diff --git a/src/zmq_listener_init.cpp b/src/zmq_listener_init.cpp
index bfd79b4..7e2f311 100644
--- a/src/zmq_listener_init.cpp
+++ b/src/zmq_listener_init.cpp
@@ -59,7 +59,8 @@ bool zmq::zmq_listener_init_t::write (::zmq_msg_t *msg_)
session_t *session = owner->find_session (session_name.c_str ());
if (!session) {
io_thread_t *io_thread = choose_io_thread (options.affinity);
- session = new session_t (io_thread, owner, session_name.c_str ());
+ session = new session_t (io_thread, owner, session_name.c_str (),
+ options);
zmq_assert (session);
send_plug (session);
send_own (owner, session);