diff options
-rw-r--r-- | src/zmq_init.cpp | 98 | ||||
-rw-r--r-- | src/zmq_init.hpp | 18 |
2 files changed, 87 insertions, 29 deletions
diff --git a/src/zmq_init.cpp b/src/zmq_init.cpp index cf65d69..5ca2367 100644 --- a/src/zmq_init.cpp +++ b/src/zmq_init.cpp @@ -29,6 +29,7 @@ #include "session.hpp" #include "uuid.hpp" #include "blob.hpp" +#include "wire.hpp" #include "err.hpp" zmq::zmq_init_t::zmq_init_t (io_thread_t *io_thread_, @@ -36,7 +37,6 @@ zmq::zmq_init_t::zmq_init_t (io_thread_t *io_thread_, const options_t &options_) : own_t (io_thread_, options_), ephemeral_engine (NULL), - sent (false), received (false), socket (socket_), session (session_), @@ -45,26 +45,61 @@ zmq::zmq_init_t::zmq_init_t (io_thread_t *io_thread_, // Create the engine object for this connection. engine = new (std::nothrow) zmq_engine_t (fd_, options); alloc_assert (engine); + + // Generate an unique identity. + unsigned char identity [uuid_t::uuid_blob_len + 1]; + identity [0] = 0; + memcpy (identity + 1, uuid_t ().to_blob (), uuid_t::uuid_blob_len); + peer_identity.assign (identity, uuid_t::uuid_blob_len + 1); + + // Create a list of props to send. + + zmq_msg_t msg; + int rc = zmq_msg_init_size (&msg, 4); + errno_assert (rc == 0); + unsigned char *data = (unsigned char*) zmq_msg_data (&msg); + put_uint16 (data, prop_type); + put_uint16 (data + 2, options.type); + msg.flags |= ZMQ_MSG_MORE; + to_send.push_back (msg); + + if (!options.identity.empty ()) { + rc = zmq_msg_init_size (&msg, 2 + options.identity.size ()); + errno_assert (rc == 0); + data = (unsigned char*) zmq_msg_data (&msg); + put_uint16 (data, prop_identity); + memcpy (data + 2, options.identity.data (), options.identity.size ()); + msg.flags |= ZMQ_MSG_MORE; + to_send.push_back (msg); + } + + // Remove the MORE flag from the last prop. + to_send.back ().flags &= ~ZMQ_MSG_MORE; } zmq::zmq_init_t::~zmq_init_t () { if (engine) engine->terminate (); + + // If there are unsent props still queued deallocate them. + for (to_send_t::iterator it = to_send.begin (); it != to_send.end (); + ++it) { + int rc = zmq_msg_close (&(*it)); + errno_assert (rc == 0); + } + to_send.clear (); } bool zmq::zmq_init_t::read (::zmq_msg_t *msg_) { // If the identity was already sent, do nothing. - if (sent) + if (to_send.empty ()) return false; - // Send the identity. - int rc = zmq_msg_init_size (msg_, options.identity.size ()); - zmq_assert (rc == 0); - memcpy (zmq_msg_data (msg_), options.identity.c_str (), - options.identity.size ()); - sent = true; + // Pass next property to the engine. + *msg_ = to_send.front (); + to_send.erase (to_send.begin ()); // Try finalize initialization. finalise_initialisation (); @@ -79,24 +114,35 @@ bool zmq::zmq_init_t::write (::zmq_msg_t *msg_) if (received) return false; - // Retreieve the remote identity. If it's empty, generate a unique name. - if (!zmq_msg_size (msg_)) { - unsigned char identity [uuid_t::uuid_blob_len + 1]; - identity [0] = 0; - memcpy (identity + 1, uuid_t ().to_blob (), uuid_t::uuid_blob_len); - peer_identity.assign (identity, uuid_t::uuid_blob_len + 1); - } - else { - peer_identity.assign ((const unsigned char*) zmq_msg_data (msg_), - zmq_msg_size (msg_)); - } - int rc = zmq_msg_close (msg_); - zmq_assert (rc == 0); + size_t size = zmq_msg_size (msg_); + unsigned char *data = (unsigned char*) zmq_msg_data (msg_); - received = true; + // There should be at least property type in the message. + zmq_assert (size >= 2); + uint16_t prop = get_uint16 (data); - // Try finalize initialization. - finalise_initialisation (); + switch (prop) { + case prop_type: + { + zmq_assert (size == 4); + // TODO: Check whether the type is OK. + // uint16_t type = get_uint16 (data + 2); + // ... + break; + }; + case prop_identity: + { + peer_identity.assign (data + 2, size - 2); + break; + } + default: + zmq_assert (false); + } + + if (!(msg_->flags & ZMQ_MSG_MORE)) { + received = true; + finalise_initialisation (); + } return true; } @@ -142,7 +188,7 @@ void zmq::zmq_init_t::process_unplug () void zmq::zmq_init_t::finalise_initialisation () { // Unplug and prepare to dispatch engine. - if (sent && received) { + if (to_send.empty () && received) { ephemeral_engine = engine; engine = NULL; ephemeral_engine->unplug (); @@ -152,7 +198,7 @@ void zmq::zmq_init_t::finalise_initialisation () void zmq::zmq_init_t::dispatch_engine () { - if (sent && received) { + if (to_send.empty () && received) { // Engine must be detached. zmq_assert (!engine); diff --git a/src/zmq_init.hpp b/src/zmq_init.hpp index d90915a..92ab05b 100644 --- a/src/zmq_init.hpp +++ b/src/zmq_init.hpp @@ -21,12 +21,15 @@ #ifndef __ZMQ_ZMQ_INIT_HPP_INCLUDED__ #define __ZMQ_ZMQ_INIT_HPP_INCLUDED__ +#include <vector> + +#include "../include/zmq.h" + #include "i_inout.hpp" #include "i_engine.hpp" #include "own.hpp" #include "fd.hpp" #include "stdint.hpp" -#include "stdint.hpp" #include "blob.hpp" namespace zmq @@ -44,6 +47,13 @@ namespace zmq private: + // Peer property IDs. + enum prop_t + { + prop_type = 1, + prop_identity = 2 + }; + void finalise_initialisation (); void dispatch_engine (); @@ -63,8 +73,10 @@ namespace zmq // Detached transient engine. i_engine *ephemeral_engine; - // True if our own identity was already sent to the peer. - bool sent; + // List of messages to send to the peer during the connection + // initiation phase. + typedef std::vector < ::zmq_msg_t> to_send_t; + to_send_t to_send; // True if peer's identity was already received. bool received; |