summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--doc/zmq_setsockopt.txt10
-rw-r--r--src/options.cpp9
-rw-r--r--src/session.cpp6
-rw-r--r--src/uuid.hpp6
-rw-r--r--src/zmq_encoder.cpp1
-rw-r--r--src/zmq_engine.cpp4
-rw-r--r--src/zmq_init.cpp40
7 files changed, 44 insertions, 32 deletions
diff --git a/doc/zmq_setsockopt.txt b/doc/zmq_setsockopt.txt
index 549a2de..629bffc 100644
--- a/doc/zmq_setsockopt.txt
+++ b/doc/zmq_setsockopt.txt
@@ -60,9 +60,11 @@ If the socket has no identity, each run of the application is completely
separated from other runs. However, with identity application reconnects to
existing infrastructure left by the previous run. Thus it may receive
messages that were sent in the meantime, it shares pipe limits with the
-previous run etc.
+previous run etc. Identity should be at least one byte and at most 255 bytes
+long. Identities starting with binary zero are reserver for use by 0MQ
+infrastructure.
+
-Type: string Unit: N/A Default: NULL
+Type: BLOB Unit: N/A Default: NULL
*ZMQ_SUBSCRIBE*::
Applicable only to ZMQ_SUB socket type. It establishes new message filter.
@@ -72,7 +74,7 @@ beginning with specific prefix (e.g. "animals.mammals.dogs."). Multiple
filters can be attached to a single 'sub' socket. In that case message passes
if it matches at least one of the filters.
+
-Type: string Unit: N/A Default: N/A
+Type: BLOB Unit: N/A Default: N/A
*ZMQ_UNSUBSCRIBE*::
Applicable only to ZMQ_SUB socket type. Removes existing message filter.
@@ -81,7 +83,7 @@ exactly. If there were several instances of the same filter created,
this options removes only one of them, leaving the rest in place
and functional.
+
-Type: string Unit: N/A Default: N/A
+Type: BLOB Unit: N/A Default: N/A
*ZMQ_RATE*::
This option applies only to sending side of multicast transports (pgm & udp).
diff --git a/src/options.cpp b/src/options.cpp
index b77af24..f78d8de 100644
--- a/src/options.cpp
+++ b/src/options.cpp
@@ -77,6 +77,15 @@ int zmq::options_t::setsockopt (int option_, const void *optval_,
return 0;
case ZMQ_IDENTITY:
+
+ // Empty identity is invalid as well as identity longer than
+ // 255 bytes. Identity starting with binary zero is invalid
+ // as these are used for auto-generated identities.
+ if (optvallen_ < 1 || optvallen_ > 255 ||
+ *((const unsigned char*) optval_) == 0) {
+ errno = EINVAL;
+ return -1;
+ }
identity.assign ((const unsigned char*) optval_, optvallen_);
return 0;
diff --git a/src/session.cpp b/src/session.cpp
index 74bd8ae..05f319c 100644
--- a/src/session.cpp
+++ b/src/session.cpp
@@ -50,7 +50,7 @@ zmq::session_t::session_t (object_t *parent_, socket_base_t *owner_,
peer_identity (peer_identity_),
options (options_)
{
- if (!peer_identity.empty ()) {
+ if (!peer_identity.empty () && peer_identity [0] != 0) {
if (!owner->register_session (peer_identity, this)) {
// TODO: There's already a session with the specified
@@ -103,7 +103,7 @@ void zmq::session_t::detach (owned_t *reconnecter_)
engine = NULL;
// Terminate transient session.
- if (!ordinal && peer_identity.empty ())
+ if (!ordinal && (peer_identity.empty () || peer_identity [0] == 0))
term ();
}
@@ -173,7 +173,7 @@ void zmq::session_t::process_unplug ()
// Unregister the session from the socket.
if (ordinal)
owner->unregister_session (ordinal);
- else if (!peer_identity.empty ())
+ else if (!peer_identity.empty () && peer_identity [0] != 0)
owner->unregister_session (peer_identity);
// Ask associated pipes to terminate.
diff --git a/src/uuid.hpp b/src/uuid.hpp
index 001ea94..f565f8d 100644
--- a/src/uuid.hpp
+++ b/src/uuid.hpp
@@ -44,6 +44,9 @@ namespace zmq
uuid_t ();
~uuid_t ();
+ // The length of textual representation of UUID.
+ enum { uuid_string_len = 36 };
+
// Returns a pointer to buffer containing the textual
// representation of the UUID. The caller is reponsible to
// free the allocated memory.
@@ -51,9 +54,6 @@ namespace zmq
private:
- // The length of textual representation of UUID.
- enum { uuid_string_len = 36 };
-
#if defined ZMQ_HAVE_WINDOWS
#ifdef ZMQ_HAVE_MINGW32
typedef unsigned char* RPC_CSTR;
diff --git a/src/zmq_encoder.cpp b/src/zmq_encoder.cpp
index 5fca182..68626fa 100644
--- a/src/zmq_encoder.cpp
+++ b/src/zmq_encoder.cpp
@@ -89,7 +89,6 @@ bool zmq::zmq_encoder_t::message_ready ()
size -= prefix_size;
}
-
// For messages less than 255 bytes long, write one byte of message size.
// For longer messages write 0xff escape character followed by 8-byte
// message size.
diff --git a/src/zmq_engine.cpp b/src/zmq_engine.cpp
index 152daf6..623ca63 100644
--- a/src/zmq_engine.cpp
+++ b/src/zmq_engine.cpp
@@ -17,6 +17,8 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
+#include <string.h>
+
#include <new>
#include "zmq_engine.hpp"
@@ -161,7 +163,7 @@ void zmq::zmq_engine_t::revive ()
}
void zmq::zmq_engine_t::add_prefix (const blob_t &identity_)
-{
+{
decoder.add_prefix (identity_);
}
diff --git a/src/zmq_init.cpp b/src/zmq_init.cpp
index 7c5588f..3e76cb9 100644
--- a/src/zmq_init.cpp
+++ b/src/zmq_init.cpp
@@ -17,10 +17,13 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
+#include <string.h>
+
#include "zmq_init.hpp"
#include "zmq_engine.hpp"
#include "io_thread.hpp"
#include "session.hpp"
+#include "uuid.hpp"
#include "err.hpp"
zmq::zmq_init_t::zmq_init_t (io_thread_t *parent_, socket_base_t *owner_,
@@ -71,10 +74,19 @@ bool zmq::zmq_init_t::write (::zmq_msg_t *msg_)
if (received)
return false;
- // Retreieve the remote identity.
- peer_identity.assign ((const unsigned char*) zmq_msg_data (msg_),
- zmq_msg_size (msg_));
- engine->add_prefix (peer_identity);
+ // Retreieve the remote identity. If it's empty, generate a unique name.
+ if (!zmq_msg_size (msg_)) {
+ unsigned char identity [uuid_t::uuid_string_len + 1];
+ identity [0] = 0;
+ memcpy (identity + 1, uuid_t ().to_string (), uuid_t::uuid_string_len);
+ peer_identity.assign (identity, uuid_t::uuid_string_len + 1);
+ }
+ else {
+ peer_identity.assign ((const unsigned char*) zmq_msg_data (msg_),
+ zmq_msg_size (msg_));
+ }
+ if (options.traceroute)
+ engine->add_prefix (peer_identity);
received = true;
return true;
@@ -155,10 +167,11 @@ void zmq::zmq_init_t::finalise ()
return;
}
}
+ else {
- // If the peer has a unique name, find the associated session. If it
- // doesn't exist, create it.
- else if (!peer_identity.empty ()) {
+ // If the peer has a unique name, find the associated session.
+ // If it does not exist, create it.
+ zmq_assert (!peer_identity.empty ());
session = owner->find_session (peer_identity);
if (!session) {
session = new (std::nothrow) session_t (
@@ -173,19 +186,6 @@ void zmq::zmq_init_t::finalise ()
}
}
- // If the other party has no specific identity, let's create a
- // transient session.
- else {
- session = new (std::nothrow) session_t (
- choose_io_thread (options.affinity), owner, options, blob_t ());
- zmq_assert (session);
- send_plug (session);
- send_own (owner, session);
-
- // Reserve a sequence number for following 'attach' command.
- session->inc_seqnum ();
- }
-
// No need to increment seqnum as it was already incremented above.
send_attach (session, engine, peer_identity, false);