diff options
author | Martin Lucina <martin@lucina.net> | 2012-01-23 08:53:25 +0100 |
---|---|---|
committer | Martin Lucina <martin@lucina.net> | 2012-01-23 08:53:25 +0100 |
commit | 5ba1cb20fe6f6699cef1cc726718e760cd4c9af1 (patch) | |
tree | df7b144c5325fd8b3c88c49b456fafc24249abe6 /src/socket_base.cpp | |
parent | a15854bd92db69fcd0b4444fe1b8fe3610a7acf6 (diff) |
Imported Upstream version 2.0.9.dfsgupstream/2.0.9.dfsg
Diffstat (limited to 'src/socket_base.cpp')
-rw-r--r-- | src/socket_base.cpp | 29 |
1 files changed, 23 insertions, 6 deletions
diff --git a/src/socket_base.cpp b/src/socket_base.cpp index eddb297..c933954 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -39,6 +39,7 @@ #include "pgm_sender.hpp" #include "pgm_receiver.hpp" #include "likely.hpp" +#include "uuid.hpp" zmq::socket_base_t::socket_base_t (app_thread_t *parent_) : object_t (parent_), @@ -194,13 +195,13 @@ int zmq::socket_base_t::connect (const char *addr_) // Create inbound pipe, if required. if (options.requires_in) { - in_pipe = new (std::nothrow) pipe_t (this, peer, options.hwm); + in_pipe = new (std::nothrow) pipe_t (this, peer, options.hwm, options.swap); zmq_assert (in_pipe); } // Create outbound pipe, if required. if (options.requires_out) { - out_pipe = new (std::nothrow) pipe_t (peer, this, options.hwm); + out_pipe = new (std::nothrow) pipe_t (peer, this, options.hwm, options.swap); zmq_assert (out_pipe); } @@ -233,14 +234,14 @@ int zmq::socket_base_t::connect (const char *addr_) // Create inbound pipe, if required. if (options.requires_in) { - in_pipe = new (std::nothrow) pipe_t (this, session, options.hwm); + in_pipe = new (std::nothrow) pipe_t (this, session, options.hwm, options.swap); zmq_assert (in_pipe); } // Create outbound pipe, if required. if (options.requires_out) { - out_pipe = new (std::nothrow) pipe_t (session, this, options.hwm); + out_pipe = new (std::nothrow) pipe_t (session, this, options.hwm, options.swap); zmq_assert (out_pipe); } @@ -424,7 +425,14 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_) return -1; } ticks = 0; - return xrecv (msg_, flags_); + + rc = xrecv (msg_, flags_); + if (rc == 0) { + rcvmore = msg_->flags & ZMQ_MSG_MORE; + if (rcvmore) + msg_->flags &= ~ZMQ_MSG_MORE; + } + return rc; } // In blocking scenario, commands are processed over and over again until @@ -621,7 +629,16 @@ void zmq::socket_base_t::attach_pipes (class reader_t *inpipe_, inpipe_->set_endpoint (this); if (outpipe_) outpipe_->set_endpoint (this); - xattach_pipes (inpipe_, outpipe_, peer_identity_); + + // If the peer haven't specified it's identity, let's generate one. + if (peer_identity_.size ()) { + xattach_pipes (inpipe_, outpipe_, peer_identity_); + } + else { + blob_t identity (1, 0); + identity.append (uuid_t ().to_blob (), uuid_t::uuid_blob_len); + xattach_pipes (inpipe_, outpipe_, identity); + } } void zmq::socket_base_t::detach_inpipe (class reader_t *pipe_) |