summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/zmq_init.cpp98
-rw-r--r--src/zmq_init.hpp18
2 files changed, 87 insertions, 29 deletions
diff --git a/src/zmq_init.cpp b/src/zmq_init.cpp
index cf65d69..5ca2367 100644
--- a/src/zmq_init.cpp
+++ b/src/zmq_init.cpp
@@ -29,6 +29,7 @@
#include "session.hpp"
#include "uuid.hpp"
#include "blob.hpp"
+#include "wire.hpp"
#include "err.hpp"
zmq::zmq_init_t::zmq_init_t (io_thread_t *io_thread_,
@@ -36,7 +37,6 @@ zmq::zmq_init_t::zmq_init_t (io_thread_t *io_thread_,
const options_t &options_) :
own_t (io_thread_, options_),
ephemeral_engine (NULL),
- sent (false),
received (false),
socket (socket_),
session (session_),
@@ -45,26 +45,61 @@ zmq::zmq_init_t::zmq_init_t (io_thread_t *io_thread_,
// Create the engine object for this connection.
engine = new (std::nothrow) zmq_engine_t (fd_, options);
alloc_assert (engine);
+
+ // Generate an unique identity.
+ unsigned char identity [uuid_t::uuid_blob_len + 1];
+ identity [0] = 0;
+ memcpy (identity + 1, uuid_t ().to_blob (), uuid_t::uuid_blob_len);
+ peer_identity.assign (identity, uuid_t::uuid_blob_len + 1);
+
+ // Create a list of props to send.
+
+ zmq_msg_t msg;
+ int rc = zmq_msg_init_size (&msg, 4);
+ errno_assert (rc == 0);
+ unsigned char *data = (unsigned char*) zmq_msg_data (&msg);
+ put_uint16 (data, prop_type);
+ put_uint16 (data + 2, options.type);
+ msg.flags |= ZMQ_MSG_MORE;
+ to_send.push_back (msg);
+
+ if (!options.identity.empty ()) {
+ rc = zmq_msg_init_size (&msg, 2 + options.identity.size ());
+ errno_assert (rc == 0);
+ data = (unsigned char*) zmq_msg_data (&msg);
+ put_uint16 (data, prop_identity);
+ memcpy (data + 2, options.identity.data (), options.identity.size ());
+ msg.flags |= ZMQ_MSG_MORE;
+ to_send.push_back (msg);
+ }
+
+ // Remove the MORE flag from the last prop.
+ to_send.back ().flags &= ~ZMQ_MSG_MORE;
}
zmq::zmq_init_t::~zmq_init_t ()
{
if (engine)
engine->terminate ();
+
+ // If there are unsent props still queued deallocate them.
+ for (to_send_t::iterator it = to_send.begin (); it != to_send.end ();
+ ++it) {
+ int rc = zmq_msg_close (&(*it));
+ errno_assert (rc == 0);
+ }
+ to_send.clear ();
}
bool zmq::zmq_init_t::read (::zmq_msg_t *msg_)
{
// If the identity was already sent, do nothing.
- if (sent)
+ if (to_send.empty ())
return false;
- // Send the identity.
- int rc = zmq_msg_init_size (msg_, options.identity.size ());
- zmq_assert (rc == 0);
- memcpy (zmq_msg_data (msg_), options.identity.c_str (),
- options.identity.size ());
- sent = true;
+ // Pass next property to the engine.
+ *msg_ = to_send.front ();
+ to_send.erase (to_send.begin ());
// Try finalize initialization.
finalise_initialisation ();
@@ -79,24 +114,35 @@ bool zmq::zmq_init_t::write (::zmq_msg_t *msg_)
if (received)
return false;
- // Retreieve the remote identity. If it's empty, generate a unique name.
- if (!zmq_msg_size (msg_)) {
- unsigned char identity [uuid_t::uuid_blob_len + 1];
- identity [0] = 0;
- memcpy (identity + 1, uuid_t ().to_blob (), uuid_t::uuid_blob_len);
- peer_identity.assign (identity, uuid_t::uuid_blob_len + 1);
- }
- else {
- peer_identity.assign ((const unsigned char*) zmq_msg_data (msg_),
- zmq_msg_size (msg_));
- }
- int rc = zmq_msg_close (msg_);
- zmq_assert (rc == 0);
+ size_t size = zmq_msg_size (msg_);
+ unsigned char *data = (unsigned char*) zmq_msg_data (msg_);
- received = true;
+ // There should be at least property type in the message.
+ zmq_assert (size >= 2);
+ uint16_t prop = get_uint16 (data);
- // Try finalize initialization.
- finalise_initialisation ();
+ switch (prop) {
+ case prop_type:
+ {
+ zmq_assert (size == 4);
+ // TODO: Check whether the type is OK.
+ // uint16_t type = get_uint16 (data + 2);
+ // ...
+ break;
+ };
+ case prop_identity:
+ {
+ peer_identity.assign (data + 2, size - 2);
+ break;
+ }
+ default:
+ zmq_assert (false);
+ }
+
+ if (!(msg_->flags & ZMQ_MSG_MORE)) {
+ received = true;
+ finalise_initialisation ();
+ }
return true;
}
@@ -142,7 +188,7 @@ void zmq::zmq_init_t::process_unplug ()
void zmq::zmq_init_t::finalise_initialisation ()
{
// Unplug and prepare to dispatch engine.
- if (sent && received) {
+ if (to_send.empty () && received) {
ephemeral_engine = engine;
engine = NULL;
ephemeral_engine->unplug ();
@@ -152,7 +198,7 @@ void zmq::zmq_init_t::finalise_initialisation ()
void zmq::zmq_init_t::dispatch_engine ()
{
- if (sent && received) {
+ if (to_send.empty () && received) {
// Engine must be detached.
zmq_assert (!engine);
diff --git a/src/zmq_init.hpp b/src/zmq_init.hpp
index d90915a..92ab05b 100644
--- a/src/zmq_init.hpp
+++ b/src/zmq_init.hpp
@@ -21,12 +21,15 @@
#ifndef __ZMQ_ZMQ_INIT_HPP_INCLUDED__
#define __ZMQ_ZMQ_INIT_HPP_INCLUDED__
+#include <vector>
+
+#include "../include/zmq.h"
+
#include "i_inout.hpp"
#include "i_engine.hpp"
#include "own.hpp"
#include "fd.hpp"
#include "stdint.hpp"
-#include "stdint.hpp"
#include "blob.hpp"
namespace zmq
@@ -44,6 +47,13 @@ namespace zmq
private:
+ // Peer property IDs.
+ enum prop_t
+ {
+ prop_type = 1,
+ prop_identity = 2
+ };
+
void finalise_initialisation ();
void dispatch_engine ();
@@ -63,8 +73,10 @@ namespace zmq
// Detached transient engine.
i_engine *ephemeral_engine;
- // True if our own identity was already sent to the peer.
- bool sent;
+ // List of messages to send to the peer during the connection
+ // initiation phase.
+ typedef std::vector < ::zmq_msg_t> to_send_t;
+ to_send_t to_send;
// True if peer's identity was already received.
bool received;