summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@250bpm.com>2010-10-16 10:53:29 +0200
committerMartin Sustrik <sustrik@250bpm.com>2010-10-16 10:53:29 +0200
commit0a03e86e9547fa7c221b316a5a943467adea3dfd (patch)
treef47b0cd7d3c91c59de419506f8d66c27e327e41c
parenta1474e305762d32df2b79300d124aac7fa0181c8 (diff)
ZMQ_LINGER socket option added.
1. ZMQ_LINGER option can be set/get 2. options are part of own_t base class rather than being declared separately by individual objects 3. Linger option is propagated with "term" command so that the newest value of it is used rather than the stored old one. 4. Session sets the linger timer if needed and terminates as soon as it expires. 5. Corresponding documentation updated. Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
-rw-r--r--doc/zmq_getsockopt.txt16
-rw-r--r--doc/zmq_setsockopt.txt16
-rw-r--r--include/zmq.h1
-rw-r--r--src/command.hpp1
-rw-r--r--src/object.cpp7
-rw-r--r--src/object.hpp4
-rw-r--r--src/options.cpp18
-rw-r--r--src/options.hpp3
-rw-r--r--src/own.cpp20
-rw-r--r--src/own.hpp8
-rw-r--r--src/pair.cpp4
-rw-r--r--src/pair.hpp2
-rw-r--r--src/pub.cpp4
-rw-r--r--src/pub.hpp2
-rw-r--r--src/pull.cpp4
-rw-r--r--src/pull.hpp2
-rw-r--r--src/push.cpp4
-rw-r--r--src/push.hpp2
-rw-r--r--src/session.cpp40
-rw-r--r--src/session.hpp17
-rw-r--r--src/socket_base.cpp4
-rw-r--r--src/socket_base.hpp6
-rw-r--r--src/sub.cpp4
-rw-r--r--src/sub.hpp2
-rw-r--r--src/xrep.cpp4
-rw-r--r--src/xrep.hpp2
-rw-r--r--src/xreq.cpp4
-rw-r--r--src/xreq.hpp2
-rw-r--r--src/zmq_connecter.cpp5
-rw-r--r--src/zmq_connecter.hpp4
-rw-r--r--src/zmq_init.cpp3
-rw-r--r--src/zmq_init.hpp4
-rw-r--r--src/zmq_listener.cpp7
-rw-r--r--src/zmq_listener.hpp6
34 files changed, 158 insertions, 74 deletions
diff --git a/doc/zmq_getsockopt.txt b/doc/zmq_getsockopt.txt
index 5fee978..54d6b58 100644
--- a/doc/zmq_getsockopt.txt
+++ b/doc/zmq_getsockopt.txt
@@ -212,6 +212,22 @@ Default value:: 0
Applicable socket types:: all
+ZMQ_LINGER: Set linger period for socket shutdown
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+The 'ZMQ_LINGER' option shall retrieve the period for pending outbound
+messages to linger in memory after closing the socket. Value of -1 means
+infinite. Pending messages will be kept until they are fully transferred to
+the peer. Value of 0 means that all the pending messages are dropped immediately
+when socket is closed. Positive value means number of milliseconds to keep
+trying to send the pending messages before discarding them.
+
+[horizontal]
+Option value type:: int
+Option value unit:: milliseconds
+Default value:: -1
+Applicable socket types:: all
+
+
ZMQ_FD: Retrieve file descriptor associated with the socket
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_FD' option shall retrieve file descriptor associated with the 0MQ
diff --git a/doc/zmq_setsockopt.txt b/doc/zmq_setsockopt.txt
index a60fc26..5cbdaf6 100644
--- a/doc/zmq_setsockopt.txt
+++ b/doc/zmq_setsockopt.txt
@@ -216,6 +216,22 @@ Default value:: 0
Applicable socket types:: all
+ZMQ_LINGER: Set linger period for socket shutdown
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+The 'ZMQ_LINGER' option shall be set to specify period for pending outbound
+messages to linger in memory after closing the socket. Value of -1 means
+infinite. Pending messages will be kept until they are fully transferred to
+the peer. Value of 0 means that all the pending messages are dropped immediately
+when socket is closed. Positive value means number of milliseconds to keep
+trying to send the pending messages before discarding them.
+
+[horizontal]
+Option value type:: int
+Option value unit:: milliseconds
+Default value:: -1
+Applicable socket types:: all
+
+
RETURN VALUE
------------
The _zmq_setsockopt()_ function shall return zero if successful. Otherwise it
diff --git a/include/zmq.h b/include/zmq.h
index 9684cd5..b565c50 100644
--- a/include/zmq.h
+++ b/include/zmq.h
@@ -191,6 +191,7 @@ ZMQ_EXPORT int zmq_term (void *context);
#define ZMQ_FD 14
#define ZMQ_EVENTS 15
#define ZMQ_TYPE 16
+#define ZMQ_LINGER 17
/* Send/recv options. */
#define ZMQ_NOBLOCK 1
diff --git a/src/command.hpp b/src/command.hpp
index c64ca92..0c094b9 100644
--- a/src/command.hpp
+++ b/src/command.hpp
@@ -109,6 +109,7 @@ namespace zmq
// Sent by socket to I/O object to start its shutdown.
struct {
+ int linger;
} term;
// Sent by I/O object to the socket to acknowledge it has
diff --git a/src/object.cpp b/src/object.cpp
index 90c015a..fc08785 100644
--- a/src/object.cpp
+++ b/src/object.cpp
@@ -106,7 +106,7 @@ void zmq::object_t::process_command (command_t &cmd_)
break;
case command_t::term:
- process_term ();
+ process_term (cmd_.args.term.linger);
break;
case command_t::term_ack:
@@ -312,7 +312,7 @@ void zmq::object_t::send_term_req (own_t *destination_,
send_command (cmd);
}
-void zmq::object_t::send_term (own_t *destination_)
+void zmq::object_t::send_term (own_t *destination_, int linger_)
{
command_t cmd;
#if defined ZMQ_MAKE_VALGRIND_HAPPY
@@ -320,6 +320,7 @@ void zmq::object_t::send_term (own_t *destination_)
#endif
cmd.destination = destination_;
cmd.type = command_t::term;
+ cmd.args.term.linger = linger_;
send_command (cmd);
}
@@ -386,7 +387,7 @@ void zmq::object_t::process_term_req (own_t *object_)
zmq_assert (false);
}
-void zmq::object_t::process_term ()
+void zmq::object_t::process_term (int linger_)
{
zmq_assert (false);
}
diff --git a/src/object.hpp b/src/object.hpp
index bc1b325..9580556 100644
--- a/src/object.hpp
+++ b/src/object.hpp
@@ -80,7 +80,7 @@ namespace zmq
void send_pipe_term_ack (class reader_t *destination_);
void send_term_req (class own_t *destination_,
class own_t *object_);
- void send_term (class own_t *destination_);
+ void send_term (class own_t *destination_, int linger_);
void send_term_ack (class own_t *destination_);
// These handlers can be overloaded by the derived objects. They are
@@ -97,7 +97,7 @@ namespace zmq
virtual void process_pipe_term ();
virtual void process_pipe_term_ack ();
virtual void process_term_req (class own_t *object_);
- virtual void process_term ();
+ virtual void process_term (int linger_);
virtual void process_term_ack ();
// Special handler called after a command that requires a seqnum
diff --git a/src/options.cpp b/src/options.cpp
index e644a9b..c9e330f 100644
--- a/src/options.cpp
+++ b/src/options.cpp
@@ -34,6 +34,7 @@ zmq::options_t::options_t () :
sndbuf (0),
rcvbuf (0),
type (-1),
+ linger (-1),
requires_in (false),
requires_out (false),
immediate_connect (true)
@@ -128,6 +129,14 @@ int zmq::options_t::setsockopt (int option_, const void *optval_,
}
rcvbuf = *((uint64_t*) optval_);
return 0;
+
+ case ZMQ_LINGER:
+ if (optvallen_ != sizeof (int)) {
+ errno = EINVAL;
+ return -1;
+ }
+ linger = *((int*) optval_);
+ return 0;
}
errno = EINVAL;
@@ -138,6 +147,15 @@ int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_)
{
switch (option_) {
+ case ZMQ_LINGER:
+ if (*optvallen_ < sizeof (int)) {
+ errno = EINVAL;
+ return -1;
+ }
+ *((int*) optval_) = linger;
+ *optvallen_ = sizeof (int);
+ return 0;
+
case ZMQ_TYPE:
if (*optvallen_ < sizeof (int)) {
errno = EINVAL;
diff --git a/src/options.hpp b/src/options.hpp
index ea0c841..9b57ab6 100644
--- a/src/options.hpp
+++ b/src/options.hpp
@@ -54,6 +54,9 @@ namespace zmq
// Socket type.
int type;
+ // Linger time, in milliseconds.
+ int linger;
+
// These options are never set by the user directly. Instead they are
// provided by the specific socket type.
bool requires_in;
diff --git a/src/own.cpp b/src/own.cpp
index 12f50bf..c91650d 100644
--- a/src/own.cpp
+++ b/src/own.cpp
@@ -31,8 +31,9 @@ zmq::own_t::own_t (class ctx_t *parent_, uint32_t slot_) :
{
}
-zmq::own_t::own_t (io_thread_t *io_thread_) :
+zmq::own_t::own_t (io_thread_t *io_thread_, const options_t &options_) :
object_t (io_thread_),
+ options (options_),
terminating (false),
sent_seqnum (0),
processed_seqnum (0),
@@ -113,16 +114,19 @@ void zmq::own_t::process_term_req (own_t *object_)
owned.erase (it);
register_term_acks (1);
- send_term (object_);
+
+ // Note that this object is the root of the (partial shutdown) thus, its
+ // value of linger is used, rather than the value stored by the children.
+ send_term (object_, options.linger);
}
void zmq::own_t::process_own (own_t *object_)
{
// If the object is already being shut down, new owned objects are
- // immediately asked to terminate.
+ // immediately asked to terminate. Note that linger is set to zero.
if (terminating) {
register_term_acks (1);
- send_term (object_);
+ send_term (object_, 0);
return;
}
@@ -140,7 +144,7 @@ void zmq::own_t::terminate ()
// As for the root of the ownership tree, there's noone to terminate it,
// so it has to terminate itself.
if (!owner) {
- process_term ();
+ process_term (options.linger);
return;
}
@@ -148,14 +152,14 @@ void zmq::own_t::terminate ()
send_term_req (owner, this);
}
-void zmq::own_t::process_term ()
+void zmq::own_t::process_term (int linger_)
{
// Double termination should never happen.
zmq_assert (!terminating);
- // Send termination request to all owned objects.
+ // Send termination request to all owned objects.
for (owned_t::iterator it = owned.begin (); it != owned.end (); it++)
- send_term (*it);
+ send_term (*it, linger_);
register_term_acks (owned.size ());
owned.clear ();
diff --git a/src/own.hpp b/src/own.hpp
index 6b6f7bf..18f7251 100644
--- a/src/own.hpp
+++ b/src/own.hpp
@@ -24,6 +24,7 @@
#include <algorithm>
#include "object.hpp"
+#include "options.hpp"
#include "atomic_counter.hpp"
#include "stdint.hpp"
@@ -45,7 +46,7 @@ namespace zmq
own_t (class ctx_t *parent_, uint32_t slot_);
// The object is living within I/O thread.
- own_t (class io_thread_t *io_thread_);
+ own_t (class io_thread_t *io_thread_, const options_t &options_);
// When another owned object wants to send command to this object
// it calls this function to let it know it should not shut down
@@ -83,12 +84,15 @@ namespace zmq
// Term handler is protocted rather than private so that it can
// be intercepted by the derived class. This is useful to add custom
// steps to the beginning of the termination process.
- void process_term ();
+ void process_term (int linger_);
// A place to hook in when phyicallal destruction of the object
// is to be delayed.
virtual void process_destroy ();
+ // Socket options associated with this object.
+ options_t options;
+
private:
// Set owner of the object
diff --git a/src/pair.cpp b/src/pair.cpp
index 492ec55..8ee29cf 100644
--- a/src/pair.cpp
+++ b/src/pair.cpp
@@ -86,7 +86,7 @@ void zmq::pair_t::delimited (reader_t *pipe_)
{
}
-void zmq::pair_t::process_term ()
+void zmq::pair_t::process_term (int linger_)
{
terminating = true;
@@ -100,7 +100,7 @@ void zmq::pair_t::process_term ()
outpipe->terminate ();
}
- socket_base_t::process_term ();
+ socket_base_t::process_term (linger_);
}
void zmq::pair_t::activated (class reader_t *pipe_)
diff --git a/src/pair.hpp b/src/pair.hpp
index 030fb97..a14544f 100644
--- a/src/pair.hpp
+++ b/src/pair.hpp
@@ -56,7 +56,7 @@ namespace zmq
private:
// Hook into termination process.
- void process_term ();
+ void process_term (int linger_);
class reader_t *inpipe;
class writer_t *outpipe;
diff --git a/src/pub.cpp b/src/pub.cpp
index b1a1239..6290e9a 100644
--- a/src/pub.cpp
+++ b/src/pub.cpp
@@ -56,7 +56,7 @@ void zmq::pub_t::xattach_pipes (class reader_t *inpipe_,
}
}
-void zmq::pub_t::process_term ()
+void zmq::pub_t::process_term (int linger_)
{
terminating = true;
@@ -68,7 +68,7 @@ void zmq::pub_t::process_term ()
register_term_acks (pipes.size ());
// Continue with the termination immediately.
- socket_base_t::process_term ();
+ socket_base_t::process_term (linger_);
}
void zmq::pub_t::activated (writer_t *pipe_)
diff --git a/src/pub.hpp b/src/pub.hpp
index 8ff192e..6e02be7 100644
--- a/src/pub.hpp
+++ b/src/pub.hpp
@@ -47,7 +47,7 @@ namespace zmq
private:
// Hook into the termination process.
- void process_term ();
+ void process_term (int linger_);
// Write the message to the pipe. Make the pipe inactive if writing
// fails. In such a case false is returned.
diff --git a/src/pull.cpp b/src/pull.cpp
index c5f6a8e..5bccf06 100644
--- a/src/pull.cpp
+++ b/src/pull.cpp
@@ -42,10 +42,10 @@ void zmq::pull_t::xattach_pipes (class reader_t *inpipe_,
fq.attach (inpipe_);
}
-void zmq::pull_t::process_term ()
+void zmq::pull_t::process_term (int linger_)
{
fq.terminate ();
- socket_base_t::process_term ();
+ socket_base_t::process_term (linger_);
}
int zmq::pull_t::xrecv (zmq_msg_t *msg_, int flags_)
diff --git a/src/pull.hpp b/src/pull.hpp
index 1b53e3b..d80bf60 100644
--- a/src/pull.hpp
+++ b/src/pull.hpp
@@ -44,7 +44,7 @@ namespace zmq
private:
// Hook into the termination process.
- void process_term ();
+ void process_term (int linger_);
// Fair queueing object for inbound pipes.
fq_t fq;
diff --git a/src/push.cpp b/src/push.cpp
index f48f7bb..4f3fa5b 100644
--- a/src/push.cpp
+++ b/src/push.cpp
@@ -43,10 +43,10 @@ void zmq::push_t::xattach_pipes (class reader_t *inpipe_,
lb.attach (outpipe_);
}
-void zmq::push_t::process_term ()
+void zmq::push_t::process_term (int linger_)
{
lb.terminate ();
- socket_base_t::process_term ();
+ socket_base_t::process_term (linger_);
}
int zmq::push_t::xsend (zmq_msg_t *msg_, int flags_)
diff --git a/src/push.hpp b/src/push.hpp
index 29a1a1a..ccc98f9 100644
--- a/src/push.hpp
+++ b/src/push.hpp
@@ -44,7 +44,7 @@ namespace zmq
private:
// Hook into the termination process.
- void process_term ();
+ void process_term (int linger_);
// Load balancer managing the outbound pipes.
lb_t lb;
diff --git a/src/session.cpp b/src/session.cpp
index d5c6fdd..aae7e3c 100644
--- a/src/session.cpp
+++ b/src/session.cpp
@@ -28,8 +28,8 @@
zmq::session_t::session_t (class io_thread_t *io_thread_,
class socket_base_t *socket_, const options_t &options_) :
- own_t (io_thread_),
- options (options_),
+ own_t (io_thread_, options_),
+ io_object_t (io_thread_),
in_pipe (NULL),
incomplete_in (false),
out_pipe (NULL),
@@ -39,6 +39,7 @@ zmq::session_t::session_t (class io_thread_t *io_thread_,
pipes_attached (false),
delimiter_processed (false),
force_terminate (false),
+ has_linger_timer (false),
state (active)
{
}
@@ -60,6 +61,12 @@ void zmq::session_t::proceed_with_term ()
zmq_assert (state == pending);
state = terminating;
+ // If there's still a pending linger timer, remove it.
+ if (has_linger_timer) {
+ cancel_timer (linger_timer_id);
+ has_linger_timer = false;
+ }
+
if (in_pipe) {
register_term_acks (1);
in_pipe->terminate ();
@@ -69,7 +76,9 @@ void zmq::session_t::proceed_with_term ()
out_pipe->terminate ();
}
- own_t::process_term ();
+ // The session has already waited for the linger period. We don't want
+ // the child objects to linger any more thus linger is set to zero.
+ own_t::process_term (0);
}
bool zmq::session_t::read (::zmq_msg_t *msg_)
@@ -271,11 +280,25 @@ void zmq::session_t::detach ()
in_pipe->check_read ();
}
-void zmq::session_t::process_term ()
+void zmq::session_t::process_term (int linger_)
{
zmq_assert (state == active);
state = pending;
+ // If linger is set to zero, we can terminate the session straight away
+ // not waiting for the pending messages to be sent.
+ if (linger_ == 0) {
+ proceed_with_term ();
+ return;
+ }
+
+ // If there's finite linger value, set up a timer.
+ if (linger_ > 0) {
+ zmq_assert (!has_linger_timer);
+ add_timer (linger_, linger_timer_id);
+ has_linger_timer = true;
+ }
+
// If there's no engine and there's only delimiter in the pipe it wouldn't
// be ever read. Thus we check for it explicitly.
if (in_pipe)
@@ -291,6 +314,15 @@ void zmq::session_t::process_term ()
proceed_with_term ();
}
+void zmq::session_t::timer_event (int id_)
+{
+ // Linger period expired. We can proceed with termination even though
+ // there are still pending messages to be sent.
+ zmq_assert (id_ == linger_timer_id);
+ has_linger_timer = false;
+ proceed_with_term ();
+}
+
bool zmq::session_t::register_session (const blob_t &name_, session_t *session_)
{
return socket->register_session (name_, session_);
diff --git a/src/session.hpp b/src/session.hpp
index 0f90e80..8adda5e 100644
--- a/src/session.hpp
+++ b/src/session.hpp
@@ -22,7 +22,7 @@
#include "own.hpp"
#include "i_inout.hpp"
-#include "options.hpp"
+#include "io_object.hpp"
#include "blob.hpp"
#include "pipe.hpp"
@@ -31,6 +31,7 @@ namespace zmq
class session_t :
public own_t,
+ public io_object_t,
public i_inout,
public i_reader_events,
public i_writer_events
@@ -79,16 +80,16 @@ namespace zmq
~session_t ();
- // Inherited socket options. These are visible to all session classes.
- options_t options;
-
private:
// Handlers for incoming commands.
void process_plug ();
void process_attach (struct i_engine *engine_,
const blob_t &peer_identity_);
- void process_term ();
+ void process_term (int linger_);
+
+ // i_poll_events handlers.
+ void timer_event (int id_);
// Remove any half processed messages. Flush unflushed messages.
// Call this function when engine disconnect to get rid of leftovers.
@@ -127,6 +128,12 @@ namespace zmq
// pending messages in the inbound pipe.
bool force_terminate;
+ // ID of the linger timer
+ enum {linger_timer_id = 0x20};
+
+ // True is linger timer is running.
+ bool has_linger_timer;
+
enum {
active,
pending,
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index 2293701..c1d210d 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -681,7 +681,7 @@ void zmq::socket_base_t::process_unplug ()
{
}
-void zmq::socket_base_t::process_term ()
+void zmq::socket_base_t::process_term (int linger_)
{
// Unregister all inproc endpoints associated with this socket.
// Doing this we make sure that no new pipes from other sockets (inproc)
@@ -689,7 +689,7 @@ void zmq::socket_base_t::process_term ()
unregister_endpoints (this);
// Continue the termination process immediately.
- own_t::process_term ();
+ own_t::process_term (linger_);
}
void zmq::socket_base_t::process_destroy ()
diff --git a/src/socket_base.hpp b/src/socket_base.hpp
index 4a72a1f..5d083ca 100644
--- a/src/socket_base.hpp
+++ b/src/socket_base.hpp
@@ -28,7 +28,6 @@
#include "own.hpp"
#include "array.hpp"
#include "mutex.hpp"
-#include "options.hpp"
#include "stdint.hpp"
#include "atomic_counter.hpp"
#include "signaler.hpp"
@@ -111,13 +110,10 @@ namespace zmq
virtual bool xhas_in ();
virtual int xrecv (zmq_msg_t *msg_, int options_);
- // Socket options.
- options_t options;
-
// We are declaring termination handler as protected so that
// individual socket types can hook into the termination process
// by overloading it.
- void process_term ();
+ void process_term (int linger_);
// Delay actual destruction of the socket.
void process_destroy ();
diff --git a/src/sub.cpp b/src/sub.cpp
index 825b350..20ffd91 100644
--- a/src/sub.cpp
+++ b/src/sub.cpp
@@ -48,10 +48,10 @@ void zmq::sub_t::xattach_pipes (class reader_t *inpipe_,
fq.attach (inpipe_);
}
-void zmq::sub_t::process_term ()
+void zmq::sub_t::process_term (int linger_)
{
fq.terminate ();
- socket_base_t::process_term ();
+ socket_base_t::process_term (linger_);
}
int zmq::sub_t::xsetsockopt (int option_, const void *optval_,
diff --git a/src/sub.hpp b/src/sub.hpp
index 06a5333..45c9073 100644
--- a/src/sub.hpp
+++ b/src/sub.hpp
@@ -48,7 +48,7 @@ namespace zmq
private:
// Hook into the termination process.
- void process_term ();
+ void process_term (int linger_);
// Check whether the message matches at least one subscription.
bool match (zmq_msg_t *msg_);
diff --git a/src/xrep.cpp b/src/xrep.cpp
index b0ac601..b3e8ebd 100644
--- a/src/xrep.cpp
+++ b/src/xrep.cpp
@@ -80,7 +80,7 @@ void zmq::xrep_t::xattach_pipes (reader_t *inpipe_, writer_t *outpipe_,
}
}
-void zmq::xrep_t::process_term ()
+void zmq::xrep_t::process_term (int linger_)
{
terminating = true;
@@ -93,7 +93,7 @@ void zmq::xrep_t::process_term ()
it++)
it->second.writer->terminate ();
- socket_base_t::process_term ();
+ socket_base_t::process_term (linger_);
}
void zmq::xrep_t::terminated (reader_t *pipe_)
diff --git a/src/xrep.hpp b/src/xrep.hpp
index 4831aaf..575fc44 100644
--- a/src/xrep.hpp
+++ b/src/xrep.hpp
@@ -52,7 +52,7 @@ namespace zmq
private:
// Hook into the termination process.
- void process_term ();
+ void process_term (int linger_);
// i_reader_events interface implementation.
void activated (reader_t *pipe_);
diff --git a/src/xreq.cpp b/src/xreq.cpp
index 9f4eb92..017f127 100644
--- a/src/xreq.cpp
+++ b/src/xreq.cpp
@@ -44,11 +44,11 @@ void zmq::xreq_t::xattach_pipes (class reader_t *inpipe_,
lb.attach (outpipe_);
}
-void zmq::xreq_t::process_term ()
+void zmq::xreq_t::process_term (int linger_)
{
fq.terminate ();
lb.terminate ();
- socket_base_t::process_term ();
+ socket_base_t::process_term (linger_);
}
int zmq::xreq_t::xsend (zmq_msg_t *msg_, int flags_)
diff --git a/src/xreq.hpp b/src/xreq.hpp
index eeb349d..6dbc117 100644
--- a/src/xreq.hpp
+++ b/src/xreq.hpp
@@ -47,7 +47,7 @@ namespace zmq
private:
// Hook into the termination process.
- void process_term ();
+ void process_term (int linger_);
// Messages are fair-queued from inbound pipes. And load-balanced to
// the outbound pipes.
diff --git a/src/zmq_connecter.cpp b/src/zmq_connecter.cpp
index 46ac8b5..82c3ca1 100644
--- a/src/zmq_connecter.cpp
+++ b/src/zmq_connecter.cpp
@@ -36,12 +36,11 @@
zmq::zmq_connecter_t::zmq_connecter_t (class io_thread_t *io_thread_,
class session_t *session_, const options_t &options_,
const char *protocol_, const char *address_) :
- own_t (io_thread_),
+ own_t (io_thread_, options_),
io_object_t (io_thread_),
handle_valid (false),
wait (wait_before_connect),
- session (session_),
- options (options_)
+ session (session_)
{
int rc = tcp_connecter.set_address (protocol_, address_);
zmq_assert (rc == 0);
diff --git a/src/zmq_connecter.hpp b/src/zmq_connecter.hpp
index 7a516f0..3af78cb 100644
--- a/src/zmq_connecter.hpp
+++ b/src/zmq_connecter.hpp
@@ -23,7 +23,6 @@
#include "own.hpp"
#include "io_object.hpp"
#include "tcp_connecter.hpp"
-#include "options.hpp"
#include "stdint.hpp"
namespace zmq
@@ -75,9 +74,6 @@ namespace zmq
// Reference to the session we belong to.
class session_t *session;
- // Associated socket options.
- options_t options;
-
zmq_connecter_t (const zmq_connecter_t&);
void operator = (const zmq_connecter_t&);
};
diff --git a/src/zmq_init.cpp b/src/zmq_init.cpp
index ba9706c..b5c5c86 100644
--- a/src/zmq_init.cpp
+++ b/src/zmq_init.cpp
@@ -33,12 +33,11 @@
zmq::zmq_init_t::zmq_init_t (io_thread_t *io_thread_,
socket_base_t *socket_, session_t *session_, fd_t fd_,
const options_t &options_) :
- own_t (io_thread_),
+ own_t (io_thread_, options_),
sent (false),
received (false),
socket (socket_),
session (session_),
- options (options_),
io_thread (io_thread_)
{
// Create the engine object for this connection.
diff --git a/src/zmq_init.hpp b/src/zmq_init.hpp
index 887f070..6087de9 100644
--- a/src/zmq_init.hpp
+++ b/src/zmq_init.hpp
@@ -25,7 +25,6 @@
#include "own.hpp"
#include "fd.hpp"
#include "stdint.hpp"
-#include "options.hpp"
#include "stdint.hpp"
#include "blob.hpp"
@@ -76,9 +75,6 @@ namespace zmq
// Identity of the peer socket.
blob_t peer_identity;
- // Associated socket options.
- options_t options;
-
// I/O thread the object is living in. It will be used to plug
// the engine into the same I/O thread.
class io_thread_t *io_thread;
diff --git a/src/zmq_listener.cpp b/src/zmq_listener.cpp
index 78e44e6..14e5249 100644
--- a/src/zmq_listener.cpp
+++ b/src/zmq_listener.cpp
@@ -26,9 +26,8 @@
zmq::zmq_listener_t::zmq_listener_t (io_thread_t *io_thread_,
socket_base_t *socket_, const options_t &options_) :
- own_t (io_thread_),
+ own_t (io_thread_, options_),
io_object_t (io_thread_),
- options (options_),
socket (socket_)
{
}
@@ -49,10 +48,10 @@ void zmq::zmq_listener_t::process_plug ()
set_pollin (handle);
}
-void zmq::zmq_listener_t::process_term ()
+void zmq::zmq_listener_t::process_term (int linger_)
{
rm_fd (handle);
- own_t::process_term ();
+ own_t::process_term (linger_);
}
void zmq::zmq_listener_t::in_event ()
diff --git a/src/zmq_listener.hpp b/src/zmq_listener.hpp
index 0a5ef0f..f157cf6 100644
--- a/src/zmq_listener.hpp
+++ b/src/zmq_listener.hpp
@@ -23,7 +23,6 @@
#include "own.hpp"
#include "io_object.hpp"
#include "tcp_listener.hpp"
-#include "options.hpp"
#include "stdint.hpp"
namespace zmq
@@ -44,7 +43,7 @@ namespace zmq
// Handlers for incoming commands.
void process_plug ();
- void process_term ();
+ void process_term (int linger_);
// Handlers for I/O events.
void in_event ();
@@ -55,9 +54,6 @@ namespace zmq
// Handle corresponding to the listening socket.
handle_t handle;
- // Associated socket options.
- options_t options;
-
// Socket the listerner belongs to.
class socket_base_t *socket;