summaryrefslogtreecommitdiff
path: root/src/socket_base.cpp
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@250bpm.com>2010-08-11 14:09:56 +0200
committerMartin Sustrik <sustrik@250bpm.com>2010-08-25 15:39:20 +0200
commitd13933bc62fce71b5a58118020e0dd3776e79aa9 (patch)
tree6586d5b9cc637dbf8acae4b32d24da9c8e046014 /src/socket_base.cpp
parentee1f1af0091d9bdffa0e5ce1783da925b3cd7e56 (diff)
I/O object hierarchy implemented
Diffstat (limited to 'src/socket_base.cpp')
-rw-r--r--src/socket_base.cpp435
1 files changed, 124 insertions, 311 deletions
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index 5d3175a..903e781 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -38,9 +38,8 @@
#include "zmq_listener.hpp"
#include "zmq_connecter.hpp"
#include "io_thread.hpp"
-#include "session.hpp"
+#include "connect_session.hpp"
#include "config.hpp"
-#include "owned.hpp"
#include "pipe.hpp"
#include "err.hpp"
#include "ctx.hpp"
@@ -109,20 +108,20 @@ zmq::socket_base_t *zmq::socket_base_t::create (int type_, class ctx_t *parent_,
}
zmq::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t slot_) :
- object_t (parent_, slot_),
+ own_t (parent_, slot_),
zombie (false),
last_processing_time (0),
- pending_term_acks (0),
ticks (0),
- rcvmore (false),
- sent_seqnum (0),
- processed_seqnum (0),
- next_ordinal (1)
+ rcvmore (false)
{
}
zmq::socket_base_t::~socket_base_t ()
{
+ // Check whether there are no session leaks.
+ sessions_sync.lock ();
+ zmq_assert (sessions.empty ());
+ sessions_sync.unlock ();
}
zmq::signaler_t *zmq::socket_base_t::get_signaler ()
@@ -139,6 +138,46 @@ void zmq::socket_base_t::stop ()
send_stop ();
}
+int zmq::socket_base_t::check_protocol (const std::string &protocol_)
+{
+ // First check out whether the protcol is something we are aware of.
+ if (protocol_ != "inproc" && protocol_ != "ipc" && protocol_ != "tcp" &&
+ protocol_ != "pgm" && protocol_ != "epgm") {
+ errno = EPROTONOSUPPORT;
+ return -1;
+ }
+
+ // If 0MQ is not compiled with OpenPGM, pgm and epgm transports
+ // are not avaialble.
+#if !defined ZMQ_HAVE_OPENPGM
+ if (protocol_ == "pgm" || protocol_ == "epgm") {
+ errno = EPROTONOSUPPORT;
+ return -1;
+ }
+#endif
+
+ // IPC transport is not available on Windows and OpenVMS.
+#if defined ZMQ_HAVE_WINDOWS || defined ZMQ_HAVE_OPENVMS
+ if (protocol_ != "ipc") {
+ // Unknown protocol.
+ errno = EPROTONOSUPPORT;
+ return -1;
+ }
+#endif
+
+ // Check whether socket type and transport protocol match.
+ // Specifically, multicast protocols can't be combined with
+ // bi-directional messaging patterns (socket types).
+ if ((protocol_ == "pgm" || protocol_ == "epgm") &&
+ options.requires_in && options.requires_out) {
+ errno = ENOCOMPATPROTO;
+ return -1;
+ }
+
+ // Protocol is available.
+ return 0;
+}
+
void zmq::socket_base_t::attach_pipes (class reader_t *inpipe_,
class writer_t *outpipe_, const blob_t &peer_identity_)
{
@@ -225,56 +264,48 @@ int zmq::socket_base_t::bind (const char *addr_)
}
// Parse addr_ string.
- std::string addr_type;
- std::string addr_args;
-
- std::string addr (addr_);
- std::string::size_type pos = addr.find ("://");
-
- if (pos == std::string::npos) {
- errno = EINVAL;
- return -1;
+ std::string protocol;
+ std::string address;
+ {
+ std::string addr (addr_);
+ std::string::size_type pos = addr.find ("://");
+ if (pos == std::string::npos) {
+ errno = EINVAL;
+ return -1;
+ }
+ protocol = addr.substr (0, pos);
+ address = addr.substr (pos + 3);
}
- addr_type = addr.substr (0, pos);
- addr_args = addr.substr (pos + 3);
-
- if (addr_type == "inproc")
- return register_endpoint (addr_args.c_str (), this);
-
- if (addr_type == "tcp" || addr_type == "ipc") {
+ int rc = check_protocol (protocol);
+ if (rc != 0)
+ return -1;
-#if defined ZMQ_HAVE_WINDOWS || defined ZMQ_HAVE_OPENVMS
- if (addr_type == "ipc") {
- errno = EPROTONOSUPPORT;
- return -1;
- }
-#endif
+ if (protocol == "inproc")
+ return register_endpoint (address.c_str (), this);
+ if (protocol == "tcp" || protocol == "ipc") {
zmq_listener_t *listener = new (std::nothrow) zmq_listener_t (
choose_io_thread (options.affinity), this, options);
zmq_assert (listener);
- int rc = listener->set_address (addr_type.c_str(), addr_args.c_str ());
+ int rc = listener->set_address (protocol.c_str(), address.c_str ());
if (rc != 0) {
delete listener;
return -1;
}
+ launch_child (listener);
- send_plug (listener);
- send_own (this, listener);
return 0;
}
-#if defined ZMQ_HAVE_OPENPGM
- if (addr_type == "pgm" || addr_type == "epgm") {
- // In the case of PGM bind behaves the same like connect.
+ if (protocol == "pgm" || protocol == "epgm") {
+
+ // For convenience's sake, bind can be used interchageable with
+ // connect for PGM and EPGM transports.
return connect (addr_);
}
-#endif
- // Unknown protocol.
- errno = EPROTONOSUPPORT;
- return -1;
+ zmq_assert (false);
}
int zmq::socket_base_t::connect (const char *addr_)
@@ -285,28 +316,31 @@ int zmq::socket_base_t::connect (const char *addr_)
}
// Parse addr_ string.
- std::string addr_type;
- std::string addr_args;
-
- std::string addr (addr_);
- std::string::size_type pos = addr.find ("://");
-
- if (pos == std::string::npos) {
- errno = EINVAL;
- return -1;
+ std::string protocol;
+ std::string address;
+ {
+ std::string addr (addr_);
+ std::string::size_type pos = addr.find ("://");
+ if (pos == std::string::npos) {
+ errno = EINVAL;
+ return -1;
+ }
+ protocol = addr.substr (0, pos);
+ address = addr.substr (pos + 3);
}
- addr_type = addr.substr (0, pos);
- addr_args = addr.substr (pos + 3);
+ int rc = check_protocol (protocol);
+ if (rc != 0)
+ return -1;
- if (addr_type == "inproc") {
+ if (protocol == "inproc") {
// TODO: inproc connect is specific with respect to creating pipes
// as there's no 'reconnect' functionality implemented. Once that
// is in place we should follow generic pipe creation algorithm.
// Find the peer socket.
- socket_base_t *peer = find_endpoint (addr_args.c_str ());
+ socket_base_t *peer = find_endpoint (address.c_str ());
if (!peer)
return -1;
@@ -329,18 +363,18 @@ int zmq::socket_base_t::connect (const char *addr_)
attach_pipes (inpipe_reader, outpipe_writer, blob_t ());
// Attach the pipes to the peer socket. Note that peer's seqnum
- // was incremented in find_endpoint function. The callee is notified
- // about the fact via the last parameter.
+ // was incremented in find_endpoint function. We don't need it
+ // increased here.
send_bind (peer, outpipe_reader, inpipe_writer,
options.identity, false);
return 0;
}
- // Create unnamed session.
- io_thread_t *io_thread = choose_io_thread (options.affinity);
- session_t *session = new (std::nothrow) session_t (io_thread,
- this, options);
+ // Create session.
+ connect_session_t *session = new (std::nothrow) connect_session_t (
+ choose_io_thread (options.affinity), this, options,
+ protocol.c_str (), address.c_str ());
zmq_assert (session);
// If 'immediate connect' feature is required, we'll create the pipes
@@ -370,95 +404,10 @@ int zmq::socket_base_t::connect (const char *addr_)
session->attach_pipes (outpipe_reader, inpipe_writer, blob_t ());
}
- // Activate the session.
- send_plug (session);
- send_own (this, session);
-
- if (addr_type == "tcp" || addr_type == "ipc") {
-
-#if defined ZMQ_HAVE_WINDOWS || defined ZMQ_HAVE_OPENVMS
- // Windows named pipes are not compatible with Winsock API.
- // There's no UNIX domain socket implementation on OpenVMS.
- if (addr_type == "ipc") {
- errno = EPROTONOSUPPORT;
- return -1;
- }
-#endif
-
- // Create the connecter object. Supply it with the session name
- // so that it can bind the new connection to the session once
- // it is established.
- zmq_connecter_t *connecter = new (std::nothrow) zmq_connecter_t (
- choose_io_thread (options.affinity), this, options,
- session->get_ordinal (), false);
- zmq_assert (connecter);
- int rc = connecter->set_address (addr_type.c_str(), addr_args.c_str ());
- if (rc != 0) {
- delete connecter;
- return -1;
- }
- send_plug (connecter);
- send_own (this, connecter);
-
- return 0;
- }
-
-#if defined ZMQ_HAVE_OPENPGM
- if (addr_type == "pgm" || addr_type == "epgm") {
-
- // If the socket type requires bi-directional communication
- // multicast is not an option (it is uni-directional).
- if (options.requires_in && options.requires_out) {
- errno = ENOCOMPATPROTO;
- return -1;
- }
-
- // For epgm, pgm transport with UDP encapsulation is used.
- bool udp_encapsulation = (addr_type == "epgm");
-
- // At this point we'll create message pipes to the session straight
- // away. There's no point in delaying it as no concept of 'connect'
- // exists with PGM anyway.
- if (options.requires_out) {
-
- // PGM sender.
- pgm_sender_t *pgm_sender = new (std::nothrow) pgm_sender_t (
- choose_io_thread (options.affinity), options);
- zmq_assert (pgm_sender);
+ // Activate the session. Make it a child of this socket.
+ launch_child (session);
- int rc = pgm_sender->init (udp_encapsulation, addr_args.c_str ());
- if (rc != 0) {
- delete pgm_sender;
- return -1;
- }
-
- send_attach (session, pgm_sender, blob_t ());
- }
- else if (options.requires_in) {
-
- // PGM receiver.
- pgm_receiver_t *pgm_receiver = new (std::nothrow) pgm_receiver_t (
- choose_io_thread (options.affinity), options);
- zmq_assert (pgm_receiver);
-
- int rc = pgm_receiver->init (udp_encapsulation, addr_args.c_str ());
- if (rc != 0) {
- delete pgm_receiver;
- return -1;
- }
-
- send_attach (session, pgm_receiver, blob_t ());
- }
- else
- zmq_assert (false);
-
- return 0;
- }
-#endif
-
- // Unknown protoco.
- errno = EPROTONOSUPPORT;
- return -1;
+ return 0;
}
int zmq::socket_base_t::send (::zmq_msg_t *msg_, int flags_)
@@ -587,72 +536,23 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_)
int zmq::socket_base_t::close ()
{
+ zmq_assert (!zombie);
+
// Socket becomes a zombie. From now on all new arrived pipes (bind
- // command) and I/O objects (own command) are immediately terminated.
- // Also, any further requests form I/O object termination are ignored
- // (we are going to shut them down anyway -- this way we assure that
- // we do so once only).
+ // command) are immediately terminated.
zombie = true;
- // Unregister all inproc endpoints associated with this socket.
- // Doing this we make sure that no new pipes from other sockets (inproc)
- // will be initiated. However, there may be some inproc pipes already
- // on the fly, but not yet received by this socket. To get finished
- // with them we'll do the subsequent waiting from on-the-fly commands.
- // This should happen very quickly. There's no way to block here for
- // extensive period of time.
- unregister_endpoints (this);
- while (processed_seqnum != sent_seqnum.get ())
- process_commands (true, false);
- // TODO: My feeling is that the above has to be done in the dezombification
- // loop, otherwise we may end up with number of i/o object dropping to zero
- // even though there are more i/o objects on the way.
-
- // The above process ensures that only pipes that will arrive from now on
- // are those initiated by sessions. These in turn have a nice property of
- // not arriving totally asynchronously. When a session -- being an I/O
- // object -- acknowledges its termination we are 100% sure that we'll get
- // no new pipe from it.
-
- // Start termination of all the pipes presently associated with the socket.
- xterm_pipes ();
-
- // Send termination request to all associated I/O objects.
- // Start waiting for the acks. Note that the actual waiting is not done
- // in this function. Rather it is done in delayed manner as socket is
- // being dezombified. The reason is that I/O object shutdown can take
- // considerable amount of time in case there's still a lot of data to
- // push to the network.
- for (io_objects_t::iterator it = io_objects.begin ();
- it != io_objects.end (); it++)
- send_term (*it);
- pending_term_acks += io_objects.size ();
- io_objects.clear ();
-
- // Note that new I/O objects may arrive even in zombie state (say new
- // session initiated by a listener object), however, in such case number
- // of pending acks never drops to zero. Here's the scenario: We have an
- // pending ack for the listener object. Then 'own' commands arrives from
- // the listener notifying the socket about new session. It immediately
- // triggers termination request and number of of pending acks if
- // incremented. Then term_acks arrives from the listener. Number of pending
- // acks is decremented. Later on, the session itself will ack its
- // termination. During the process, number of pending acks never dropped
- // to zero and thus the socket remains safely in the zombie state.
-
- // Transfer the ownership of the socket from this application thread
+ // Start termination of associated I/O object hierarchy.
+ terminate ();
+
+ // Ask context to zombify this socket. In other words, transfer
+ // the ownership of the socket from this application thread
// to the context which will take care of the rest of shutdown process.
- zombify (this);
+ zombify_socket (this);
return 0;
}
-void zmq::socket_base_t::inc_seqnum ()
-{
- // Be aware: This function may be called from a different thread!
- sent_seqnum.add (1);
-}
-
bool zmq::socket_base_t::has_in ()
{
return xhas_in ();
@@ -667,7 +567,7 @@ bool zmq::socket_base_t::register_session (const blob_t &peer_identity_,
session_t *session_)
{
sessions_sync.lock ();
- bool registered = named_sessions.insert (
+ bool registered = sessions.insert (
std::make_pair (peer_identity_, session_)).second;
sessions_sync.unlock ();
return registered;
@@ -676,17 +576,17 @@ bool zmq::socket_base_t::register_session (const blob_t &peer_identity_,
void zmq::socket_base_t::unregister_session (const blob_t &peer_identity_)
{
sessions_sync.lock ();
- named_sessions_t::iterator it = named_sessions.find (peer_identity_);
- zmq_assert (it != named_sessions.end ());
- named_sessions.erase (it);
+ sessions_t::iterator it = sessions.find (peer_identity_);
+ zmq_assert (it != sessions.end ());
+ sessions.erase (it);
sessions_sync.unlock ();
}
zmq::session_t *zmq::socket_base_t::find_session (const blob_t &peer_identity_)
{
sessions_sync.lock ();
- named_sessions_t::iterator it = named_sessions.find (peer_identity_);
- if (it == named_sessions.end ()) {
+ sessions_t::iterator it = sessions.find (peer_identity_);
+ if (it == sessions.end ()) {
sessions_sync.unlock ();
return NULL;
}
@@ -699,74 +599,16 @@ zmq::session_t *zmq::socket_base_t::find_session (const blob_t &peer_identity_)
return session;
}
-uint64_t zmq::socket_base_t::register_session (session_t *session_)
-{
- sessions_sync.lock ();
- uint64_t ordinal = next_ordinal;
- next_ordinal++;
- unnamed_sessions.insert (std::make_pair (ordinal, session_));
- sessions_sync.unlock ();
- return ordinal;
-}
-
-void zmq::socket_base_t::unregister_session (uint64_t ordinal_)
-{
- sessions_sync.lock ();
- unnamed_sessions_t::iterator it = unnamed_sessions.find (ordinal_);
- zmq_assert (it != unnamed_sessions.end ());
- unnamed_sessions.erase (it);
- sessions_sync.unlock ();
-}
-
-zmq::session_t *zmq::socket_base_t::find_session (uint64_t ordinal_)
-{
- sessions_sync.lock ();
-
- unnamed_sessions_t::iterator it = unnamed_sessions.find (ordinal_);
- if (it == unnamed_sessions.end ()) {
- sessions_sync.unlock ();
- return NULL;
- }
- session_t *session = it->second;
-
- // Prepare the session for subsequent attach command.
- session->inc_seqnum ();
-
- sessions_sync.unlock ();
- return session;
-}
-
bool zmq::socket_base_t::dezombify ()
{
zmq_assert (zombie);
// Process any commands from other threads/sockets that may be available
- // at the moment.
+ // at the moment. Ultimately, socket will be destroyed.
process_commands (false, false);
- // If there are no more pipes attached and there are no more I/O objects
- // owned by the socket, we can kill the zombie.
- if (!pending_term_acks && !xhas_pipes ()) {
-
- // If all objects have acknowledged their termination there should
- // definitely be no I/O object remaining in the list.
- zmq_assert (io_objects.empty ());
-
- // Check whether there are no session leaks.
- sessions_sync.lock ();
- zmq_assert (named_sessions.empty ());
- zmq_assert (unnamed_sessions.empty ());
- sessions_sync.unlock ();
-
- // Deallocate all the resources tied to this socket.
- delete this;
-
- // Notify the caller about the fact that the zombie is finally dead.
- return true;
- }
-
- // The zombie remains undead.
- return false;
+// TODO: ???
+ return true;
}
void zmq::socket_base_t::process_commands (bool block_, bool throttle_)
@@ -828,19 +670,6 @@ void zmq::socket_base_t::process_stop ()
zombie = true;
}
-void zmq::socket_base_t::process_own (owned_t *object_)
-{
- // If the socket is already being shut down, new owned objects are
- // immediately asked to terminate.
- if (zombie) {
- send_term (object_);
- pending_term_acks++;
- return;
- }
-
- io_objects.insert (object_);
-}
-
void zmq::socket_base_t::process_bind (reader_t *in_pipe_, writer_t *out_pipe_,
const blob_t &peer_identity_)
{
@@ -857,37 +686,21 @@ void zmq::socket_base_t::process_bind (reader_t *in_pipe_, writer_t *out_pipe_,
attach_pipes (in_pipe_, out_pipe_, peer_identity_);
}
-void zmq::socket_base_t::process_term_req (owned_t *object_)
+void zmq::socket_base_t::process_unplug ()
{
- // When shutting down we can ignore termination requests from owned
- // objects. It means the termination request was already sent to
- // the object.
- if (zombie)
- return;
-
- // If I/O object is well and alive ask it to terminate.
- io_objects_t::iterator it = std::find (io_objects.begin (),
- io_objects.end (), object_);
-
- // If not found, we assume that termination request was already sent to
- // the object so we can safely ignore the request.
- if (it == io_objects.end ())
- return;
-
- pending_term_acks++;
- io_objects.erase (it);
- send_term (object_);
}
-void zmq::socket_base_t::process_term_ack ()
+void zmq::socket_base_t::process_term ()
{
- zmq_assert (pending_term_acks);
- pending_term_acks--;
-}
+ zmq_assert (zombie);
-void zmq::socket_base_t::process_seqnum ()
-{
- processed_seqnum++;
+ // Unregister all inproc endpoints associated with this socket.
+ // Doing this we make sure that no new pipes from other sockets (inproc)
+ // will be initiated.
+ unregister_endpoints (this);
+
+ // Continue the termination process immediately.
+ own_t::process_term ();
}
int zmq::socket_base_t::xsetsockopt (int option_, const void *optval_,