summaryrefslogtreecommitdiff
path: root/src/zmq_init.cpp
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@250bpm.com>2011-04-04 17:53:49 +0200
committerMartin Sustrik <sustrik@250bpm.com>2011-04-04 17:53:49 +0200
commit4b52cf949fa02274ec8817f8e704eaa7cc66adbf (patch)
tree4fcdf26a2dcc0551f458975c453fabfd96416cc1 /src/zmq_init.cpp
parent8203c4dbb2e727ce608590e41c9e0d990c015b3e (diff)
TCP and IPC connection initiation allow for multiple properties
So far the only property passed on connection initiation was identity. The mechanism was now made extensible. Additional properties are needed to introduce functionality such as checking the peer's socket type, "subports" etc. Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
Diffstat (limited to 'src/zmq_init.cpp')
-rw-r--r--src/zmq_init.cpp98
1 files changed, 72 insertions, 26 deletions
diff --git a/src/zmq_init.cpp b/src/zmq_init.cpp
index cf65d69..5ca2367 100644
--- a/src/zmq_init.cpp
+++ b/src/zmq_init.cpp
@@ -29,6 +29,7 @@
#include "session.hpp"
#include "uuid.hpp"
#include "blob.hpp"
+#include "wire.hpp"
#include "err.hpp"
zmq::zmq_init_t::zmq_init_t (io_thread_t *io_thread_,
@@ -36,7 +37,6 @@ zmq::zmq_init_t::zmq_init_t (io_thread_t *io_thread_,
const options_t &options_) :
own_t (io_thread_, options_),
ephemeral_engine (NULL),
- sent (false),
received (false),
socket (socket_),
session (session_),
@@ -45,26 +45,61 @@ zmq::zmq_init_t::zmq_init_t (io_thread_t *io_thread_,
// Create the engine object for this connection.
engine = new (std::nothrow) zmq_engine_t (fd_, options);
alloc_assert (engine);
+
+ // Generate an unique identity.
+ unsigned char identity [uuid_t::uuid_blob_len + 1];
+ identity [0] = 0;
+ memcpy (identity + 1, uuid_t ().to_blob (), uuid_t::uuid_blob_len);
+ peer_identity.assign (identity, uuid_t::uuid_blob_len + 1);
+
+ // Create a list of props to send.
+
+ zmq_msg_t msg;
+ int rc = zmq_msg_init_size (&msg, 4);
+ errno_assert (rc == 0);
+ unsigned char *data = (unsigned char*) zmq_msg_data (&msg);
+ put_uint16 (data, prop_type);
+ put_uint16 (data + 2, options.type);
+ msg.flags |= ZMQ_MSG_MORE;
+ to_send.push_back (msg);
+
+ if (!options.identity.empty ()) {
+ rc = zmq_msg_init_size (&msg, 2 + options.identity.size ());
+ errno_assert (rc == 0);
+ data = (unsigned char*) zmq_msg_data (&msg);
+ put_uint16 (data, prop_identity);
+ memcpy (data + 2, options.identity.data (), options.identity.size ());
+ msg.flags |= ZMQ_MSG_MORE;
+ to_send.push_back (msg);
+ }
+
+ // Remove the MORE flag from the last prop.
+ to_send.back ().flags &= ~ZMQ_MSG_MORE;
}
zmq::zmq_init_t::~zmq_init_t ()
{
if (engine)
engine->terminate ();
+
+ // If there are unsent props still queued deallocate them.
+ for (to_send_t::iterator it = to_send.begin (); it != to_send.end ();
+ ++it) {
+ int rc = zmq_msg_close (&(*it));
+ errno_assert (rc == 0);
+ }
+ to_send.clear ();
}
bool zmq::zmq_init_t::read (::zmq_msg_t *msg_)
{
// If the identity was already sent, do nothing.
- if (sent)
+ if (to_send.empty ())
return false;
- // Send the identity.
- int rc = zmq_msg_init_size (msg_, options.identity.size ());
- zmq_assert (rc == 0);
- memcpy (zmq_msg_data (msg_), options.identity.c_str (),
- options.identity.size ());
- sent = true;
+ // Pass next property to the engine.
+ *msg_ = to_send.front ();
+ to_send.erase (to_send.begin ());
// Try finalize initialization.
finalise_initialisation ();
@@ -79,24 +114,35 @@ bool zmq::zmq_init_t::write (::zmq_msg_t *msg_)
if (received)
return false;
- // Retreieve the remote identity. If it's empty, generate a unique name.
- if (!zmq_msg_size (msg_)) {
- unsigned char identity [uuid_t::uuid_blob_len + 1];
- identity [0] = 0;
- memcpy (identity + 1, uuid_t ().to_blob (), uuid_t::uuid_blob_len);
- peer_identity.assign (identity, uuid_t::uuid_blob_len + 1);
- }
- else {
- peer_identity.assign ((const unsigned char*) zmq_msg_data (msg_),
- zmq_msg_size (msg_));
- }
- int rc = zmq_msg_close (msg_);
- zmq_assert (rc == 0);
+ size_t size = zmq_msg_size (msg_);
+ unsigned char *data = (unsigned char*) zmq_msg_data (msg_);
- received = true;
+ // There should be at least property type in the message.
+ zmq_assert (size >= 2);
+ uint16_t prop = get_uint16 (data);
- // Try finalize initialization.
- finalise_initialisation ();
+ switch (prop) {
+ case prop_type:
+ {
+ zmq_assert (size == 4);
+ // TODO: Check whether the type is OK.
+ // uint16_t type = get_uint16 (data + 2);
+ // ...
+ break;
+ };
+ case prop_identity:
+ {
+ peer_identity.assign (data + 2, size - 2);
+ break;
+ }
+ default:
+ zmq_assert (false);
+ }
+
+ if (!(msg_->flags & ZMQ_MSG_MORE)) {
+ received = true;
+ finalise_initialisation ();
+ }
return true;
}
@@ -142,7 +188,7 @@ void zmq::zmq_init_t::process_unplug ()
void zmq::zmq_init_t::finalise_initialisation ()
{
// Unplug and prepare to dispatch engine.
- if (sent && received) {
+ if (to_send.empty () && received) {
ephemeral_engine = engine;
engine = NULL;
ephemeral_engine->unplug ();
@@ -152,7 +198,7 @@ void zmq::zmq_init_t::finalise_initialisation ()
void zmq::zmq_init_t::dispatch_engine ()
{
- if (sent && received) {
+ if (to_send.empty () && received) {
// Engine must be detached.
zmq_assert (!engine);