summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/dispatcher.cpp8
-rw-r--r--src/fd_signaler.cpp22
-rw-r--r--src/options.cpp4
-rw-r--r--src/options.hpp5
-rw-r--r--src/p2p.cpp14
-rw-r--r--src/p2p.hpp2
-rw-r--r--src/pub.cpp14
-rw-r--r--src/pub.hpp2
-rw-r--r--src/rep.cpp14
-rw-r--r--src/rep.hpp2
-rw-r--r--src/req.cpp14
-rw-r--r--src/req.hpp2
-rw-r--r--src/session.cpp36
-rw-r--r--src/socket_base.cpp40
-rw-r--r--src/socket_base.hpp13
-rw-r--r--src/sub.cpp14
-rw-r--r--src/sub.hpp2
17 files changed, 79 insertions, 129 deletions
diff --git a/src/dispatcher.cpp b/src/dispatcher.cpp
index 5e7ea46..ef2f7c1 100644
--- a/src/dispatcher.cpp
+++ b/src/dispatcher.cpp
@@ -88,10 +88,6 @@ int zmq::dispatcher_t::term ()
zmq::dispatcher_t::~dispatcher_t ()
{
- // Close all application theads, sockets, io_objects etc.
- for (app_threads_t::size_type i = 0; i != app_threads.size (); i++)
- delete app_threads [i];
-
// Ask I/O threads to terminate. If stop signal wasn't sent to I/O
// thread subsequent invocation of destructor would hang-up.
for (io_threads_t::size_type i = 0; i != io_threads.size (); i++)
@@ -101,6 +97,10 @@ zmq::dispatcher_t::~dispatcher_t ()
for (io_threads_t::size_type i = 0; i != io_threads.size (); i++)
delete io_threads [i];
+ // Close all application theads, sockets, io_objects etc.
+ for (app_threads_t::size_type i = 0; i != app_threads.size (); i++)
+ delete app_threads [i];
+
// Deallocate all the orphaned pipes.
while (!pipes.empty ())
delete *pipes.begin ();
diff --git a/src/fd_signaler.cpp b/src/fd_signaler.cpp
index 8c71356..b67b27b 100644
--- a/src/fd_signaler.cpp
+++ b/src/fd_signaler.cpp
@@ -222,22 +222,22 @@ void zmq::fd_signaler_t::signal (int signal_)
uint64_t zmq::fd_signaler_t::poll ()
{
- // If there are signals available, return straight away.
- uint64_t signals = check ();
- if (signals)
- return signals;
+ unsigned char buffer [64];
+ ssize_t nbytes = recv (r, buffer, 64, 0);
+ zmq_assert (nbytes != -1);
- // If there are no signals, wait until at least one signal arrives.
- unsigned char sig;
- ssize_t nbytes = recv (r, &sig, 1, 0);
- errno_assert (nbytes != -1);
- return uint64_t (1) << sig;
+ uint64_t signals = 0;
+ for (int pos = 0; pos != nbytes; pos ++) {
+ zmq_assert (buffer [pos] < 64);
+ signals |= (uint64_t (1) << (buffer [pos]));
+ }
+ return signals;
}
uint64_t zmq::fd_signaler_t::check ()
{
- unsigned char buffer [32];
- ssize_t nbytes = recv (r, buffer, 32, MSG_DONTWAIT);
+ unsigned char buffer [64];
+ ssize_t nbytes = recv (r, buffer, 64, MSG_DONTWAIT);
if (nbytes == -1 && errno == EAGAIN)
return 0;
zmq_assert (nbytes != -1);
diff --git a/src/options.cpp b/src/options.cpp
index b0e6e6e..8d3de45 100644
--- a/src/options.cpp
+++ b/src/options.cpp
@@ -29,7 +29,9 @@ zmq::options_t::options_t () :
affinity (0),
rate (100),
recovery_ivl (10),
- use_multicast_loop (false)
+ use_multicast_loop (false),
+ requires_in (false),
+ requires_out (false)
{
}
diff --git a/src/options.hpp b/src/options.hpp
index cde144c..a52fdeb 100644
--- a/src/options.hpp
+++ b/src/options.hpp
@@ -48,6 +48,11 @@ namespace zmq
// Enable multicast loopback. Default disabled (false).
bool use_multicast_loop;
+
+ // These options are never set by the user directly. Instead they are
+ // provided by the specific socket type.
+ bool requires_in;
+ bool requires_out;
};
}
diff --git a/src/p2p.cpp b/src/p2p.cpp
index 537f3ce..8ef27a7 100644
--- a/src/p2p.cpp
+++ b/src/p2p.cpp
@@ -23,24 +23,16 @@
#include "err.hpp"
zmq::p2p_t::p2p_t (class app_thread_t *parent_) :
- socket_base_t (parent_, ZMQ_P2P)
+ socket_base_t (parent_)
{
+ options.requires_in = true;
+ options.requires_out = true;
}
zmq::p2p_t::~p2p_t ()
{
}
-bool zmq::p2p_t::xrequires_in ()
-{
- return true;
-}
-
-bool zmq::p2p_t::xrequires_out ()
-{
- return true;
-}
-
void zmq::p2p_t::xattach_pipes (class reader_t *inpipe_,
class writer_t *outpipe_)
{
diff --git a/src/p2p.hpp b/src/p2p.hpp
index 84790a1..a3dae31 100644
--- a/src/p2p.hpp
+++ b/src/p2p.hpp
@@ -33,8 +33,6 @@ namespace zmq
~p2p_t ();
// Overloads of functions from socket_base_t.
- bool xrequires_in ();
- bool xrequires_out ();
void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_);
void xdetach_inpipe (class reader_t *pipe_);
void xdetach_outpipe (class writer_t *pipe_);
diff --git a/src/pub.cpp b/src/pub.cpp
index 020d789..faaa9aa 100644
--- a/src/pub.cpp
+++ b/src/pub.cpp
@@ -25,8 +25,10 @@
#include "pipe.hpp"
zmq::pub_t::pub_t (class app_thread_t *parent_) :
- socket_base_t (parent_, ZMQ_PUB)
+ socket_base_t (parent_)
{
+ options.requires_in = false;
+ options.requires_out = true;
}
zmq::pub_t::~pub_t ()
@@ -36,16 +38,6 @@ zmq::pub_t::~pub_t ()
out_pipes.clear ();
}
-bool zmq::pub_t::xrequires_in ()
-{
- return false;
-}
-
-bool zmq::pub_t::xrequires_out ()
-{
- return true;
-}
-
void zmq::pub_t::xattach_pipes (class reader_t *inpipe_,
class writer_t *outpipe_)
{
diff --git a/src/pub.hpp b/src/pub.hpp
index 8255c6f..c5eeac1 100644
--- a/src/pub.hpp
+++ b/src/pub.hpp
@@ -34,8 +34,6 @@ namespace zmq
~pub_t ();
// Overloads of functions from socket_base_t.
- bool xrequires_in ();
- bool xrequires_out ();
void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_);
void xdetach_inpipe (class reader_t *pipe_);
void xdetach_outpipe (class writer_t *pipe_);
diff --git a/src/rep.cpp b/src/rep.cpp
index 2fbb66c..d586988 100644
--- a/src/rep.cpp
+++ b/src/rep.cpp
@@ -24,28 +24,20 @@
#include "pipe.hpp"
zmq::rep_t::rep_t (class app_thread_t *parent_) :
- socket_base_t (parent_, ZMQ_REP),
+ socket_base_t (parent_),
active (0),
current (0),
waiting_for_reply (false),
reply_pipe (NULL)
{
+ options.requires_in = true;
+ options.requires_out = true;
}
zmq::rep_t::~rep_t ()
{
}
-bool zmq::rep_t::xrequires_in ()
-{
- return true;
-}
-
-bool zmq::rep_t::xrequires_out ()
-{
- return true;
-}
-
void zmq::rep_t::xattach_pipes (class reader_t *inpipe_,
class writer_t *outpipe_)
{
diff --git a/src/rep.hpp b/src/rep.hpp
index 6e55f47..07bdaf4 100644
--- a/src/rep.hpp
+++ b/src/rep.hpp
@@ -34,8 +34,6 @@ namespace zmq
~rep_t ();
// Overloads of functions from socket_base_t.
- bool xrequires_in ();
- bool xrequires_out ();
void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_);
void xdetach_inpipe (class reader_t *pipe_);
void xdetach_outpipe (class writer_t *pipe_);
diff --git a/src/req.cpp b/src/req.cpp
index 05629df..9b4dffa 100644
--- a/src/req.cpp
+++ b/src/req.cpp
@@ -24,28 +24,20 @@
#include "pipe.hpp"
zmq::req_t::req_t (class app_thread_t *parent_) :
- socket_base_t (parent_, ZMQ_REQ),
+ socket_base_t (parent_),
current (0),
waiting_for_reply (false),
reply_pipe_active (false),
reply_pipe (NULL)
{
+ options.requires_in = true;
+ options.requires_out = true;
}
zmq::req_t::~req_t ()
{
}
-bool zmq::req_t::xrequires_in ()
-{
- return true;
-}
-
-bool zmq::req_t::xrequires_out ()
-{
- return true;
-}
-
void zmq::req_t::xattach_pipes (class reader_t *inpipe_,
class writer_t *outpipe_)
{
diff --git a/src/req.hpp b/src/req.hpp
index 9158fbe..95ddf0d 100644
--- a/src/req.hpp
+++ b/src/req.hpp
@@ -34,8 +34,6 @@ namespace zmq
~req_t ();
// Overloads of functions from socket_base_t.
- bool xrequires_in ();
- bool xrequires_out ();
void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_);
void xdetach_inpipe (class reader_t *pipe_);
void xdetach_outpipe (class writer_t *pipe_);
diff --git a/src/session.cpp b/src/session.cpp
index b829ae9..eb0a963 100644
--- a/src/session.cpp
+++ b/src/session.cpp
@@ -43,7 +43,7 @@ zmq::session_t::~session_t ()
bool zmq::session_t::read (::zmq_msg_t *msg_)
{
- if (!active)
+ if (!in_pipe || !active)
return false;
return in_pipe->read (msg_);
@@ -51,8 +51,9 @@ bool zmq::session_t::read (::zmq_msg_t *msg_)
bool zmq::session_t::write (::zmq_msg_t *msg_)
{
- if (!out_pipe)
- return true;
+ // The communication is unidirectional.
+ // We don't expect any message to arrive.
+ zmq_assert (out_pipe);
if (out_pipe->write (msg_)) {
zmq_msg_init (msg_);
@@ -136,15 +137,26 @@ void zmq::session_t::process_plug ()
// already. Otherwise, it's being created by the listener and the pipes
// are yet to be created.
if (!in_pipe && !out_pipe) {
- pipe_t *inbound = new pipe_t (this, owner, options.hwm, options.lwm);
- zmq_assert (inbound);
- in_pipe = &inbound->reader;
- in_pipe->set_endpoint (this);
- pipe_t *outbound = new pipe_t (owner, this, options.hwm, options.lwm);
- zmq_assert (outbound);
- out_pipe = &outbound->writer;
- out_pipe->set_endpoint (this);
- send_bind (owner, this, &outbound->reader, &inbound->writer);
+
+ pipe_t *inbound = NULL;
+ pipe_t *outbound = NULL;
+
+ if (options.requires_out) {
+ inbound = new pipe_t (this, owner, options.hwm, options.lwm);
+ zmq_assert (inbound);
+ in_pipe = &inbound->reader;
+ in_pipe->set_endpoint (this);
+ }
+
+ if (options.requires_in) {
+ outbound = new pipe_t (owner, this, options.hwm, options.lwm);
+ zmq_assert (outbound);
+ out_pipe = &outbound->writer;
+ out_pipe->set_endpoint (this);
+ }
+
+ send_bind (owner, this, outbound ? &outbound->reader : NULL,
+ inbound ? &inbound->writer : NULL);
}
owned_t::process_plug ();
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index c669e04..0452993 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -38,9 +38,8 @@
#include "pgm_sender.hpp"
#include "pgm_receiver.hpp"
-zmq::socket_base_t::socket_base_t (app_thread_t *parent_, int type_) :
+zmq::socket_base_t::socket_base_t (app_thread_t *parent_) :
object_t (parent_),
- type (type_),
pending_term_acks (0),
ticks (0),
app_thread (parent_),
@@ -137,14 +136,14 @@ int zmq::socket_base_t::connect (const char *addr_)
pipe_t *out_pipe = NULL;
// Create inbound pipe, if required.
- if (xrequires_in ()) {
+ if (options.requires_in) {
in_pipe = new pipe_t (this, session, options.hwm, options.lwm);
zmq_assert (in_pipe);
}
// Create outbound pipe, if required.
- if (xrequires_out ()) {
+ if (options.requires_out) {
out_pipe = new pipe_t (session, this, options.hwm, options.lwm);
zmq_assert (out_pipe);
}
@@ -163,8 +162,9 @@ int zmq::socket_base_t::connect (const char *addr_)
if (addr_type == "tcp") {
- // Create the connecter object. Supply it with the session name so that
- // it can bind the new connection to the session once it is established.
+ // Create the connecter object. Supply it with the session name
+ // so that it can bind the new connection to the session once
+ // it is established.
zmq_connecter_t *connecter = new zmq_connecter_t (
choose_io_thread (options.affinity), this, options,
session_name.c_str (), false);
@@ -184,7 +184,7 @@ int zmq::socket_base_t::connect (const char *addr_)
// If the socket type requires bi-directional communication
// multicast is not an option (it is uni-directional).
- if (xrequires_in () && xrequires_out ()) {
+ if (options.requires_in && options.requires_out) {
errno = EFAULT;
return -1;
}
@@ -194,11 +194,9 @@ int zmq::socket_base_t::connect (const char *addr_)
if (addr_type == "udp")
udp_encapsulation = true;
- switch (type) {
+ if (options.requires_out) {
- // PGM sender.
- case ZMQ_PUB:
- {
+ // PGM sender.
pgm_sender_t *pgm_sender =
new pgm_sender_t (choose_io_thread (options.affinity), options,
session_name.c_str ());
@@ -212,15 +210,10 @@ int zmq::socket_base_t::connect (const char *addr_)
// Reserve a sequence number for following 'attach' command.
session->inc_seqnum ();
send_attach (session, pgm_sender);
-
- pgm_sender = NULL;
-
- break;
}
+ else if (options.requires_in) {
- // PGM receiver.
- case ZMQ_SUB:
- {
+ // PGM receiver.
pgm_receiver_t *pgm_receiver =
new pgm_receiver_t (choose_io_thread (options.affinity), options,
session_name.c_str ());
@@ -234,16 +227,9 @@ int zmq::socket_base_t::connect (const char *addr_)
// Reserve a sequence number for following 'attach' command.
session->inc_seqnum ();
send_attach (session, pgm_receiver);
-
- pgm_receiver = NULL;
-
- break;
- }
-
- default:
- errno = EINVAL;
- return -1;
}
+ else
+ zmq_assert (false);
return 0;
}
diff --git a/src/socket_base.hpp b/src/socket_base.hpp
index 120c932..bba27c3 100644
--- a/src/socket_base.hpp
+++ b/src/socket_base.hpp
@@ -40,7 +40,7 @@ namespace zmq
{
public:
- socket_base_t (class app_thread_t *parent_, int type_);
+ socket_base_t (class app_thread_t *parent_);
// Interface for communication with the API layer.
int setsockopt (int option_, const void *optval_,
@@ -73,8 +73,6 @@ namespace zmq
virtual ~socket_base_t ();
// Pipe management is done by individual socket types.
- virtual bool xrequires_in () = 0;
- virtual bool xrequires_out () = 0;
virtual void xattach_pipes (class reader_t *inpipe_,
class writer_t *outpipe_) = 0;
virtual void xdetach_inpipe (class reader_t *pipe_) = 0;
@@ -89,6 +87,9 @@ namespace zmq
virtual int xflush () = 0;
virtual int xrecv (struct zmq_msg_t *msg_, int options_) = 0;
+ // Socket options.
+ options_t options;
+
private:
// Handlers for incoming commands.
@@ -98,9 +99,6 @@ namespace zmq
void process_term_req (class owned_t *object_);
void process_term_ack ();
- // Type of the socket.
- int type;
-
// List of all I/O objects owned by this socket. The socket is
// responsible for deallocating them before it quits.
typedef std::set <class owned_t*> io_objects_t;
@@ -116,9 +114,6 @@ namespace zmq
// Application thread the socket lives in.
class app_thread_t *app_thread;
- // Socket options.
- options_t options;
-
// If true, socket is already shutting down. No new work should be
// started.
bool shutting_down;
diff --git a/src/sub.cpp b/src/sub.cpp
index 73510c6..66aa8fd 100644
--- a/src/sub.cpp
+++ b/src/sub.cpp
@@ -24,11 +24,13 @@
#include "pipe.hpp"
zmq::sub_t::sub_t (class app_thread_t *parent_) :
- socket_base_t (parent_, ZMQ_SUB),
+ socket_base_t (parent_),
active (0),
current (0),
all_count (0)
{
+ options.requires_in = true;
+ options.requires_out = false;
}
zmq::sub_t::~sub_t ()
@@ -38,16 +40,6 @@ zmq::sub_t::~sub_t ()
in_pipes.clear ();
}
-bool zmq::sub_t::xrequires_in ()
-{
- return true;
-}
-
-bool zmq::sub_t::xrequires_out ()
-{
- return false;
-}
-
void zmq::sub_t::xattach_pipes (class reader_t *inpipe_,
class writer_t *outpipe_)
{
diff --git a/src/sub.hpp b/src/sub.hpp
index 29da27a..180391d 100644
--- a/src/sub.hpp
+++ b/src/sub.hpp
@@ -39,8 +39,6 @@ namespace zmq
protected:
// Overloads of functions from socket_base_t.
- bool xrequires_in ();
- bool xrequires_out ();
void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_);
void xdetach_inpipe (class reader_t *pipe_);
void xdetach_outpipe (class writer_t *pipe_);