summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@fastmq.commkdir>2009-08-21 14:29:22 +0200
committerMartin Sustrik <sustrik@fastmq.commkdir>2009-08-21 14:29:22 +0200
commit6be4b0143793ab5ceebc5d9d6bbe5c2f1333a0d2 (patch)
treea785065e54317d1d360e2e4b3a4acf1d6e5669f1 /src
parenta801b6d8b37557ccfb53030dca22f89a3f99b59c (diff)
session management implemented
Diffstat (limited to 'src')
-rw-r--r--src/Makefile.am10
-rw-r--r--src/i_inout.hpp4
-rw-r--r--src/msg_content.hpp (renamed from src/msg.hpp)9
-rw-r--r--src/object.cpp5
-rw-r--r--src/object.hpp4
-rw-r--r--src/owned.cpp10
-rw-r--r--src/owned.hpp4
-rw-r--r--src/pipe.hpp2
-rw-r--r--src/session.cpp39
-rw-r--r--src/session.hpp15
-rw-r--r--src/socket_base.cpp77
-rw-r--r--src/socket_base.hpp24
-rw-r--r--src/zmq.cpp111
-rw-r--r--src/zmq_connecter.cpp11
-rw-r--r--src/zmq_connecter.hpp10
-rw-r--r--src/zmq_connecter_init.cpp94
-rw-r--r--src/zmq_connecter_init.hpp (renamed from src/zmq_init.hpp)43
-rw-r--r--src/zmq_decoder.hpp2
-rw-r--r--src/zmq_encoder.hpp2
-rw-r--r--src/zmq_init.cpp112
-rw-r--r--src/zmq_listener.cpp5
-rw-r--r--src/zmq_listener_init.cpp96
-rw-r--r--src/zmq_listener_init.hpp71
23 files changed, 499 insertions, 261 deletions
diff --git a/src/Makefile.am b/src/Makefile.am
index 9d6127c..396e3a3 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -23,7 +23,7 @@ libzmq_la_SOURCES = \
i_poll_events.hpp \
i_signaler.hpp \
kqueue.hpp \
- msg.hpp \
+ msg_content.hpp \
mutex.hpp \
object.hpp \
options.hpp \
@@ -47,11 +47,12 @@ libzmq_la_SOURCES = \
ypollset.hpp \
yqueue.hpp \
zmq_connecter.hpp \
+ zmq_connecter_init.hpp \
zmq_decoder.hpp \
zmq_encoder.hpp \
zmq_engine.hpp \
- zmq_init.hpp \
zmq_listener.hpp \
+ zmq_listener_init.hpp \
app_thread.cpp \
devpoll.cpp \
dispatcher.cpp \
@@ -77,11 +78,12 @@ libzmq_la_SOURCES = \
ypollset.cpp \
zmq.cpp \
zmq_connecter.cpp \
+ zmq_connecter_init.cpp \
zmq_decoder.cpp \
zmq_encoder.cpp \
zmq_engine.cpp \
- zmq_init.cpp \
- zmq_listener.cpp
+ zmq_listener.cpp \
+ zmq_listener_init.cpp
libzmq_la_LDFLAGS = -version-info 0:0:0
libzmq_la_CXXFLAGS = -Wall -pedantic -Werror @ZMQ_EXTRA_CXXFLAGS@
diff --git a/src/i_inout.hpp b/src/i_inout.hpp
index be2e007..8901c04 100644
--- a/src/i_inout.hpp
+++ b/src/i_inout.hpp
@@ -27,8 +27,8 @@ namespace zmq
struct i_inout
{
- virtual bool read (::zmq_msg *msg_) = 0;
- virtual bool write (::zmq_msg *msg_) = 0;
+ virtual bool read (::zmq_msg_t *msg_) = 0;
+ virtual bool write (::zmq_msg_t *msg_) = 0;
virtual void flush () = 0;
};
diff --git a/src/msg.hpp b/src/msg_content.hpp
index 4f35961..b468746 100644
--- a/src/msg.hpp
+++ b/src/msg_content.hpp
@@ -26,8 +26,8 @@
#include "atomic_counter.hpp"
-//namespace zmq
-//{
+namespace zmq
+{
// Shared message buffer. Message data are either allocated in one
// continuous block along with this structure - thus avoiding one
@@ -36,7 +36,8 @@
// used to deallocate the data. If the buffer is actually shared (there
// are at least 2 references to it) refcount member contains number of
// references.
- struct zmq_msg_content
+
+ struct msg_content_t
{
void *data;
size_t size;
@@ -44,6 +45,6 @@
zmq::atomic_counter_t refcnt;
};
-//}
+}
#endif
diff --git a/src/object.cpp b/src/object.cpp
index b1b7c8a..0a25750 100644
--- a/src/object.cpp
+++ b/src/object.cpp
@@ -131,9 +131,8 @@ void zmq::object_t::send_own (socket_base_t *destination_, owned_t *object_)
void zmq::object_t::send_attach (session_t *destination_, zmq_engine_t *engine_)
{
- // Let the object know that it cannot shut down till it gets this command.
- destination_->inc_seqnum ();
-
+ // The assumption here is that command sequence number of the destination
+ // object was already incremented in find_session function.
command_t cmd;
cmd.destination = destination_;
cmd.type = command_t::attach;
diff --git a/src/object.hpp b/src/object.hpp
index 02a071a..31c8c40 100644
--- a/src/object.hpp
+++ b/src/object.hpp
@@ -30,6 +30,10 @@ namespace zmq
class object_t
{
+ // Repository of sessions needs to use caller's send_* functions
+ // when creating new session. TODO: Get rid of this dependency.
+ friend class socket_base_t;
+
public:
object_t (class dispatcher_t *dispatcher_, int thread_slot_);
diff --git a/src/owned.cpp b/src/owned.cpp
index 6995a39..810bed7 100644
--- a/src/owned.cpp
+++ b/src/owned.cpp
@@ -47,6 +47,14 @@ void zmq::owned_t::process_plug ()
finalise_command ();
}
+void zmq::owned_t::process_attach (zmq_engine_t *engine_)
+{
+ // Keep track of how many commands were processed so far.
+ processed_seqnum++;
+
+ finalise_command ();
+}
+
void zmq::owned_t::term ()
{
send_term_req (owner, this);
@@ -65,8 +73,8 @@ void zmq::owned_t::finalise_command ()
// If termination request was already received and there are no more
// commands to wait for, terminate the object.
if (shutting_down && processed_seqnum == sent_seqnum.get ()) {
- send_term_ack (owner);
process_unplug ();
+ send_term_ack (owner);
delete this;
}
}
diff --git a/src/owned.hpp b/src/owned.hpp
index 22595d1..78036a3 100644
--- a/src/owned.hpp
+++ b/src/owned.hpp
@@ -59,6 +59,10 @@ namespace zmq
// handler.
void process_plug ();
+ // It's vital that session invokes io_object_t::process_attach
+ // at the end of it's own attach handler.
+ void process_attach (class zmq_engine_t *engine_);
+
// io_object_t defines a new handler used to disconnect the object
// from the poller object. Implement the handlen in the derived
// classes to ensure sane cleanup.
diff --git a/src/pipe.hpp b/src/pipe.hpp
index d771120..28e4b4d 100644
--- a/src/pipe.hpp
+++ b/src/pipe.hpp
@@ -29,7 +29,7 @@ namespace zmq
{
// Message pipe.
- class pipe_t : public ypipe_t <zmq_msg, false, message_pipe_granularity>
+ class pipe_t : public ypipe_t <zmq_msg_t, false, message_pipe_granularity>
{
};
diff --git a/src/session.cpp b/src/session.cpp
index 2bb4ff6..fc1f858 100644
--- a/src/session.cpp
+++ b/src/session.cpp
@@ -22,9 +22,10 @@
#include "err.hpp"
zmq::session_t::session_t (object_t *parent_, socket_base_t *owner_,
- zmq_engine_t *engine_) :
+ const char *name_) :
owned_t (parent_, owner_),
- engine (engine_)
+ engine (NULL),
+ name (name_)
{
}
@@ -32,12 +33,12 @@ zmq::session_t::~session_t ()
{
}
-bool zmq::session_t::read (::zmq_msg *msg_)
+bool zmq::session_t::read (::zmq_msg_t *msg_)
{
return false;
}
-bool zmq::session_t::write (::zmq_msg *msg_)
+bool zmq::session_t::write (::zmq_msg_t *msg_)
{
return false;
}
@@ -48,14 +49,34 @@ void zmq::session_t::flush ()
void zmq::session_t::process_plug ()
{
- zmq_assert (engine);
- engine->plug (this);
+ // Register the session with the socket.
+ bool ok = owner->register_session (name.c_str (), this);
+
+ // There's already a session with the specified identity.
+ // We should syslog it and drop the session. TODO
+ zmq_assert (ok);
+
owned_t::process_plug ();
}
void zmq::session_t::process_unplug ()
{
- zmq_assert (engine);
- engine->unplug ();
- delete engine;
+ // Unregister the session from the socket.
+ bool ok = owner->unregister_session (name.c_str ());
+ zmq_assert (ok);
+
+ if (engine) {
+ engine->unplug ();
+ delete engine;
+ engine = NULL;
+ }
+}
+
+void zmq::session_t::process_attach (class zmq_engine_t *engine_)
+{
+ zmq_assert (engine_);
+ engine = engine_;
+ engine->plug (this);
+
+ owned_t::process_attach (engine_);
}
diff --git a/src/session.hpp b/src/session.hpp
index 2cb8e18..6d6bcf7 100644
--- a/src/session.hpp
+++ b/src/session.hpp
@@ -20,8 +20,11 @@
#ifndef __ZMQ_SESSION_HPP_INCLUDED__
#define __ZMQ_SESSION_HPP_INCLUDED__
+#include <string>
+
#include "i_inout.hpp"
#include "owned.hpp"
+#include "options.hpp"
namespace zmq
{
@@ -30,24 +33,28 @@ namespace zmq
{
public:
- session_t (object_t *parent_, socket_base_t *owner_,
- class zmq_engine_t *engine_);
+ session_t (object_t *parent_, socket_base_t *owner_, const char *name_);
private:
~session_t ();
// i_inout interface implementation.
- bool read (::zmq_msg *msg_);
- bool write (::zmq_msg *msg_);
+ bool read (::zmq_msg_t *msg_);
+ bool write (::zmq_msg_t *msg_);
void flush ();
// Handlers for incoming commands.
void process_plug ();
void process_unplug ();
+ void process_attach (class zmq_engine_t *engine_);
class zmq_engine_t *engine;
+ // The name of the session. One that is used to register it with
+ // socket-level repository of sessions.
+ std::string name;
+
session_t (const session_t&);
void operator = (const session_t&);
};
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index fa6c1e3..fb7bdcf 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -17,6 +17,7 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
+#include <string>
#include <algorithm>
#include "../include/zmq.h"
@@ -30,16 +31,20 @@
#include "session.hpp"
#include "config.hpp"
#include "owned.hpp"
+#include "uuid.hpp"
zmq::socket_base_t::socket_base_t (app_thread_t *parent_) :
object_t (parent_),
pending_term_acks (0),
- app_thread (parent_)
+ app_thread (parent_),
+ shutting_down (false)
{
}
zmq::socket_base_t::~socket_base_t ()
{
+ shutting_down = true;
+
while (true) {
// On third pass of the loop there should be no more I/O objects
@@ -64,10 +69,12 @@ zmq::socket_base_t::~socket_base_t ()
}
// Check whether there are no session leaks.
+ sessions_sync.lock ();
zmq_assert (sessions.empty ());
+ sessions_sync.unlock ();
}
-int zmq::socket_base_t::setsockopt (int option_, void *optval_,
+int zmq::socket_base_t::setsockopt (int option_, const void *optval_,
size_t optvallen_)
{
switch (option_) {
@@ -113,11 +120,7 @@ int zmq::socket_base_t::setsockopt (int option_, void *optval_,
return 0;
case ZMQ_IDENTITY:
- if (optvallen_ != sizeof (const char*)) {
- errno = EINVAL;
- return -1;
- }
- options.identity = (const char*) optval_;
+ options.identity.assign ((const char*) optval_, optvallen_);
return 0;
default:
@@ -141,18 +144,34 @@ int zmq::socket_base_t::bind (const char *addr_)
int zmq::socket_base_t::connect (const char *addr_)
{
+ // Generate a unique name for the session.
+ std::string session_name ("#");
+ session_name += uuid_t ().to_string ();
+
+ // Create the session.
+ io_thread_t *io_thread = choose_io_thread (options.affinity);
+ session_t *session = new session_t (io_thread, this, session_name.c_str ());
+ zmq_assert (session);
+ send_plug (session);
+ send_own (this, session);
+
+ // Create the connecter object. Supply it with the session name so that
+ // it can bind the new connection to the session once it is established.
zmq_connecter_t *connecter = new zmq_connecter_t (
- choose_io_thread (options.affinity), this, options);
+ choose_io_thread (options.affinity), this, options,
+ session_name.c_str ());
int rc = connecter->set_address (addr_);
- if (rc != 0)
+ if (rc != 0) {
+ delete connecter;
return -1;
-
+ }
send_plug (connecter);
send_own (this, connecter);
+
return 0;
}
-int zmq::socket_base_t::send (struct zmq_msg *msg_, int flags_)
+int zmq::socket_base_t::send (::zmq_msg_t *msg_, int flags_)
{
zmq_assert (false);
}
@@ -162,7 +181,7 @@ int zmq::socket_base_t::flush ()
zmq_assert (false);
}
-int zmq::socket_base_t::recv (struct zmq_msg *msg_, int flags_)
+int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_)
{
zmq_assert (false);
}
@@ -174,35 +193,40 @@ int zmq::socket_base_t::close ()
return 0;
}
-void zmq::socket_base_t::register_session (const char *name_,
+bool zmq::socket_base_t::register_session (const char *name_,
session_t *session_)
{
sessions_sync.lock ();
- bool inserted = sessions.insert (std::make_pair (name_, session_)).second;
- zmq_assert (inserted);
+ bool registered = sessions.insert (std::make_pair (name_, session_)).second;
sessions_sync.unlock ();
+ return registered;
}
-void zmq::socket_base_t::unregister_session (const char *name_)
+bool zmq::socket_base_t::unregister_session (const char *name_)
{
sessions_sync.lock ();
sessions_t::iterator it = sessions.find (name_);
- zmq_assert (it != sessions.end ());
+ bool unregistered = (it != sessions.end ());
sessions.erase (it);
sessions_sync.unlock ();
+ return unregistered;
}
-zmq::session_t *zmq::socket_base_t::get_session (const char *name_)
+zmq::session_t *zmq::socket_base_t::find_session (const char *name_)
{
sessions_sync.lock ();
+
sessions_t::iterator it = sessions.find (name_);
- session_t *session = NULL;
- if (it != sessions.end ()) {
- session = it->second;
- session->inc_seqnum ();
- }
+ if (it == sessions.end ()) {
+ sessions_sync.unlock ();
+ return NULL;
+ }
+
+ // Prepare the session for subsequent attach command.
+ it->second->inc_seqnum ();
+
sessions_sync.unlock ();
- return session;
+ return it->second;
}
void zmq::socket_base_t::process_own (owned_t *object_)
@@ -212,6 +236,11 @@ void zmq::socket_base_t::process_own (owned_t *object_)
void zmq::socket_base_t::process_term_req (owned_t *object_)
{
+ // When shutting down we can ignore termination requests from owned
+ // objects. They are going to be terminated anyway.
+ if (shutting_down)
+ return;
+
// If I/O object is well and alive ask it to terminate.
io_objects_t::iterator it = std::find (io_objects.begin (),
io_objects.end (), object_);
diff --git a/src/socket_base.hpp b/src/socket_base.hpp
index 8e99654..20ac4e2 100644
--- a/src/socket_base.hpp
+++ b/src/socket_base.hpp
@@ -40,23 +40,22 @@ namespace zmq
~socket_base_t ();
// Interface for communication with the API layer.
- virtual int setsockopt (int option_, void *optval_, size_t optvallen_);
+ virtual int setsockopt (int option_, const void *optval_,
+ size_t optvallen_);
virtual int bind (const char *addr_);
virtual int connect (const char *addr_);
- virtual int send (struct zmq_msg *msg_, int flags_);
+ virtual int send (struct zmq_msg_t *msg_, int flags_);
virtual int flush ();
- virtual int recv (struct zmq_msg *msg_, int flags_);
+ virtual int recv (struct zmq_msg_t *msg_, int flags_);
virtual int close ();
- // Functions that owned objects use to manipulate socket's list
- // of existing sessions.
- // Note that this functionality cannot be implemented via inter-thread
+ // The list of sessions cannot be accessed via inter-thread
// commands as it is unacceptable to wait for the completion of the
// action till user application yields control of the application
// thread to 0MQ.
- void register_session (const char *name_, class session_t *session_);
- void unregister_session (const char *name_);
- class session_t *get_session (const char *name_);
+ bool register_session (const char *name_, class session_t *session_);
+ bool unregister_session (const char *name_);
+ class session_t *find_session (const char *name_);
private:
@@ -80,10 +79,17 @@ namespace zmq
// Socket options.
options_t options;
+ // If true, socket is already shutting down. No new work should be
+ // started.
+ bool shutting_down;
+
// List of existing sessions. This list is never referenced from within
// the socket, instead it is used by I/O objects owned by the session.
// As those objects can live in different threads, the access is
// synchronised using 'sessions_sync' mutex.
+ // Local sessions are those named by the local instance of 0MQ.
+ // Remote sessions are the sessions who's identities are provided by
+ // the remote party.
typedef std::map <std::string, session_t*> sessions_t;
sessions_t sessions;
mutex_t sessions_sync;
diff --git a/src/zmq.cpp b/src/zmq.cpp
index 7ebe003..49096ad 100644
--- a/src/zmq.cpp
+++ b/src/zmq.cpp
@@ -26,76 +26,79 @@
#include "socket_base.hpp"
#include "err.hpp"
#include "dispatcher.hpp"
-#include "msg.hpp"
+#include "msg_content.hpp"
-int zmq_msg_init (zmq_msg *msg_)
+int zmq_msg_init (zmq_msg_t *msg_)
{
- msg_->content = (zmq_msg_content*) ZMQ_VSM;
+ msg_->content = (zmq::msg_content_t*) ZMQ_VSM;
msg_->vsm_size = 0;
return 0;
}
-int zmq_msg_init_size (zmq_msg *msg_, size_t size_)
+int zmq_msg_init_size (zmq_msg_t *msg_, size_t size_)
{
if (size_ <= ZMQ_MAX_VSM_SIZE) {
- msg_->content = (zmq_msg_content*) ZMQ_VSM;
+ msg_->content = (zmq::msg_content_t*) ZMQ_VSM;
msg_->vsm_size = (uint16_t) size_;
}
else {
- msg_->content = (zmq_msg_content*) malloc (sizeof (zmq_msg_content) +
- size_);
+ msg_->content =
+ (zmq::msg_content_t*) malloc (sizeof (zmq::msg_content_t) + size_);
if (!msg_->content) {
errno = ENOMEM;
return -1;
}
msg_->shared = 0;
-
- msg_->content->data = (void*) (msg_->content + 1);
- msg_->content->size = size_;
- msg_->content->ffn = NULL;
- new (&msg_->content->refcnt) zmq::atomic_counter_t ();
+
+ zmq::msg_content_t *content = (zmq::msg_content_t*) msg_->content;
+ content->data = (void*) (content + 1);
+ content->size = size_;
+ content->ffn = NULL;
+ new (&content->refcnt) zmq::atomic_counter_t ();
}
return 0;
}
-int zmq_msg_init_data (zmq_msg *msg_, void *data_, size_t size_,
+int zmq_msg_init_data (zmq_msg_t *msg_, void *data_, size_t size_,
zmq_free_fn *ffn_)
{
msg_->shared = 0;
- msg_->content = (zmq_msg_content*) malloc (sizeof (zmq_msg_content));
+ msg_->content = (zmq::msg_content_t*) malloc (sizeof (zmq::msg_content_t));
zmq_assert (msg_->content);
- msg_->content->data = data_;
- msg_->content->size = size_;
- msg_->content->ffn = ffn_;
- new (&msg_->content->refcnt) zmq::atomic_counter_t ();
+ zmq::msg_content_t *content = (zmq::msg_content_t*) msg_->content;
+ content->data = data_;
+ content->size = size_;
+ content->ffn = ffn_;
+ new (&content->refcnt) zmq::atomic_counter_t ();
return 0;
}
-int zmq_msg_close (zmq_msg *msg_)
+int zmq_msg_close (zmq_msg_t *msg_)
{
- // For VSMs and delimiters there are no resources to free
- if (msg_->content == (zmq_msg_content*) ZMQ_DELIMITER ||
- msg_->content == (zmq_msg_content*) ZMQ_VSM ||
- msg_->content == (zmq_msg_content*) ZMQ_GAP)
+ // For VSMs and delimiters there are no resources to free.
+ if (msg_->content == (zmq::msg_content_t*) ZMQ_DELIMITER ||
+ msg_->content == (zmq::msg_content_t*) ZMQ_VSM ||
+ msg_->content == (zmq::msg_content_t*) ZMQ_GAP)
return 0;
- // If the content is not shared, or if it is shared and the reference
+ // If the content is not shared, or if it is shared and the reference.
// count has dropped to zero, deallocate it.
- if (!msg_->shared || !msg_->content->refcnt.sub (1)) {
+ zmq::msg_content_t *content = (zmq::msg_content_t*) msg_->content;
+ if (!msg_->shared || !content->refcnt.sub (1)) {
- // We used "placement new" operator to initialize the reference
+ // We used "placement new" operator to initialize the reference.
// counter so we call its destructor now.
- msg_->content->refcnt.~atomic_counter_t ();
+ content->refcnt.~atomic_counter_t ();
- if (msg_->content->ffn)
- msg_->content->ffn (msg_->content->data);
- free (msg_->content);
+ if (content->ffn)
+ content->ffn (content->data);
+ free (content);
}
return 0;
}
-int zmq_msg_move (zmq_msg *dest_, zmq_msg *src_)
+int zmq_msg_move (zmq_msg_t *dest_, zmq_msg_t *src_)
{
zmq_msg_close (dest_);
*dest_ = *src_;
@@ -103,23 +106,24 @@ int zmq_msg_move (zmq_msg *dest_, zmq_msg *src_)
return 0;
}
-int zmq_msg_copy (zmq_msg *dest_, zmq_msg *src_)
+int zmq_msg_copy (zmq_msg_t *dest_, zmq_msg_t *src_)
{
zmq_msg_close (dest_);
// VSMs and delimiters require no special handling.
if (src_->content !=
- (zmq_msg_content*) ZMQ_DELIMITER &&
- src_->content != (zmq_msg_content*) ZMQ_VSM &&
- src_->content != (zmq_msg_content*) ZMQ_GAP) {
+ (zmq::msg_content_t*) ZMQ_DELIMITER &&
+ src_->content != (zmq::msg_content_t*) ZMQ_VSM &&
+ src_->content != (zmq::msg_content_t*) ZMQ_GAP) {
// One reference is added to shared messages. Non-shared messages
// are turned into shared messages and reference count is set to 2.
+ zmq::msg_content_t *content = (zmq::msg_content_t*) src_->content;
if (src_->shared)
- src_->content->refcnt.add (1);
+ content->refcnt.add (1);
else {
src_->shared = true;
- src_->content->refcnt.set (2);
+ content->refcnt.set (2);
}
}
@@ -127,32 +131,34 @@ int zmq_msg_copy (zmq_msg *dest_, zmq_msg *src_)
return 0;
}
-void *zmq_msg_data (zmq_msg *msg_)
+void *zmq_msg_data (zmq_msg_t *msg_)
{
- if (msg_->content == (zmq_msg_content*) ZMQ_VSM)
+ if (msg_->content == (zmq::msg_content_t*) ZMQ_VSM)
return msg_->vsm_data;
if (msg_->content ==
- (zmq_msg_content*) ZMQ_DELIMITER ||
- msg_->content == (zmq_msg_content*) ZMQ_GAP)
+ (zmq::msg_content_t*) ZMQ_DELIMITER ||
+ msg_->content == (zmq::msg_content_t*) ZMQ_GAP)
return NULL;
- return msg_->content->data;
+
+ return ((zmq::msg_content_t*) msg_->content)->data;
}
-size_t zmq_msg_size (zmq_msg *msg_)
+size_t zmq_msg_size (zmq_msg_t *msg_)
{
- if (msg_->content == (zmq_msg_content*) ZMQ_VSM)
+ if (msg_->content == (zmq::msg_content_t*) ZMQ_VSM)
return msg_->vsm_size;
if (msg_->content ==
- (zmq_msg_content*) ZMQ_DELIMITER ||
- msg_->content == (zmq_msg_content*) ZMQ_GAP)
+ (zmq::msg_content_t*) ZMQ_DELIMITER ||
+ msg_->content == (zmq::msg_content_t*) ZMQ_GAP)
return 0;
- return msg_->content->size;
+
+ return ((zmq::msg_content_t*) msg_->content)->size;
}
-int zmq_msg_type (zmq_msg *msg_)
+int zmq_msg_type (zmq_msg_t *msg_)
{
// If it's a genuine message, return 0.
- if (msg_->content >= (zmq_msg_content*) ZMQ_VSM)
+ if (msg_->content >= (zmq::msg_content_t*) ZMQ_VSM)
return 0;
// Trick the compiler to believe that content is an integer.
@@ -192,7 +198,8 @@ int zmq_close (void *s_)
return 0;
}
-int zmq_setsockopt (void *s_, int option_, void *optval_, size_t optvallen_)
+int zmq_setsockopt (void *s_, int option_, const void *optval_,
+ size_t optvallen_)
{
return (((zmq::socket_base_t*) s_)->setsockopt (option_, optval_,
optvallen_));
@@ -208,7 +215,7 @@ int zmq_connect (void *s_, const char *addr_)
return (((zmq::socket_base_t*) s_)->connect (addr_));
}
-int zmq_send (void *s_, zmq_msg *msg_, int flags_)
+int zmq_send (void *s_, zmq_msg_t *msg_, int flags_)
{
return (((zmq::socket_base_t*) s_)->send (msg_, flags_));
}
@@ -218,7 +225,7 @@ int zmq_flush (void *s_)
return (((zmq::socket_base_t*) s_)->flush ());
}
-int zmq_recv (void *s_, zmq_msg *msg_, int flags_)
+int zmq_recv (void *s_, zmq_msg_t *msg_, int flags_)
{
return (((zmq::socket_base_t*) s_)->recv (msg_, flags_));
}
diff --git a/src/zmq_connecter.cpp b/src/zmq_connecter.cpp
index 00c8cb2..e4e7eea 100644
--- a/src/zmq_connecter.cpp
+++ b/src/zmq_connecter.cpp
@@ -18,16 +18,18 @@
*/
#include "zmq_connecter.hpp"
-#include "zmq_init.hpp"
+#include "zmq_connecter_init.hpp"
#include "io_thread.hpp"
#include "err.hpp"
zmq::zmq_connecter_t::zmq_connecter_t (io_thread_t *parent_,
- socket_base_t *owner_, const options_t &options_) :
+ socket_base_t *owner_, const options_t &options_,
+ const char *session_name_) :
owned_t (parent_, owner_),
io_object_t (parent_),
handle_valid (false),
- options (options_)
+ options (options_),
+ session_name (session_name_)
{
}
@@ -76,7 +78,8 @@ void zmq::zmq_connecter_t::out_event ()
// Create an init object.
io_thread_t *io_thread = choose_io_thread (options.affinity);
- zmq_init_t *init = new zmq_init_t (io_thread, owner, fd, true, options);
+ zmq_connecter_init_t *init = new zmq_connecter_init_t (io_thread, owner,
+ fd, options, session_name.c_str ());
zmq_assert (init);
send_plug (init);
send_own (owner, init);
diff --git a/src/zmq_connecter.hpp b/src/zmq_connecter.hpp
index dcdec19..e308502 100644
--- a/src/zmq_connecter.hpp
+++ b/src/zmq_connecter.hpp
@@ -20,6 +20,8 @@
#ifndef __ZMQ_ZMQ_CONNECTER_HPP_INCLUDED__
#define __ZMQ_ZMQ_CONNECTER_HPP_INCLUDED__
+#include <string>
+
#include "owned.hpp"
#include "io_object.hpp"
#include "tcp_connecter.hpp"
@@ -34,15 +36,14 @@ namespace zmq
public:
zmq_connecter_t (class io_thread_t *parent_, socket_base_t *owner_,
- const options_t &options_);
+ const options_t &options_, const char *session_name_);
+ ~zmq_connecter_t ();
// Set IP address to connect to.
int set_address (const char *addr_);
private:
- ~zmq_connecter_t ();
-
// Handlers for incoming commands.
void process_plug ();
void process_unplug ();
@@ -68,6 +69,9 @@ namespace zmq
// Associated socket options.
options_t options;
+ // Name of the session associated with the connecter.
+ std::string session_name;
+
zmq_connecter_t (const zmq_connecter_t&);
void operator = (const zmq_connecter_t&);
};
diff --git a/src/zmq_connecter_init.cpp b/src/zmq_connecter_init.cpp
new file mode 100644
index 0000000..7048bd1
--- /dev/null
+++ b/src/zmq_connecter_init.cpp
@@ -0,0 +1,94 @@
+/*
+ Copyright (c) 2007-2009 FastMQ Inc.
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the Lesser GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ Lesser GNU General Public License for more details.
+
+ You should have received a copy of the Lesser GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "zmq_connecter_init.hpp"
+#include "io_thread.hpp"
+#include "session.hpp"
+#include "err.hpp"
+
+zmq::zmq_connecter_init_t::zmq_connecter_init_t (io_thread_t *parent_,
+ socket_base_t *owner_, fd_t fd_, const options_t &options_,
+ const char *session_name_) :
+ owned_t (parent_, owner_),
+ options (options_),
+ session_name (session_name_)
+{
+ // Create associated engine object.
+ engine = new zmq_engine_t (parent_, fd_);
+ zmq_assert (engine);
+}
+
+zmq::zmq_connecter_init_t::~zmq_connecter_init_t ()
+{
+ if (engine)
+ delete engine;
+}
+
+bool zmq::zmq_connecter_init_t::read (::zmq_msg_t *msg_)
+{
+ // Send identity.
+ int rc = zmq_msg_init_size (msg_, options.identity.size ());
+ zmq_assert (rc == 0);
+ memcpy (zmq_msg_data (msg_), options.identity.c_str (),
+ options.identity.size ());
+
+ // Initialisation is done at this point. Disconnect the engine from
+ // the init object.
+ engine->unplug ();
+
+ // Find the session associated with this connecter. If it doesn't exist
+ // drop the newly created connection. If it does, attach it to the
+ // connection.
+ session_t *session = owner->find_session (session_name.c_str ());
+ if (!session) {
+ // TODO
+ zmq_assert (false);
+ }
+ send_attach (session, engine);
+ engine = NULL;
+
+ // Destroy the init object.
+ term ();
+
+ return true;
+}
+
+bool zmq::zmq_connecter_init_t::write (::zmq_msg_t *msg_)
+{
+ return false;
+}
+
+void zmq::zmq_connecter_init_t::flush ()
+{
+ // We are not expecting any messages. No point in flushing.
+ zmq_assert (false);
+}
+
+void zmq::zmq_connecter_init_t::process_plug ()
+{
+ zmq_assert (engine);
+ engine->plug (this);
+ owned_t::process_plug ();
+}
+
+void zmq::zmq_connecter_init_t::process_unplug ()
+{
+ if (engine)
+ engine->unplug ();
+}
diff --git a/src/zmq_init.hpp b/src/zmq_connecter_init.hpp
index 5eb289e..79ea9e2 100644
--- a/src/zmq_init.hpp
+++ b/src/zmq_connecter_init.hpp
@@ -17,8 +17,8 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
-#ifndef __ZMQ_ZMQ_INIT_HPP_INCLUDED__
-#define __ZMQ_ZMQ_INIT_HPP_INCLUDED__
+#ifndef __ZMQ_ZMQ_CONNECTER_INIT_HPP_INCLUDED__
+#define __ZMQ_ZMQ_CONNECTER_INIT_HPP_INCLUDED__
#include <string>
@@ -33,48 +33,41 @@ namespace zmq
{
// The class handles initialisation phase of native 0MQ wire-level
- // protocol. Currently it can be used to handle both sides of the
- // connection. If it grows to complex, we can separate the two into
- // distinct classes.
+ // protocol on the connecting side of the connection.
- class zmq_init_t : public owned_t, public i_inout
+ class zmq_connecter_init_t : public owned_t, public i_inout
{
public:
- // Set 'connected' to true if the connection was created by 'connect'
- // function. If it was accepted from a listening socket, set it to
- // false.
- zmq_init_t (class io_thread_t *parent_, socket_base_t *owner_, fd_t fd_,
- bool connected_, const options_t &options);
- ~zmq_init_t ();
+ zmq_connecter_init_t (class io_thread_t *parent_, socket_base_t *owner_,
+ fd_t fd_, const options_t &options, const char *session_name_);
+ ~zmq_connecter_init_t ();
private:
// i_inout interface implementation.
- bool read (::zmq_msg *msg_);
- bool write (::zmq_msg *msg_);
+ bool read (::zmq_msg_t *msg_);
+ bool write (::zmq_msg_t *msg_);
void flush ();
// Handlers for incoming commands.
void process_plug ();
void process_unplug ();
- void create_session ();
-
- // Engine is created by zmq_init_t object. Once the initialisation
- // phase is over it is passed to a session object, possibly running
- // in a different I/O thread.
+ // Engine is created by zmq_connecter_init_t object. Once the
+ // initialisation phase is over it is passed to a session object,
+ // possibly running in a different I/O thread.
zmq_engine_t *engine;
- // If true, we are on the connecting side. If false, we are on the
- // listening side.
- bool connected;
-
// Associated socket options.
options_t options;
- zmq_init_t (const zmq_init_t&);
- void operator = (const zmq_init_t&);
+ // Name of the session to bind new connection to. Makes sense only
+ // when 'connected' is true.
+ std::string session_name;
+
+ zmq_connecter_init_t (const zmq_connecter_init_t&);
+ void operator = (const zmq_connecter_init_t&);
};
}
diff --git a/src/zmq_decoder.hpp b/src/zmq_decoder.hpp
index 17c28f8..212e68e 100644
--- a/src/zmq_decoder.hpp
+++ b/src/zmq_decoder.hpp
@@ -45,7 +45,7 @@ namespace zmq
struct i_inout *destination;
unsigned char tmpbuf [8];
- ::zmq_msg in_progress;
+ ::zmq_msg_t in_progress;
zmq_decoder_t (const zmq_decoder_t&);
void operator = (const zmq_decoder_t&);
diff --git a/src/zmq_encoder.hpp b/src/zmq_encoder.hpp
index 89af265..29563fc 100644
--- a/src/zmq_encoder.hpp
+++ b/src/zmq_encoder.hpp
@@ -43,7 +43,7 @@ namespace zmq
bool message_ready ();
struct i_inout *source;
- ::zmq_msg in_progress;
+ ::zmq_msg_t in_progress;
unsigned char tmpbuf [9];
zmq_encoder_t (const zmq_encoder_t&);
diff --git a/src/zmq_init.cpp b/src/zmq_init.cpp
deleted file mode 100644
index 124622d..0000000
--- a/src/zmq_init.cpp
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- Copyright (c) 2007-2009 FastMQ Inc.
-
- This file is part of 0MQ.
-
- 0MQ is free software; you can redistribute it and/or modify it under
- the terms of the Lesser GNU General Public License as published by
- the Free Software Foundation; either version 3 of the License, or
- (at your option) any later version.
-
- 0MQ is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- Lesser GNU General Public License for more details.
-
- You should have received a copy of the Lesser GNU General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#include "zmq_init.hpp"
-#include "io_thread.hpp"
-#include "session.hpp"
-#include "err.hpp"
-
-zmq::zmq_init_t::zmq_init_t (io_thread_t *parent_, socket_base_t *owner_,
- fd_t fd_, bool connected_, const options_t &options_) :
- owned_t (parent_, owner_),
- connected (connected_),
- options (options_)
-{
- // Create associated engine object.
- engine = new zmq_engine_t (parent_, fd_);
- zmq_assert (engine);
-}
-
-zmq::zmq_init_t::~zmq_init_t ()
-{
- if (engine)
- delete engine;
-}
-
-bool zmq::zmq_init_t::read (::zmq_msg *msg_)
-{
- // On the listening side, no initialisation data are sent to the peer.
- if (!connected)
- return false;
-
- // Send identity.
- int rc = zmq_msg_init_size (msg_, options.identity.size ());
- zmq_assert (rc == 0);
- memcpy (zmq_msg_data (msg_), options.identity.c_str (),
- options.identity.size ());
-
- // Initialisation is done.
- create_session ();
-
- return true;
-}
-
-bool zmq::zmq_init_t::write (::zmq_msg *msg_)
-{
- // On the connecting side no initialisation data are expected.
- if (connected)
- return false;
-
- // Retreieve the identity.
- options.identity = std::string ((const char*) zmq_msg_data (msg_),
- zmq_msg_size (msg_));
-
- // Initialisation is done.
- create_session ();
-
- return true;
-}
-
-void zmq::zmq_init_t::flush ()
-{
- // No need to do anything. zmq_init_t does no batching of messages.
- // Each message is processed immediately on write.
-}
-
-void zmq::zmq_init_t::process_plug ()
-{
- zmq_assert (engine);
- engine->plug (this);
- owned_t::process_plug ();
-}
-
-void zmq::zmq_init_t::process_unplug ()
-{
- if (engine)
- engine->unplug ();
-}
-
-void zmq::zmq_init_t::create_session ()
-{
- // Disconnect engine from the init object.
- engine->unplug ();
-
- // Create the session instance.
- io_thread_t *io_thread = choose_io_thread (options.affinity);
- session_t *session = new session_t (io_thread, owner, engine);
- zmq_assert (session);
- engine = NULL;
-
- // Pass session/engine pair to a chosen I/O thread.
- send_plug (session);
- send_own (owner, session);
-
- // Destroy the init object.
- term ();
-}
diff --git a/src/zmq_listener.cpp b/src/zmq_listener.cpp
index 49bbf61..97164c1 100644
--- a/src/zmq_listener.cpp
+++ b/src/zmq_listener.cpp
@@ -18,7 +18,7 @@
*/
#include "zmq_listener.hpp"
-#include "zmq_init.hpp"
+#include "zmq_listener_init.hpp"
#include "io_thread.hpp"
#include "err.hpp"
@@ -68,7 +68,8 @@ void zmq::zmq_listener_t::in_event ()
// Create an init object.
io_thread_t *io_thread = choose_io_thread (options.affinity);
- zmq_init_t *init = new zmq_init_t (io_thread, owner, fd, false, options);
+ zmq_listener_init_t *init = new zmq_listener_init_t (io_thread, owner,
+ fd, options);
zmq_assert (init);
send_plug (init);
send_own (owner, init);
diff --git a/src/zmq_listener_init.cpp b/src/zmq_listener_init.cpp
new file mode 100644
index 0000000..bfd79b4
--- /dev/null
+++ b/src/zmq_listener_init.cpp
@@ -0,0 +1,96 @@
+/*
+ Copyright (c) 2007-2009 FastMQ Inc.
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the Lesser GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ Lesser GNU General Public License for more details.
+
+ You should have received a copy of the Lesser GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include <string>
+
+#include "zmq_listener_init.hpp"
+#include "io_thread.hpp"
+#include "session.hpp"
+#include "err.hpp"
+
+zmq::zmq_listener_init_t::zmq_listener_init_t (io_thread_t *parent_,
+ socket_base_t *owner_, fd_t fd_, const options_t &options_) :
+ owned_t (parent_, owner_),
+ options (options_)
+{
+ // Create associated engine object.
+ engine = new zmq_engine_t (parent_, fd_);
+ zmq_assert (engine);
+}
+
+zmq::zmq_listener_init_t::~zmq_listener_init_t ()
+{
+ if (engine)
+ delete engine;
+}
+
+bool zmq::zmq_listener_init_t::read (::zmq_msg_t *msg_)
+{
+ return false;
+}
+
+bool zmq::zmq_listener_init_t::write (::zmq_msg_t *msg_)
+{
+ // Retreieve the remote identity. We'll use it as a local session name.
+ std::string session_name = std::string ((const char*) zmq_msg_data (msg_),
+ zmq_msg_size (msg_));
+
+ // Initialisation is done. Disconnect the engine from the init object.
+ engine->unplug ();
+
+ // Have a look whether the session already exists. If it does, attach it
+ // to the engine. If it doesn't create it first.
+ session_t *session = owner->find_session (session_name.c_str ());
+ if (!session) {
+ io_thread_t *io_thread = choose_io_thread (options.affinity);
+ session = new session_t (io_thread, owner, session_name.c_str ());
+ zmq_assert (session);
+ send_plug (session);
+ send_own (owner, session);
+
+ // Reserve a sequence number for following 'attach' command.
+ session->inc_seqnum ();
+ }
+ send_attach (session, engine);
+ engine = NULL;
+
+ // Destroy the init object.
+ term ();
+
+ return true;
+}
+
+void zmq::zmq_listener_init_t::flush ()
+{
+ // No need to do anything. zmq_listener_init_t does no batching
+ // of messages. Each message is processed immediately on write.
+}
+
+void zmq::zmq_listener_init_t::process_plug ()
+{
+ zmq_assert (engine);
+ engine->plug (this);
+ owned_t::process_plug ();
+}
+
+void zmq::zmq_listener_init_t::process_unplug ()
+{
+ if (engine)
+ engine->unplug ();
+}
diff --git a/src/zmq_listener_init.hpp b/src/zmq_listener_init.hpp
new file mode 100644
index 0000000..b061eaa
--- /dev/null
+++ b/src/zmq_listener_init.hpp
@@ -0,0 +1,71 @@
+/*
+ Copyright (c) 2007-2009 FastMQ Inc.
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the Lesser GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ Lesser GNU General Public License for more details.
+
+ You should have received a copy of the Lesser GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __ZMQ_ZMQ_LISTENER_INIT_HPP_INCLUDED__
+#define __ZMQ_ZMQ_LISTENER_INIT_HPP_INCLUDED__
+
+#include <string>
+
+#include "i_inout.hpp"
+#include "owned.hpp"
+#include "zmq_engine.hpp"
+#include "stdint.hpp"
+#include "fd.hpp"
+#include "options.hpp"
+
+namespace zmq
+{
+
+ // The class handles initialisation phase of native 0MQ wire-level
+ // protocol on the listening side of the connection.
+
+ class zmq_listener_init_t : public owned_t, public i_inout
+ {
+ public:
+
+ zmq_listener_init_t (class io_thread_t *parent_, socket_base_t *owner_,
+ fd_t fd_, const options_t &options);
+ ~zmq_listener_init_t ();
+
+ private:
+
+ // i_inout interface implementation.
+ bool read (::zmq_msg_t *msg_);
+ bool write (::zmq_msg_t *msg_);
+ void flush ();
+
+ // Handlers for incoming commands.
+ void process_plug ();
+ void process_unplug ();
+
+ // Engine is created by zmq_listener_init_t object. Once the
+ // initialisation phase is over it is passed to a session object,
+ // possibly running in a different I/O thread.
+ zmq_engine_t *engine;
+
+ // Associated socket options.
+ options_t options;
+
+ zmq_listener_init_t (const zmq_listener_init_t&);
+ void operator = (const zmq_listener_init_t&);
+ };
+
+}
+
+#endif