summaryrefslogtreecommitdiff
path: root/src/socket_base.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/socket_base.cpp')
-rw-r--r--src/socket_base.cpp460
1 files changed, 124 insertions, 336 deletions
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index bb8e7c9..c669e04 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -27,7 +27,6 @@
#include "dispatcher.hpp"
#include "zmq_listener.hpp"
#include "zmq_connecter.hpp"
-#include "msg_content.hpp"
#include "io_thread.hpp"
#include "session.hpp"
#include "config.hpp"
@@ -42,145 +41,28 @@
zmq::socket_base_t::socket_base_t (app_thread_t *parent_, int type_) :
object_t (parent_),
type (type_),
- current (0),
- active (0),
pending_term_acks (0),
ticks (0),
app_thread (parent_),
- shutting_down (false),
- index (-1)
+ shutting_down (false)
{
}
zmq::socket_base_t::~socket_base_t ()
{
- shutting_down = true;
-
- // Ask all pipes to terminate.
- for (in_pipes_t::iterator it = in_pipes.begin ();
- it != in_pipes.end (); it++)
- (*it)->term ();
- in_pipes.clear ();
- for (out_pipes_t::iterator it = out_pipes.begin ();
- it != out_pipes.end (); it++)
- (*it)->term ();
- out_pipes.clear ();
-
- while (true) {
-
- // On third pass of the loop there should be no more I/O objects
- // because all connecters and listerners were destroyed during
- // the first pass and all engines delivered by delayed 'own' commands
- // are destroyed during the second pass.
- if (io_objects.empty () && !pending_term_acks)
- break;
-
- // Send termination request to all associated I/O objects.
- for (io_objects_t::iterator it = io_objects.begin ();
- it != io_objects.end (); it++)
- send_term (*it);
-
- // Move the objects to the list of pending term acks.
- pending_term_acks += io_objects.size ();
- io_objects.clear ();
-
- // Process commands till we get all the termination acknowledgements.
- while (pending_term_acks)
- app_thread->process_commands (true, false);
- }
-
- // Check whether there are no session leaks.
- sessions_sync.lock ();
- zmq_assert (sessions.empty ());
- sessions_sync.unlock ();
}
int zmq::socket_base_t::setsockopt (int option_, const void *optval_,
size_t optvallen_)
{
- switch (option_) {
-
- case ZMQ_HWM:
- if (optvallen_ != sizeof (int64_t)) {
- errno = EINVAL;
- return -1;
- }
- options.hwm = *((int64_t*) optval_);
- return 0;
-
- case ZMQ_LWM:
- if (optvallen_ != sizeof (int64_t)) {
- errno = EINVAL;
- return -1;
- }
- options.lwm = *((int64_t*) optval_);
- return 0;
-
- case ZMQ_SWAP:
- if (optvallen_ != sizeof (int64_t)) {
- errno = EINVAL;
- return -1;
- }
- options.swap = *((int64_t*) optval_);
- return 0;
-
- case ZMQ_AFFINITY:
- if (optvallen_ != sizeof (int64_t)) {
- errno = EINVAL;
- return -1;
- }
- options.affinity = (uint64_t) *((int64_t*) optval_);
- return 0;
-
- case ZMQ_IDENTITY:
- options.identity.assign ((const char*) optval_, optvallen_);
- return 0;
-
- case ZMQ_SUBSCRIBE:
- case ZMQ_UNSUBSCRIBE:
- errno = EFAULT;
- return -1;
-
- case ZMQ_RATE:
- if (optvallen_ != sizeof (int64_t)) {
- errno = EINVAL;
- return -1;
- }
- options.rate = (uint32_t) *((int64_t*) optval_);
- return 0;
-
- case ZMQ_RECOVERY_IVL:
- if (optvallen_ != sizeof (int64_t)) {
- errno = EINVAL;
- return -1;
- }
- options.recovery_ivl = (uint32_t) *((int64_t*) optval_);
- return 0;
-
- case ZMQ_MCAST_LOOP:
- if (optvallen_ != sizeof (int64_t)) {
- errno = EINVAL;
- return -1;
- }
-
- if ((int64_t) *((int64_t*) optval_) == 0) {
-
- options.use_multicast_loop = false;
-
- } else if ((int64_t) *((int64_t*) optval_) == 1) {
-
- options.use_multicast_loop = true;
-
- } else {
- errno = EINVAL;
- return -1;
- }
- return 0;
-
- default:
- errno = EINVAL;
- return -1;
- }
+ // First, check whether specific socket type overloads the option.
+ int rc = xsetsockopt (option_, optval_, optvallen_);
+ if (rc == 0 || errno != EINVAL)
+ return rc;
+
+ // If the socket type doesn't support the option, pass it to
+ // the generic option parser.
+ return options.setsockopt (option_, optval_, optvallen_);
}
int zmq::socket_base_t::bind (const char *addr_)
@@ -251,23 +133,29 @@ int zmq::socket_base_t::connect (const char *addr_)
options, true);
zmq_assert (session);
- // Create inbound pipe.
- pipe_t *in_pipe = new pipe_t (this, session, options.hwm, options.lwm);
- zmq_assert (in_pipe);
- in_pipe->reader.set_endpoint (this);
- session->attach_outpipe (&in_pipe->writer);
- in_pipes.push_back (&in_pipe->reader);
- in_pipes.back ()->set_index (active);
- in_pipes [active]->set_index (in_pipes.size () - 1);
- std::swap (in_pipes.back (), in_pipes [active]);
- active++;
-
- // Create outbound pipe.
- pipe_t *out_pipe = new pipe_t (session, this, options.hwm, options.lwm);
- zmq_assert (out_pipe);
- out_pipe->writer.set_endpoint (this);
- session->attach_inpipe (&out_pipe->reader);
- out_pipes.push_back (&out_pipe->writer);
+ pipe_t *in_pipe = NULL;
+ pipe_t *out_pipe = NULL;
+
+ // Create inbound pipe, if required.
+ if (xrequires_in ()) {
+ in_pipe = new pipe_t (this, session, options.hwm, options.lwm);
+ zmq_assert (in_pipe);
+
+ }
+
+ // Create outbound pipe, if required.
+ if (xrequires_out ()) {
+ out_pipe = new pipe_t (session, this, options.hwm, options.lwm);
+ zmq_assert (out_pipe);
+ }
+
+ // Attach the pipes to the socket object.
+ attach_pipes (in_pipe ? &in_pipe->reader : NULL,
+ out_pipe ? &out_pipe->writer : NULL);
+
+ // Attach the pipes to the session object.
+ session->attach_pipes (out_pipe ? &out_pipe->reader : NULL,
+ in_pipe ? &in_pipe->writer : NULL);
// Activate the session.
send_plug (session);
@@ -294,6 +182,13 @@ int zmq::socket_base_t::connect (const char *addr_)
#if defined ZMQ_HAVE_OPENPGM
if (addr_type == "pgm" || addr_type == "udp") {
+ // If the socket type requires bi-directional communication
+ // multicast is not an option (it is uni-directional).
+ if (xrequires_in () && xrequires_out ()) {
+ errno = EFAULT;
+ return -1;
+ }
+
// For udp, pgm transport with udp encapsulation is used.
bool udp_encapsulation = false;
if (addr_type == "udp")
@@ -365,56 +260,61 @@ int zmq::socket_base_t::send (::zmq_msg_t *msg_, int flags_)
app_thread->process_commands (false, true);
// Try to send the message.
- bool sent = distribute (msg_, !(flags_ & ZMQ_NOFLUSH));
-
- if (!(flags_ & ZMQ_NOBLOCK)) {
+ int rc = xsend (msg_, flags_);
+ if (rc == 0)
+ return 0;
- // Oops, we couldn't send the message. Wait for the next
- // command, process it and try to send the message again.
- while (!sent) {
- app_thread->process_commands (true, false);
- sent = distribute (msg_, !(flags_ & ZMQ_NOFLUSH));
- }
- }
- else if (!sent) {
- errno = EAGAIN;
+ // In case of non-blocking send we'll simply propagate
+ // the error - including EAGAIN - upwards.
+ if (flags_ & ZMQ_NOBLOCK)
return -1;
- }
+ // Oops, we couldn't send the message. Wait for the next
+ // command, process it and try to send the message again.
+ while (rc != 0) {
+ if (errno != EAGAIN)
+ return -1;
+ app_thread->process_commands (true, false);
+ rc = xsend (msg_, flags_);
+ }
return 0;
}
int zmq::socket_base_t::flush ()
{
- for (out_pipes_t::iterator it = out_pipes.begin (); it != out_pipes.end ();
- it++)
- (*it)->flush ();
-
- return 0;
+ return xflush ();
}
int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_)
{
+ // Get the message and return immediately if successfull.
+ int rc = xrecv (msg_, flags_);
+ if (rc == 0)
+ return 0;
+
// If the message cannot be fetched immediately, there are two scenarios.
- // For non-blocking recv, commands are processed in case there's a message
- // already waiting we don't know about. If it's not, return EAGAIN.
+ // For non-blocking recv, commands are processed in case there's a revive
+ // command already waiting int a command pipe. If it's not, return EAGAIN.
// In blocking scenario, commands are processed over and over again until
// we are able to fetch a message.
- bool fetched = fetch (msg_);
- if (!fetched) {
- if (flags_ & ZMQ_NOBLOCK) {
- app_thread->process_commands (false, false);
- fetched = fetch (msg_);
- }
- else {
- while (!fetched) {
- app_thread->process_commands (true, false);
- ticks = 0;
- fetched = fetch (msg_);
- }
+ if (flags_ & ZMQ_NOBLOCK) {
+ if (errno != EAGAIN)
+ return -1;
+ app_thread->process_commands (false, false);
+ ticks = 0;
+ rc = xrecv (msg_, flags_);
+ }
+ else {
+ while (rc != 0) {
+ if (errno != EAGAIN)
+ return -1;
+ app_thread->process_commands (true, false);
+ ticks = 0;
+ rc = xrecv (msg_, flags_);
}
}
+
// Once every inbound_poll_rate messages check for signals and process
// incoming commands. This happens only if we are not polling altogether
// because there are messages available all the time. If poll occurs,
@@ -428,12 +328,7 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_)
ticks = 0;
}
- if (!fetched) {
- errno = EAGAIN;
- return -1;
- }
-
- return 0;
+ return rc;
}
int zmq::socket_base_t::close ()
@@ -443,6 +338,37 @@ int zmq::socket_base_t::close ()
// Pointer to the dispatcher must be retrieved before the socket is
// deallocated. Afterwards it is not available.
dispatcher_t *dispatcher = get_dispatcher ();
+
+ shutting_down = true;
+
+ while (true) {
+
+ // On third pass of the loop there should be no more I/O objects
+ // because all connecters and listerners were destroyed during
+ // the first pass and all engines delivered by delayed 'own' commands
+ // are destroyed during the second pass.
+ if (io_objects.empty () && !pending_term_acks)
+ break;
+
+ // Send termination request to all associated I/O objects.
+ for (io_objects_t::iterator it = io_objects.begin ();
+ it != io_objects.end (); it++)
+ send_term (*it);
+
+ // Move the objects to the list of pending term acks.
+ pending_term_acks += io_objects.size ();
+ io_objects.clear ();
+
+ // Process commands till we get all the termination acknowledgements.
+ while (pending_term_acks)
+ app_thread->process_commands (true, false);
+ }
+
+ // Check whether there are no session leaks.
+ sessions_sync.lock ();
+ zmq_assert (sessions.empty ());
+ sessions_sync.unlock ();
+
delete this;
// This function must be called after the socket is completely deallocated
@@ -488,68 +414,36 @@ zmq::session_t *zmq::socket_base_t::find_session (const char *name_)
return it->second;
}
-void zmq::socket_base_t::attach_inpipe (class reader_t *pipe_)
+void zmq::socket_base_t::kill (reader_t *pipe_)
{
- pipe_->set_endpoint (this);
- in_pipes.push_back (pipe_);
- in_pipes.back ()->set_index (active);
- in_pipes [active]->set_index (in_pipes.size () - 1);
- std::swap (in_pipes.back (), in_pipes [active]);
- active++;
+ xkill (pipe_);
}
-void zmq::socket_base_t::attach_outpipe (class writer_t *pipe_)
+void zmq::socket_base_t::revive (reader_t *pipe_)
{
- pipe_->set_endpoint (this);
- out_pipes.push_back (pipe_);
- pipe_->set_index (out_pipes.size () - 1);
+ xrevive (pipe_);
}
-void zmq::socket_base_t::revive (reader_t *pipe_)
+void zmq::socket_base_t::attach_pipes (class reader_t *inpipe_,
+ class writer_t *outpipe_)
{
- // Move the pipe to the list of active pipes.
- in_pipes_t::size_type index = (in_pipes_t::size_type) pipe_->get_index ();
- in_pipes [index]->set_index (active);
- in_pipes [active]->set_index (index);
- std::swap (in_pipes [index], in_pipes [active]);
- active++;
+ if (inpipe_)
+ inpipe_->set_endpoint (this);
+ if (outpipe_)
+ outpipe_->set_endpoint (this);
+ xattach_pipes (inpipe_, outpipe_);
}
void zmq::socket_base_t::detach_inpipe (class reader_t *pipe_)
{
- // Remove the pipe from the list of inbound pipes.
- in_pipes_t::size_type index = (in_pipes_t::size_type) pipe_->get_index ();
- if (index < active) {
- in_pipes [index]->set_index (active - 1);
- in_pipes [active - 1]->set_index (index);
- std::swap (in_pipes [index], in_pipes [active - 1]);
- active--;
- index = active;
- }
- in_pipes [index]->set_index (in_pipes.size () - 1);
- in_pipes [in_pipes.size () - 1]->set_index (index);
- std::swap (in_pipes [index], in_pipes [in_pipes.size () - 1]);
- in_pipes.pop_back ();
+ xdetach_inpipe (pipe_);
+ pipe_->set_endpoint (NULL); // ?
}
void zmq::socket_base_t::detach_outpipe (class writer_t *pipe_)
{
- out_pipes_t::size_type index = (out_pipes_t::size_type) pipe_->get_index ();
- out_pipes [index]->set_index (out_pipes.size () - 1);
- out_pipes [out_pipes.size () - 1]->set_index (index);
- std::swap (out_pipes [index], out_pipes [out_pipes.size () - 1]);
- out_pipes.pop_back ();
-}
-
-void zmq::socket_base_t::set_index (int index_)
-{
- index = index_;
-}
-
-int zmq::socket_base_t::get_index ()
-{
- zmq_assert (index != -1);
- return index;
+ xdetach_outpipe (pipe_);
+ pipe_->set_endpoint (NULL); // ?
}
void zmq::socket_base_t::process_own (owned_t *object_)
@@ -560,10 +454,7 @@ void zmq::socket_base_t::process_own (owned_t *object_)
void zmq::socket_base_t::process_bind (owned_t *session_,
reader_t *in_pipe_, writer_t *out_pipe_)
{
- zmq_assert (in_pipe_);
- attach_inpipe (in_pipe_);
- zmq_assert (out_pipe_);
- attach_outpipe (out_pipe_);
+ attach_pipes (in_pipe_, out_pipe_);
}
void zmq::socket_base_t::process_term_req (owned_t *object_)
@@ -593,106 +484,3 @@ void zmq::socket_base_t::process_term_ack ()
pending_term_acks--;
}
-bool zmq::socket_base_t::distribute (zmq_msg_t *msg_, bool flush_)
-{
- int pipes_count = out_pipes.size ();
-
- // If there are no pipes available, simply drop the message.
- if (pipes_count == 0) {
- int rc = zmq_msg_close (msg_);
- zmq_assert (rc == 0);
- rc = zmq_msg_init (msg_);
- zmq_assert (rc == 0);
- return true;
- }
-
- // First check whether all pipes are available for writing.
- for (out_pipes_t::iterator it = out_pipes.begin (); it != out_pipes.end ();
- it++)
- if (!(*it)->check_write (zmq_msg_size (msg_)))
- return false;
-
- msg_content_t *content = (msg_content_t*) msg_->content;
-
- // For VSMs the copying is straighforward.
- if (content == (msg_content_t*) ZMQ_VSM) {
- for (out_pipes_t::iterator it = out_pipes.begin ();
- it != out_pipes.end (); it++) {
- (*it)->write (msg_);
- if (flush_)
- (*it)->flush ();
- }
- int rc = zmq_msg_init (msg_);
- zmq_assert (rc == 0);
- return true;
- }
-
- // Optimisation for the case when there's only a single pipe
- // to send the message to - no refcount adjustment i.e. no atomic
- // operations are needed.
- if (pipes_count == 1) {
- (*out_pipes.begin ())->write (msg_);
- if (flush_)
- (*out_pipes.begin ())->flush ();
- int rc = zmq_msg_init (msg_);
- zmq_assert (rc == 0);
- return true;
- }
-
- // There are at least 2 destinations for the message. That means we have
- // to deal with reference counting. First add N-1 references to
- // the content (we are holding one reference anyway, that's why -1).
- if (msg_->shared)
- content->refcnt.add (pipes_count - 1);
- else {
- content->refcnt.set (pipes_count);
- msg_->shared = true;
- }
-
- // Push the message to all destinations.
- for (out_pipes_t::iterator it = out_pipes.begin (); it != out_pipes.end ();
- it++) {
- (*it)->write (msg_);
- if (flush_)
- (*it)->flush ();
- }
-
- // Detach the original message from the data buffer.
- int rc = zmq_msg_init (msg_);
- zmq_assert (rc == 0);
-
- return true;
-}
-
-bool zmq::socket_base_t::fetch (zmq_msg_t *msg_)
-{
- // Deallocate old content of the message.
- zmq_msg_close (msg_);
-
- // Round-robin over the pipes to get next message.
- for (int count = active; count != 0; count--) {
-
- bool fetched = in_pipes [current]->read (msg_);
-
- // If there's no message in the pipe, move it to the list of
- // non-active pipes.
- if (!fetched) {
- in_pipes [current]->set_index (active - 1);
- in_pipes [active - 1]->set_index (current);
- std::swap (in_pipes [current], in_pipes [active - 1]);
- active--;
- }
-
- current ++;
- if (current >= active)
- current = 0;
-
- if (fetched)
- return true;
- }
-
- // No message is available. Initialise the output parameter
- // to be a 0-byte message.
- zmq_msg_init (msg_);
- return false;
-}