summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--doc/zmq_getsockopt.txt16
-rw-r--r--doc/zmq_setsockopt.txt16
-rw-r--r--include/zmq.h1
-rw-r--r--src/command.hpp1
-rw-r--r--src/object.cpp7
-rw-r--r--src/object.hpp4
-rw-r--r--src/options.cpp18
-rw-r--r--src/options.hpp3
-rw-r--r--src/own.cpp20
-rw-r--r--src/own.hpp8
-rw-r--r--src/pair.cpp4
-rw-r--r--src/pair.hpp2
-rw-r--r--src/pub.cpp4
-rw-r--r--src/pub.hpp2
-rw-r--r--src/pull.cpp4
-rw-r--r--src/pull.hpp2
-rw-r--r--src/push.cpp4
-rw-r--r--src/push.hpp2
-rw-r--r--src/session.cpp40
-rw-r--r--src/session.hpp17
-rw-r--r--src/socket_base.cpp4
-rw-r--r--src/socket_base.hpp6
-rw-r--r--src/sub.cpp4
-rw-r--r--src/sub.hpp2
-rw-r--r--src/xrep.cpp4
-rw-r--r--src/xrep.hpp2
-rw-r--r--src/xreq.cpp4
-rw-r--r--src/xreq.hpp2
-rw-r--r--src/zmq_connecter.cpp5
-rw-r--r--src/zmq_connecter.hpp4
-rw-r--r--src/zmq_init.cpp3
-rw-r--r--src/zmq_init.hpp4
-rw-r--r--src/zmq_listener.cpp7
-rw-r--r--src/zmq_listener.hpp6
34 files changed, 158 insertions, 74 deletions
diff --git a/doc/zmq_getsockopt.txt b/doc/zmq_getsockopt.txt
index 5fee978..54d6b58 100644
--- a/doc/zmq_getsockopt.txt
+++ b/doc/zmq_getsockopt.txt
@@ -212,6 +212,22 @@ Default value:: 0
Applicable socket types:: all
+ZMQ_LINGER: Set linger period for socket shutdown
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+The 'ZMQ_LINGER' option shall retrieve the period for pending outbound
+messages to linger in memory after closing the socket. Value of -1 means
+infinite. Pending messages will be kept until they are fully transferred to
+the peer. Value of 0 means that all the pending messages are dropped immediately
+when socket is closed. Positive value means number of milliseconds to keep
+trying to send the pending messages before discarding them.
+
+[horizontal]
+Option value type:: int
+Option value unit:: milliseconds
+Default value:: -1
+Applicable socket types:: all
+
+
ZMQ_FD: Retrieve file descriptor associated with the socket
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_FD' option shall retrieve file descriptor associated with the 0MQ
diff --git a/doc/zmq_setsockopt.txt b/doc/zmq_setsockopt.txt
index a60fc26..5cbdaf6 100644
--- a/doc/zmq_setsockopt.txt
+++ b/doc/zmq_setsockopt.txt
@@ -216,6 +216,22 @@ Default value:: 0
Applicable socket types:: all
+ZMQ_LINGER: Set linger period for socket shutdown
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+The 'ZMQ_LINGER' option shall be set to specify period for pending outbound
+messages to linger in memory after closing the socket. Value of -1 means
+infinite. Pending messages will be kept until they are fully transferred to
+the peer. Value of 0 means that all the pending messages are dropped immediately
+when socket is closed. Positive value means number of milliseconds to keep
+trying to send the pending messages before discarding them.
+
+[horizontal]
+Option value type:: int
+Option value unit:: milliseconds
+Default value:: -1
+Applicable socket types:: all
+
+
RETURN VALUE
------------
The _zmq_setsockopt()_ function shall return zero if successful. Otherwise it
diff --git a/include/zmq.h b/include/zmq.h
index 9684cd5..b565c50 100644
--- a/include/zmq.h
+++ b/include/zmq.h
@@ -191,6 +191,7 @@ ZMQ_EXPORT int zmq_term (void *context);
#define ZMQ_FD 14
#define ZMQ_EVENTS 15
#define ZMQ_TYPE 16
+#define ZMQ_LINGER 17
/* Send/recv options. */
#define ZMQ_NOBLOCK 1
diff --git a/src/command.hpp b/src/command.hpp
index c64ca92..0c094b9 100644
--- a/src/command.hpp
+++ b/src/command.hpp
@@ -109,6 +109,7 @@ namespace zmq
// Sent by socket to I/O object to start its shutdown.
struct {
+ int linger;
} term;
// Sent by I/O object to the socket to acknowledge it has
diff --git a/src/object.cpp b/src/object.cpp
index 90c015a..fc08785 100644
--- a/src/object.cpp
+++ b/src/object.cpp
@@ -106,7 +106,7 @@ void zmq::object_t::process_command (command_t &cmd_)
break;
case command_t::term:
- process_term ();
+ process_term (cmd_.args.term.linger);
break;
case command_t::term_ack:
@@ -312,7 +312,7 @@ void zmq::object_t::send_term_req (own_t *destination_,
send_command (cmd);
}
-void zmq::object_t::send_term (own_t *destination_)
+void zmq::object_t::send_term (own_t *destination_, int linger_)
{
command_t cmd;
#if defined ZMQ_MAKE_VALGRIND_HAPPY
@@ -320,6 +320,7 @@ void zmq::object_t::send_term (own_t *destination_)
#endif
cmd.destination = destination_;
cmd.type = command_t::term;
+ cmd.args.term.linger = linger_;
send_command (cmd);
}
@@ -386,7 +387,7 @@ void zmq::object_t::process_term_req (own_t *object_)
zmq_assert (false);
}
-void zmq::object_t::process_term ()
+void zmq::object_t::process_term (int linger_)
{
zmq_assert (false);
}
diff --git a/src/object.hpp b/src/object.hpp
index bc1b325..9580556 100644
--- a/src/object.hpp
+++ b/src/object.hpp
@@ -80,7 +80,7 @@ namespace zmq
void send_pipe_term_ack (class reader_t *destination_);
void send_term_req (class own_t *destination_,
class own_t *object_);
- void send_term (class own_t *destination_);
+ void send_term (class own_t *destination_, int linger_);
void send_term_ack (class own_t *destination_);
// These handlers can be overloaded by the derived objects. They are
@@ -97,7 +97,7 @@ namespace zmq
virtual void process_pipe_term ();
virtual void process_pipe_term_ack ();
virtual void process_term_req (class own_t *object_);
- virtual void process_term ();
+ virtual void process_term (int linger_);
virtual void process_term_ack ();
// Special handler called after a command that requires a seqnum
diff --git a/src/options.cpp b/src/options.cpp
index e644a9b..c9e330f 100644
--- a/src/options.cpp
+++ b/src/options.cpp
@@ -34,6 +34,7 @@ zmq::options_t::options_t () :
sndbuf (0),
rcvbuf (0),
type (-1),
+ linger (-1),
requires_in (false),
requires_out (false),
immediate_connect (true)
@@ -128,6 +129,14 @@ int zmq::options_t::setsockopt (int option_, const void *optval_,
}
rcvbuf = *((uint64_t*) optval_);
return 0;
+
+ case ZMQ_LINGER:
+ if (optvallen_ != sizeof (int)) {
+ errno = EINVAL;
+ return -1;
+ }
+ linger = *((int*) optval_);
+ return 0;
}
errno = EINVAL;
@@ -138,6 +147,15 @@ int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_)
{
switch (option_) {
+ case ZMQ_LINGER:
+ if (*optvallen_ < sizeof (int)) {
+ errno = EINVAL;
+ return -1;
+ }
+ *((int*) optval_) = linger;
+ *optvallen_ = sizeof (int);
+ return 0;
+
case ZMQ_TYPE:
if (*optvallen_ < sizeof (int)) {
errno = EINVAL;
diff --git a/src/options.hpp b/src/options.hpp
index ea0c841..9b57ab6 100644
--- a/src/options.hpp
+++ b/src/options.hpp
@@ -54,6 +54,9 @@ namespace zmq
// Socket type.
int type;
+ // Linger time, in milliseconds.
+ int linger;
+
// These options are never set by the user directly. Instead they are
// provided by the specific socket type.
bool requires_in;
diff --git a/src/own.cpp b/src/own.cpp
index 12f50bf..c91650d 100644
--- a/src/own.cpp
+++ b/src/own.cpp
@@ -31,8 +31,9 @@ zmq::own_t::own_t (class ctx_t *parent_, uint32_t slot_) :
{
}
-zmq::own_t::own_t (io_thread_t *io_thread_) :
+zmq::own_t::own_t (io_thread_t *io_thread_, const options_t &options_) :
object_t (io_thread_),
+ options (options_),
terminating (false),
sent_seqnum (0),
processed_seqnum (0),
@@ -113,16 +114,19 @@ void zmq::own_t::process_term_req (own_t *object_)
owned.erase (it);
register_term_acks (1);
- send_term (object_);
+
+ // Note that this object is the root of the (partial shutdown) thus, its
+ // value of linger is used, rather than the value stored by the children.
+ send_term (object_, options.linger);
}
void zmq::own_t::process_own (own_t *object_)
{
// If the object is already being shut down, new owned objects are
- // immediately asked to terminate.
+ // immediately asked to terminate. Note that linger is set to zero.
if (terminating) {
register_term_acks (1);
- send_term (object_);
+ send_term (object_, 0);
return;
}
@@ -140,7 +144,7 @@ void zmq::own_t::terminate ()
// As for the root of the ownership tree, there's noone to terminate it,
// so it has to terminate itself.
if (!owner) {
- process_term ();
+ process_term (options.linger);
return;
}
@@ -148,14 +152,14 @@ void zmq::own_t::terminate ()
send_term_req (owner, this);
}
-void zmq::own_t::process_term ()
+void zmq::own_t::process_term (int linger_)
{
// Double termination should never happen.
zmq_assert (!terminating);
- // Send termination request to all owned objects.
+ // Send termination request to all owned objects.
for (owned_t::iterator it = owned.begin (); it != owned.end (); it++)
- send_term (*it);
+ send_term (*it, linger_);
register_term_acks (owned.size ());
owned.clear ();
diff --git a/src/own.hpp b/src/own.hpp
index 6b6f7bf..18f7251 100644
--- a/src/own.hpp
+++ b/src/own.hpp
@@ -24,6 +24,7 @@
#include <algorithm>
#include "object.hpp"
+#include "options.hpp"
#include "atomic_counter.hpp"
#include "stdint.hpp"
@@ -45,7 +46,7 @@ namespace zmq
own_t (class ctx_t *parent_, uint32_t slot_);
// The object is living within I/O thread.
- own_t (class io_thread_t *io_thread_);
+ own_t (class io_thread_t *io_thread_, const options_t &options_);
// When another owned object wants to send command to this object
// it calls this function to let it know it should not shut down
@@ -83,12 +84,15 @@ namespace zmq
// Term handler is protocted rather than private so that it can
// be intercepted by the derived class. This is useful to add custom
// steps to the beginning of the termination process.
- void process_term ();
+ void process_term (int linger_);
// A place to hook in when phyicallal destruction of the object
// is to be delayed.
virtual void process_destroy ();
+ // Socket options associated with this object.
+ options_t options;
+
private:
// Set owner of the object
diff --git a/src/pair.cpp b/src/pair.cpp
index 492ec55..8ee29cf 100644
--- a/src/pair.cpp
+++ b/src/pair.cpp
@@ -86,7 +86,7 @@ void zmq::pair_t::delimited (reader_t *pipe_)
{
}
-void zmq::pair_t::process_term ()
+void zmq::pair_t::process_term (int linger_)
{
terminating = true;
@@ -100,7 +100,7 @@ void zmq::pair_t::process_term ()
outpipe->terminate ();
}
- socket_base_t::process_term ();
+ socket_base_t::process_term (linger_);
}
void zmq::pair_t::activated (class reader_t *pipe_)
diff --git a/src/pair.hpp b/src/pair.hpp
index 030fb97..a14544f 100644
--- a/src/pair.hpp
+++ b/src/pair.hpp
@@ -56,7 +56,7 @@ namespace zmq
private:
// Hook into termination process.
- void process_term ();
+ void process_term (int linger_);
class reader_t *inpipe;
class writer_t *outpipe;
diff --git a/src/pub.cpp b/src/pub.cpp
index b1a1239..6290e9a 100644
--- a/src/pub.cpp
+++ b/src/pub.cpp
@@ -56,7 +56,7 @@ void zmq::pub_t::xattach_pipes (class reader_t *inpipe_,
}
}
-void zmq::pub_t::process_term ()
+void zmq::pub_t::process_term (int linger_)
{
terminating = true;
@@ -68,7 +68,7 @@ void zmq::pub_t::process_term ()
register_term_acks (pipes.size ());
// Continue with the termination immediately.
- socket_base_t::process_term ();
+ socket_base_t::process_term (linger_);
}
void zmq::pub_t::activated (writer_t *pipe_)
diff --git a/src/pub.hpp b/src/pub.hpp
index 8ff192e..6e02be7 100644
--- a/src/pub.hpp
+++ b/src/pub.hpp
@@ -47,7 +47,7 @@ namespace zmq
private:
// Hook into the termination process.
- void process_term ();
+ void process_term (int linger_);
// Write the message to the pipe. Make the pipe inactive if writing
// fails. In such a case false is returned.
diff --git a/src/pull.cpp b/src/pull.cpp
index c5f6a8e..5bccf06 100644
--- a/src/pull.cpp
+++ b/src/pull.cpp
@@ -42,10 +42,10 @@ void zmq::pull_t::xattach_pipes (class reader_t *inpipe_,
fq.attach (inpipe_);
}
-void zmq::pull_t::process_term ()
+void zmq::pull_t::process_term (int linger_)
{
fq.terminate ();
- socket_base_t::process_term ();
+ socket_base_t::process_term (linger_);
}
int zmq::pull_t::xrecv (zmq_msg_t *msg_, int flags_)
diff --git a/src/pull.hpp b/src/pull.hpp
index 1b53e3b..d80bf60 100644
--- a/src/pull.hpp
+++ b/src/pull.hpp
@@ -44,7 +44,7 @@ namespace zmq
private:
// Hook into the termination process.
- void process_term ();
+ void process_term (int linger_);
// Fair queueing object for inbound pipes.
fq_t fq;
diff --git a/src/push.cpp b/src/push.cpp
index f48f7bb..4f3fa5b 100644
--- a/src/push.cpp
+++ b/src/push.cpp
@@ -43,10 +43,10 @@ void zmq::push_t::xattach_pipes (class reader_t *inpipe_,
lb.attach (outpipe_);
}
-void zmq::push_t::process_term ()
+void zmq::push_t::process_term (int linger_)
{
lb.terminate ();
- socket_base_t::process_term ();
+ socket_base_t::process_term (linger_);
}
int zmq::push_t::xsend (zmq_msg_t *msg_, int flags_)
diff --git a/src/push.hpp b/src/push.hpp
index 29a1a1a..ccc98f9 100644
--- a/src/push.hpp
+++ b/src/push.hpp
@@ -44,7 +44,7 @@ namespace zmq
private:
// Hook into the termination process.
- void process_term ();
+ void process_term (int linger_);
// Load balancer managing the outbound pipes.
lb_t lb;
diff --git a/src/session.cpp b/src/session.cpp
index d5c6fdd..aae7e3c 100644
--- a/src/session.cpp
+++ b/src/session.cpp
@@ -28,8 +28,8 @@
zmq::session_t::session_t (class io_thread_t *io_thread_,
class socket_base_t *socket_, const options_t &options_) :
- own_t (io_thread_),
- options (options_),
+ own_t (io_thread_, options_),
+ io_object_t (io_thread_),
in_pipe (NULL),
incomplete_in (false),
out_pipe (NULL),
@@ -39,6 +39,7 @@ zmq::session_t::session_t (class io_thread_t *io_thread_,
pipes_attached (false),
delimiter_processed (false),
force_terminate (false),
+ has_linger_timer (false),
state (active)
{
}
@@ -60,6 +61,12 @@ void zmq::session_t::proceed_with_term ()
zmq_assert (state == pending);
state = terminating;
+ // If there's still a pending linger timer, remove it.
+ if (has_linger_timer) {
+ cancel_timer (linger_timer_id);
+ has_linger_timer = false;
+ }
+
if (in_pipe) {
register_term_acks (1);
in_pipe->terminate ();
@@ -69,7 +76,9 @@ void zmq::session_t::proceed_with_term ()
out_pipe->terminate ();
}
- own_t::process_term ();
+ // The session has already waited for the linger period. We don't want
+ // the child objects to linger any more thus linger is set to zero.
+ own_t::process_term (0);
}
bool zmq::session_t::read (::zmq_msg_t *msg_)
@@ -271,11 +280,25 @@ void zmq::session_t::detach ()
in_pipe->check_read ();
}
-void zmq::session_t::process_term ()
+void zmq::session_t::process_term (int linger_)
{
zmq_assert (state == active);
state = pending;
+ // If linger is set to zero, we can terminate the session straight away
+ // not waiting for the pending messages to be sent.
+ if (linger_ == 0) {
+ proceed_with_term ();
+ return;
+ }
+
+ // If there's finite linger value, set up a timer.
+ if (linger_ > 0) {
+ zmq_assert (!has_linger_timer);
+ add_timer (linger_, linger_timer_id);
+ has_linger_timer = true;
+ }
+
// If there's no engine and there's only delimiter in the pipe it wouldn't
// be ever read. Thus we check for it explicitly.
if (in_pipe)
@@ -291,6 +314,15 @@ void zmq::session_t::process_term ()
proceed_with_term ();
}
+void zmq::session_t::timer_event (int id_)
+{
+ // Linger period expired. We can proceed with termination even though
+ // there are still pending messages to be sent.
+ zmq_assert (id_ == linger_timer_id);
+ has_linger_timer = false;
+ proceed_with_term ();
+}
+
bool zmq::session_t::register_session (const blob_t &name_, session_t *session_)
{
return socket->register_session (name_, session_);
diff --git a/src/session.hpp b/src/session.hpp
index 0f90e80..8adda5e 100644
--- a/src/session.hpp
+++ b/src/session.hpp
@@ -22,7 +22,7 @@
#include "own.hpp"
#include "i_inout.hpp"
-#include "options.hpp"
+#include "io_object.hpp"
#include "blob.hpp"
#include "pipe.hpp"
@@ -31,6 +31,7 @@ namespace zmq
class session_t :
public own_t,
+ public io_object_t,
public i_inout,
public i_reader_events,
public i_writer_events
@@ -79,16 +80,16 @@ namespace zmq
~session_t ();
- // Inherited socket options. These are visible to all session classes.
- options_t options;
-
private:
// Handlers for incoming commands.
void process_plug ();
void process_attach (struct i_engine *engine_,
const blob_t &peer_identity_);
- void process_term ();
+ void process_term (int linger_);
+
+ // i_poll_events handlers.
+ void timer_event (int id_);
// Remove any half processed messages. Flush unflushed messages.
// Call this function when engine disconnect to get rid of leftovers.
@@ -127,6 +128,12 @@ namespace zmq
// pending messages in the inbound pipe.
bool force_terminate;
+ // ID of the linger timer
+ enum {linger_timer_id = 0x20};
+
+ // True is linger timer is running.
+ bool has_linger_timer;
+
enum {
active,
pending,
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index 2293701..c1d210d 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -681,7 +681,7 @@ void zmq::socket_base_t::process_unplug ()
{
}
-void zmq::socket_base_t::process_term ()
+void zmq::socket_base_t::process_term (int linger_)
{
// Unregister all inproc endpoints associated with this socket.
// Doing this we make sure that no new pipes from other sockets (inproc)
@@ -689,7 +689,7 @@ void zmq::socket_base_t::process_term ()
unregister_endpoints (this);
// Continue the termination process immediately.
- own_t::process_term ();
+ own_t::process_term (linger_);
}
void zmq::socket_base_t::process_destroy ()
diff --git a/src/socket_base.hpp b/src/socket_base.hpp
index 4a72a1f..5d083ca 100644
--- a/src/socket_base.hpp
+++ b/src/socket_base.hpp
@@ -28,7 +28,6 @@
#include "own.hpp"
#include "array.hpp"
#include "mutex.hpp"
-#include "options.hpp"
#include "stdint.hpp"
#include "atomic_counter.hpp"
#include "signaler.hpp"
@@ -111,13 +110,10 @@ namespace zmq
virtual bool xhas_in ();
virtual int xrecv (zmq_msg_t *msg_, int options_);
- // Socket options.
- options_t options;
-
// We are declaring termination handler as protected so that
// individual socket types can hook into the termination process
// by overloading it.
- void process_term ();
+ void process_term (int linger_);
// Delay actual destruction of the socket.
void process_destroy ();
diff --git a/src/sub.cpp b/src/sub.cpp
index 825b350..20ffd91 100644
--- a/src/sub.cpp
+++ b/src/sub.cpp
@@ -48,10 +48,10 @@ void zmq::sub_t::xattach_pipes (class reader_t *inpipe_,
fq.attach (inpipe_);
}
-void zmq::sub_t::process_term ()
+void zmq::sub_t::process_term (int linger_)
{
fq.terminate ();
- socket_base_t::process_term ();
+ socket_base_t::process_term (linger_);
}
int zmq::sub_t::xsetsockopt (int option_, const void *optval_,
diff --git a/src/sub.hpp b/src/sub.hpp
index 06a5333..45c9073 100644
--- a/src/sub.hpp
+++ b/src/sub.hpp
@@ -48,7 +48,7 @@ namespace zmq
private:
// Hook into the termination process.
- void process_term ();
+ void process_term (int linger_);
// Check whether the message matches at least one subscription.
bool match (zmq_msg_t *msg_);
diff --git a/src/xrep.cpp b/src/xrep.cpp
index b0ac601..b3e8ebd 100644
--- a/src/xrep.cpp
+++ b/src/xrep.cpp
@@ -80,7 +80,7 @@ void zmq::xrep_t::xattach_pipes (reader_t *inpipe_, writer_t *outpipe_,
}
}
-void zmq::xrep_t::process_term ()
+void zmq::xrep_t::process_term (int linger_)
{
terminating = true;
@@ -93,7 +93,7 @@ void zmq::xrep_t::process_term ()
it++)
it->second.writer->terminate ();
- socket_base_t::process_term ();
+ socket_base_t::process_term (linger_);
}
void zmq::xrep_t::terminated (reader_t *pipe_)
diff --git a/src/xrep.hpp b/src/xrep.hpp
index 4831aaf..575fc44 100644
--- a/src/xrep.hpp
+++ b/src/xrep.hpp
@@ -52,7 +52,7 @@ namespace zmq
private:
// Hook into the termination process.
- void process_term ();
+ void process_term (int linger_);
// i_reader_events interface implementation.
void activated (reader_t *pipe_);
diff --git a/src/xreq.cpp b/src/xreq.cpp
index 9f4eb92..017f127 100644
--- a/src/xreq.cpp
+++ b/src/xreq.cpp
@@ -44,11 +44,11 @@ void zmq::xreq_t::xattach_pipes (class reader_t *inpipe_,
lb.attach (outpipe_);
}
-void zmq::xreq_t::process_term ()
+void zmq::xreq_t::process_term (int linger_)
{
fq.terminate ();
lb.terminate ();
- socket_base_t::process_term ();
+ socket_base_t::process_term (linger_);
}
int zmq::xreq_t::xsend (zmq_msg_t *msg_, int flags_)
diff --git a/src/xreq.hpp b/src/xreq.hpp
index eeb349d..6dbc117 100644
--- a/src/xreq.hpp
+++ b/src/xreq.hpp
@@ -47,7 +47,7 @@ namespace zmq
private:
// Hook into the termination process.
- void process_term ();
+ void process_term (int linger_);
// Messages are fair-queued from inbound pipes. And load-balanced to
// the outbound pipes.
diff --git a/src/zmq_connecter.cpp b/src/zmq_connecter.cpp
index 46ac8b5..82c3ca1 100644
--- a/src/zmq_connecter.cpp
+++ b/src/zmq_connecter.cpp
@@ -36,12 +36,11 @@
zmq::zmq_connecter_t::zmq_connecter_t (class io_thread_t *io_thread_,
class session_t *session_, const options_t &options_,
const char *protocol_, const char *address_) :
- own_t (io_thread_),
+ own_t (io_thread_, options_),
io_object_t (io_thread_),
handle_valid (false),
wait (wait_before_connect),
- session (session_),
- options (options_)
+ session (session_)
{
int rc = tcp_connecter.set_address (protocol_, address_);
zmq_assert (rc == 0);
diff --git a/src/zmq_connecter.hpp b/src/zmq_connecter.hpp
index 7a516f0..3af78cb 100644
--- a/src/zmq_connecter.hpp
+++ b/src/zmq_connecter.hpp
@@ -23,7 +23,6 @@
#include "own.hpp"
#include "io_object.hpp"
#include "tcp_connecter.hpp"
-#include "options.hpp"
#include "stdint.hpp"
namespace zmq
@@ -75,9 +74,6 @@ namespace zmq
// Reference to the session we belong to.
class session_t *session;
- // Associated socket options.
- options_t options;
-
zmq_connecter_t (const zmq_connecter_t&);
void operator = (const zmq_connecter_t&);
};
diff --git a/src/zmq_init.cpp b/src/zmq_init.cpp
index ba9706c..b5c5c86 100644
--- a/src/zmq_init.cpp
+++ b/src/zmq_init.cpp
@@ -33,12 +33,11 @@
zmq::zmq_init_t::zmq_init_t (io_thread_t *io_thread_,
socket_base_t *socket_, session_t *session_, fd_t fd_,
const options_t &options_) :
- own_t (io_thread_),
+ own_t (io_thread_, options_),
sent (false),
received (false),
socket (socket_),
session (session_),
- options (options_),
io_thread (io_thread_)
{
// Create the engine object for this connection.
diff --git a/src/zmq_init.hpp b/src/zmq_init.hpp
index 887f070..6087de9 100644
--- a/src/zmq_init.hpp
+++ b/src/zmq_init.hpp
@@ -25,7 +25,6 @@
#include "own.hpp"
#include "fd.hpp"
#include "stdint.hpp"
-#include "options.hpp"
#include "stdint.hpp"
#include "blob.hpp"
@@ -76,9 +75,6 @@ namespace zmq
// Identity of the peer socket.
blob_t peer_identity;
- // Associated socket options.
- options_t options;
-
// I/O thread the object is living in. It will be used to plug
// the engine into the same I/O thread.
class io_thread_t *io_thread;
diff --git a/src/zmq_listener.cpp b/src/zmq_listener.cpp
index 78e44e6..14e5249 100644
--- a/src/zmq_listener.cpp
+++ b/src/zmq_listener.cpp
@@ -26,9 +26,8 @@
zmq::zmq_listener_t::zmq_listener_t (io_thread_t *io_thread_,
socket_base_t *socket_, const options_t &options_) :
- own_t (io_thread_),
+ own_t (io_thread_, options_),
io_object_t (io_thread_),
- options (options_),
socket (socket_)
{
}
@@ -49,10 +48,10 @@ void zmq::zmq_listener_t::process_plug ()
set_pollin (handle);
}
-void zmq::zmq_listener_t::process_term ()
+void zmq::zmq_listener_t::process_term (int linger_)
{
rm_fd (handle);
- own_t::process_term ();
+ own_t::process_term (linger_);
}
void zmq::zmq_listener_t::in_event ()
diff --git a/src/zmq_listener.hpp b/src/zmq_listener.hpp
index 0a5ef0f..f157cf6 100644
--- a/src/zmq_listener.hpp
+++ b/src/zmq_listener.hpp
@@ -23,7 +23,6 @@
#include "own.hpp"
#include "io_object.hpp"
#include "tcp_listener.hpp"
-#include "options.hpp"
#include "stdint.hpp"
namespace zmq
@@ -44,7 +43,7 @@ namespace zmq
// Handlers for incoming commands.
void process_plug ();
- void process_term ();
+ void process_term (int linger_);
// Handlers for I/O events.
void in_event ();
@@ -55,9 +54,6 @@ namespace zmq
// Handle corresponding to the listening socket.
handle_t handle;
- // Associated socket options.
- options_t options;
-
// Socket the listerner belongs to.
class socket_base_t *socket;