summaryrefslogtreecommitdiff
path: root/src/socket_base.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/socket_base.cpp')
-rw-r--r--src/socket_base.cpp29
1 files changed, 23 insertions, 6 deletions
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index eddb297..c933954 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -39,6 +39,7 @@
#include "pgm_sender.hpp"
#include "pgm_receiver.hpp"
#include "likely.hpp"
+#include "uuid.hpp"
zmq::socket_base_t::socket_base_t (app_thread_t *parent_) :
object_t (parent_),
@@ -194,13 +195,13 @@ int zmq::socket_base_t::connect (const char *addr_)
// Create inbound pipe, if required.
if (options.requires_in) {
- in_pipe = new (std::nothrow) pipe_t (this, peer, options.hwm);
+ in_pipe = new (std::nothrow) pipe_t (this, peer, options.hwm, options.swap);
zmq_assert (in_pipe);
}
// Create outbound pipe, if required.
if (options.requires_out) {
- out_pipe = new (std::nothrow) pipe_t (peer, this, options.hwm);
+ out_pipe = new (std::nothrow) pipe_t (peer, this, options.hwm, options.swap);
zmq_assert (out_pipe);
}
@@ -233,14 +234,14 @@ int zmq::socket_base_t::connect (const char *addr_)
// Create inbound pipe, if required.
if (options.requires_in) {
- in_pipe = new (std::nothrow) pipe_t (this, session, options.hwm);
+ in_pipe = new (std::nothrow) pipe_t (this, session, options.hwm, options.swap);
zmq_assert (in_pipe);
}
// Create outbound pipe, if required.
if (options.requires_out) {
- out_pipe = new (std::nothrow) pipe_t (session, this, options.hwm);
+ out_pipe = new (std::nothrow) pipe_t (session, this, options.hwm, options.swap);
zmq_assert (out_pipe);
}
@@ -424,7 +425,14 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_)
return -1;
}
ticks = 0;
- return xrecv (msg_, flags_);
+
+ rc = xrecv (msg_, flags_);
+ if (rc == 0) {
+ rcvmore = msg_->flags & ZMQ_MSG_MORE;
+ if (rcvmore)
+ msg_->flags &= ~ZMQ_MSG_MORE;
+ }
+ return rc;
}
// In blocking scenario, commands are processed over and over again until
@@ -621,7 +629,16 @@ void zmq::socket_base_t::attach_pipes (class reader_t *inpipe_,
inpipe_->set_endpoint (this);
if (outpipe_)
outpipe_->set_endpoint (this);
- xattach_pipes (inpipe_, outpipe_, peer_identity_);
+
+ // If the peer haven't specified it's identity, let's generate one.
+ if (peer_identity_.size ()) {
+ xattach_pipes (inpipe_, outpipe_, peer_identity_);
+ }
+ else {
+ blob_t identity (1, 0);
+ identity.append (uuid_t ().to_blob (), uuid_t::uuid_blob_len);
+ xattach_pipes (inpipe_, outpipe_, identity);
+ }
}
void zmq::socket_base_t::detach_inpipe (class reader_t *pipe_)