summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@fastmq.commkdir>2009-08-20 11:32:23 +0200
committerMartin Sustrik <sustrik@fastmq.commkdir>2009-08-20 11:32:23 +0200
commita801b6d8b37557ccfb53030dca22f89a3f99b59c (patch)
treed0c41834928aaf1776645e4eb2c2368b317338f6 /src
parent131f2e309668d1e64cfcb4aeb869665d8018bcfe (diff)
couple of bugs in shutdown mechanism fixed
Diffstat (limited to 'src')
-rw-r--r--src/command.hpp11
-rw-r--r--src/object.cpp43
-rw-r--r--src/object.hpp19
-rw-r--r--src/owned.cpp53
-rw-r--r--src/owned.hpp33
-rw-r--r--src/session.cpp5
-rw-r--r--src/session.hpp2
-rw-r--r--src/socket_base.cpp40
-rw-r--r--src/socket_base.hpp26
-rw-r--r--src/zmq_connecter.cpp4
-rw-r--r--src/zmq_connecter.hpp2
-rw-r--r--src/zmq_engine.cpp3
-rw-r--r--src/zmq_init.cpp8
-rw-r--r--src/zmq_init.hpp2
-rw-r--r--src/zmq_listener.cpp4
-rw-r--r--src/zmq_listener.hpp2
16 files changed, 184 insertions, 73 deletions
diff --git a/src/command.hpp b/src/command.hpp
index de94ca3..41c7d6c 100644
--- a/src/command.hpp
+++ b/src/command.hpp
@@ -37,11 +37,11 @@ namespace zmq
stop,
plug,
own,
+ attach,
bind,
term_req,
term,
term_ack
-
} type;
union {
@@ -57,9 +57,14 @@ namespace zmq
// Sent to socket to let it know about the newly created object.
struct {
- class object_t *object;
+ class owned_t *object;
} own;
+ // Attach the engine to the session.
+ struct {
+ class zmq_engine_t *engine;
+ } attach;
+
// Sent between objects to establish pipe(s) between them.
struct {
} bind;
@@ -67,7 +72,7 @@ namespace zmq
// Sent by I/O object ot the socket to request the shutdown of
// the I/O object.
struct {
- class object_t *object;
+ class owned_t *object;
} term_req;
// Sent by socket to I/O object to start its shutdown.
diff --git a/src/object.cpp b/src/object.cpp
index e2267d6..b1b7c8a 100644
--- a/src/object.cpp
+++ b/src/object.cpp
@@ -22,6 +22,10 @@
#include "err.hpp"
#include "io_thread.hpp"
#include "simple_semaphore.hpp"
+#include "owned.hpp"
+#include "session.hpp"
+#include "socket_base.hpp"
+#include "zmq_engine.hpp" // TODO: remove this line
zmq::object_t::object_t (dispatcher_t *dispatcher_, int thread_slot_) :
dispatcher (dispatcher_),
@@ -65,6 +69,10 @@ void zmq::object_t::process_command (command_t &cmd_)
process_own (cmd_.args.own.object);
return;
+ case command_t::attach:
+ process_attach (cmd_.args.attach.engine);
+ return;
+
case command_t::bind:
process_bind ();
return;
@@ -101,15 +109,18 @@ void zmq::object_t::send_stop ()
dispatcher->write (thread_slot, thread_slot, cmd);
}
-void zmq::object_t::send_plug (object_t *destination_)
+void zmq::object_t::send_plug (owned_t *destination_)
{
+ // Let the object know that it cannot shut down till it gets this command.
+ destination_->inc_seqnum ();
+
command_t cmd;
cmd.destination = destination_;
cmd.type = command_t::plug;
send_command (cmd);
}
-void zmq::object_t::send_own (object_t *destination_, object_t *object_)
+void zmq::object_t::send_own (socket_base_t *destination_, owned_t *object_)
{
command_t cmd;
cmd.destination = destination_;
@@ -118,6 +129,18 @@ void zmq::object_t::send_own (object_t *destination_, object_t *object_)
send_command (cmd);
}
+void zmq::object_t::send_attach (session_t *destination_, zmq_engine_t *engine_)
+{
+ // Let the object know that it cannot shut down till it gets this command.
+ destination_->inc_seqnum ();
+
+ command_t cmd;
+ cmd.destination = destination_;
+ cmd.type = command_t::attach;
+ cmd.args.attach.engine = engine_;
+ send_command (cmd);
+}
+
void zmq::object_t::send_bind (object_t *destination_)
{
command_t cmd;
@@ -126,7 +149,8 @@ void zmq::object_t::send_bind (object_t *destination_)
send_command (cmd);
}
-void zmq::object_t::send_term_req (object_t *destination_, object_t *object_)
+void zmq::object_t::send_term_req (socket_base_t *destination_,
+ owned_t *object_)
{
command_t cmd;
cmd.destination = destination_;
@@ -135,7 +159,7 @@ void zmq::object_t::send_term_req (object_t *destination_, object_t *object_)
send_command (cmd);
}
-void zmq::object_t::send_term (object_t *destination_)
+void zmq::object_t::send_term (owned_t *destination_)
{
command_t cmd;
cmd.destination = destination_;
@@ -143,7 +167,7 @@ void zmq::object_t::send_term (object_t *destination_)
send_command (cmd);
}
-void zmq::object_t::send_term_ack (object_t *destination_)
+void zmq::object_t::send_term_ack (socket_base_t *destination_)
{
command_t cmd;
cmd.destination = destination_;
@@ -161,7 +185,12 @@ void zmq::object_t::process_plug ()
zmq_assert (false);
}
-void zmq::object_t::process_own (object_t *object_)
+void zmq::object_t::process_own (owned_t *object_)
+{
+ zmq_assert (false);
+}
+
+void zmq::object_t::process_attach (zmq_engine_t *engine_)
{
zmq_assert (false);
}
@@ -171,7 +200,7 @@ void zmq::object_t::process_bind ()
zmq_assert (false);
}
-void zmq::object_t::process_term_req (object_t *object_)
+void zmq::object_t::process_term_req (owned_t *object_)
{
zmq_assert (false);
}
diff --git a/src/object.hpp b/src/object.hpp
index 7357549..02a071a 100644
--- a/src/object.hpp
+++ b/src/object.hpp
@@ -49,20 +49,25 @@ namespace zmq
// Derived object can use these functions to send commands
// to other objects.
void send_stop ();
- void send_plug (object_t *destination_);
- void send_own (object_t *destination_, object_t *object_);
+ void send_plug (class owned_t *destination_);
+ void send_own (class socket_base_t *destination_,
+ class owned_t *object_);
+ void send_attach (class session_t *destination_,
+ class zmq_engine_t *engine_);
void send_bind (object_t *destination_);
- void send_term_req (object_t *destination_, object_t *object_);
- void send_term (object_t *destination_);
- void send_term_ack (object_t *destination_);
+ void send_term_req (class socket_base_t *destination_,
+ class owned_t *object_);
+ void send_term (class owned_t *destination_);
+ void send_term_ack (class socket_base_t *destination_);
// These handlers can be overloaded by the derived objects. They are
// called when command arrives from another thread.
virtual void process_stop ();
virtual void process_plug ();
- virtual void process_own (object_t *object_);
+ virtual void process_own (class owned_t *object_);
+ virtual void process_attach (class zmq_engine_t *engine_);
virtual void process_bind ();
- virtual void process_term_req (object_t *object_);
+ virtual void process_term_req (class owned_t *object_);
virtual void process_term ();
virtual void process_term_ack ();
diff --git a/src/owned.cpp b/src/owned.cpp
index 22e257f..6995a39 100644
--- a/src/owned.cpp
+++ b/src/owned.cpp
@@ -20,11 +20,12 @@
#include "owned.hpp"
#include "err.hpp"
-zmq::owned_t::owned_t (object_t *parent_, object_t *owner_) :
+zmq::owned_t::owned_t (object_t *parent_, socket_base_t *owner_) :
object_t (parent_),
owner (owner_),
- plugged_in (false),
- terminated (false)
+ sent_seqnum (0),
+ processed_seqnum (0),
+ shutting_down (false)
{
}
@@ -32,21 +33,18 @@ zmq::owned_t::~owned_t ()
{
}
-void zmq::owned_t::process_plug ()
+void zmq::owned_t::inc_seqnum ()
{
- zmq_assert (!plugged_in);
+ // NB: This function may be called from a different thread!
+ sent_seqnum.add (1);
+}
- // If termination of the object was already requested, destroy it and
- // send the termination acknowledgement.
- if (terminated) {
- send_term_ack (owner);
- delete this;
- return;
- }
+void zmq::owned_t::process_plug ()
+{
+ // Keep track of how many commands were processed so far.
+ processed_seqnum++;
- // Notify the generic termination mechanism (io_object_t) that the object
- // is already plugged in.
- plugged_in = true;
+ finalise_command ();
}
void zmq::owned_t::term ()
@@ -56,19 +54,20 @@ void zmq::owned_t::term ()
void zmq::owned_t::process_term ()
{
- zmq_assert (!terminated);
+ zmq_assert (!shutting_down);
+ shutting_down = true;
- // If termination request has occured even before the object was plugged in
- // wait till plugging in happens, then acknowledge the termination.
- if (!plugged_in) {
- terminated = true;
- return;
- }
+ finalise_command ();
+}
- // Otherwise, destroy the object and acknowledge the termination
- // straight away.
- send_term_ack (owner);
- process_unplug ();
- delete this;
+void zmq::owned_t::finalise_command ()
+{
+ // If termination request was already received and there are no more
+ // commands to wait for, terminate the object.
+ if (shutting_down && processed_seqnum == sent_seqnum.get ()) {
+ send_term_ack (owner);
+ process_unplug ();
+ delete this;
+ }
}
diff --git a/src/owned.hpp b/src/owned.hpp
index 164622e..22595d1 100644
--- a/src/owned.hpp
+++ b/src/owned.hpp
@@ -20,7 +20,9 @@
#ifndef __ZMQ_OWNED_HPP_INCLUDED__
#define __ZMQ_OWNED_HPP_INCLUDED__
-#include "object.hpp"
+#include "socket_base.hpp"
+#include "atomic_counter.hpp"
+#include "stdint.hpp"
namespace zmq
{
@@ -34,7 +36,12 @@ namespace zmq
// The object will live in parent's thread, however, its lifetime
// will be managed by its owner socket.
- owned_t (object_t *parent_, object_t *owner_);
+ owned_t (object_t *parent_, socket_base_t *owner_);
+
+ // When another owned object wants to send command to this object
+ // it calls this function to let it know it should not shut down
+ // before the command is delivered.
+ void inc_seqnum ();
protected:
@@ -57,21 +64,27 @@ namespace zmq
// classes to ensure sane cleanup.
virtual void process_unplug () = 0;
- // Socket owning this object. It is responsible for destroying
- // it when it's being closed.
- object_t *owner;
+ // Socket owning this object. When the socket is being closed it's
+ // responsible for shutting down this object.
+ socket_base_t *owner;
private:
// Handlers for incoming commands.
void process_term ();
- // Set to true when object is plugged in.
- bool plugged_in;
+ // Generic command handler (to be called from all command handlers
+ // once the processing is done).
+ void finalise_command ();
+
+ // Sequence number of the last command sent to this object.
+ atomic_counter_t sent_seqnum;
+
+ // Sequence number of the last command processed by this object.
+ uint64_t processed_seqnum;
- // Set to true when object was terminated before it was plugged in.
- // In such case destruction is delayed till 'plug' command arrives.
- bool terminated;
+ // If true, the object is already shutting down.
+ bool shutting_down;
owned_t (const owned_t&);
void operator = (const owned_t&);
diff --git a/src/session.cpp b/src/session.cpp
index fa29dd3..2bb4ff6 100644
--- a/src/session.cpp
+++ b/src/session.cpp
@@ -21,7 +21,7 @@
#include "zmq_engine.hpp"
#include "err.hpp"
-zmq::session_t::session_t (object_t *parent_, object_t *owner_,
+zmq::session_t::session_t (object_t *parent_, socket_base_t *owner_,
zmq_engine_t *engine_) :
owned_t (parent_, owner_),
engine (engine_)
@@ -48,11 +48,14 @@ void zmq::session_t::flush ()
void zmq::session_t::process_plug ()
{
+ zmq_assert (engine);
engine->plug (this);
owned_t::process_plug ();
}
void zmq::session_t::process_unplug ()
{
+ zmq_assert (engine);
engine->unplug ();
+ delete engine;
}
diff --git a/src/session.hpp b/src/session.hpp
index 4228fd9..2cb8e18 100644
--- a/src/session.hpp
+++ b/src/session.hpp
@@ -30,7 +30,7 @@ namespace zmq
{
public:
- session_t (object_t *parent_, object_t *owner_,
+ session_t (object_t *parent_, socket_base_t *owner_,
class zmq_engine_t *engine_);
private:
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index 9701f65..fa6c1e3 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -27,7 +27,9 @@
#include "zmq_listener.hpp"
#include "zmq_connecter.hpp"
#include "io_thread.hpp"
+#include "session.hpp"
#include "config.hpp"
+#include "owned.hpp"
zmq::socket_base_t::socket_base_t (app_thread_t *parent_) :
object_t (parent_),
@@ -60,6 +62,9 @@ zmq::socket_base_t::~socket_base_t ()
while (pending_term_acks)
app_thread->process_commands (true);
}
+
+ // Check whether there are no session leaks.
+ zmq_assert (sessions.empty ());
}
int zmq::socket_base_t::setsockopt (int option_, void *optval_,
@@ -169,12 +174,43 @@ int zmq::socket_base_t::close ()
return 0;
}
-void zmq::socket_base_t::process_own (object_t *object_)
+void zmq::socket_base_t::register_session (const char *name_,
+ session_t *session_)
+{
+ sessions_sync.lock ();
+ bool inserted = sessions.insert (std::make_pair (name_, session_)).second;
+ zmq_assert (inserted);
+ sessions_sync.unlock ();
+}
+
+void zmq::socket_base_t::unregister_session (const char *name_)
+{
+ sessions_sync.lock ();
+ sessions_t::iterator it = sessions.find (name_);
+ zmq_assert (it != sessions.end ());
+ sessions.erase (it);
+ sessions_sync.unlock ();
+}
+
+zmq::session_t *zmq::socket_base_t::get_session (const char *name_)
+{
+ sessions_sync.lock ();
+ sessions_t::iterator it = sessions.find (name_);
+ session_t *session = NULL;
+ if (it != sessions.end ()) {
+ session = it->second;
+ session->inc_seqnum ();
+ }
+ sessions_sync.unlock ();
+ return session;
+}
+
+void zmq::socket_base_t::process_own (owned_t *object_)
{
io_objects.insert (object_);
}
-void zmq::socket_base_t::process_term_req (object_t *object_)
+void zmq::socket_base_t::process_term_req (owned_t *object_)
{
// If I/O object is well and alive ask it to terminate.
io_objects_t::iterator it = std::find (io_objects.begin (),
diff --git a/src/socket_base.hpp b/src/socket_base.hpp
index e96cc2d..8e99654 100644
--- a/src/socket_base.hpp
+++ b/src/socket_base.hpp
@@ -21,9 +21,11 @@
#define __ZMQ_SOCKET_BASE_HPP_INCLUDED__
#include <set>
+#include <map>
#include <string>
#include "object.hpp"
+#include "mutex.hpp"
#include "options.hpp"
#include "stdint.hpp"
@@ -46,16 +48,26 @@ namespace zmq
virtual int recv (struct zmq_msg *msg_, int flags_);
virtual int close ();
+ // Functions that owned objects use to manipulate socket's list
+ // of existing sessions.
+ // Note that this functionality cannot be implemented via inter-thread
+ // commands as it is unacceptable to wait for the completion of the
+ // action till user application yields control of the application
+ // thread to 0MQ.
+ void register_session (const char *name_, class session_t *session_);
+ void unregister_session (const char *name_);
+ class session_t *get_session (const char *name_);
+
private:
// Handlers for incoming commands.
- void process_own (object_t *object_);
- void process_term_req (object_t *object_);
+ void process_own (class owned_t *object_);
+ void process_term_req (class owned_t *object_);
void process_term_ack ();
// List of all I/O objects owned by this socket. The socket is
// responsible for deallocating them before it quits.
- typedef std::set <object_t*> io_objects_t;
+ typedef std::set <class owned_t*> io_objects_t;
io_objects_t io_objects;
// Number of I/O objects that were already asked to terminate
@@ -68,6 +80,14 @@ namespace zmq
// Socket options.
options_t options;
+ // List of existing sessions. This list is never referenced from within
+ // the socket, instead it is used by I/O objects owned by the session.
+ // As those objects can live in different threads, the access is
+ // synchronised using 'sessions_sync' mutex.
+ typedef std::map <std::string, session_t*> sessions_t;
+ sessions_t sessions;
+ mutex_t sessions_sync;
+
socket_base_t (const socket_base_t&);
void operator = (const socket_base_t&);
};
diff --git a/src/zmq_connecter.cpp b/src/zmq_connecter.cpp
index 4416a70..00c8cb2 100644
--- a/src/zmq_connecter.cpp
+++ b/src/zmq_connecter.cpp
@@ -22,8 +22,8 @@
#include "io_thread.hpp"
#include "err.hpp"
-zmq::zmq_connecter_t::zmq_connecter_t (io_thread_t *parent_, object_t *owner_,
- const options_t &options_) :
+zmq::zmq_connecter_t::zmq_connecter_t (io_thread_t *parent_,
+ socket_base_t *owner_, const options_t &options_) :
owned_t (parent_, owner_),
io_object_t (parent_),
handle_valid (false),
diff --git a/src/zmq_connecter.hpp b/src/zmq_connecter.hpp
index 93497cb..dcdec19 100644
--- a/src/zmq_connecter.hpp
+++ b/src/zmq_connecter.hpp
@@ -33,7 +33,7 @@ namespace zmq
{
public:
- zmq_connecter_t (class io_thread_t *parent_, object_t *owner_,
+ zmq_connecter_t (class io_thread_t *parent_, socket_base_t *owner_,
const options_t &options_);
// Set IP address to connect to.
diff --git a/src/zmq_engine.cpp b/src/zmq_engine.cpp
index 3620d30..d8b8cfc 100644
--- a/src/zmq_engine.cpp
+++ b/src/zmq_engine.cpp
@@ -73,7 +73,6 @@ void zmq::zmq_engine_t::in_event ()
// Read as much data as possible to the read buffer.
insize = tcp_socket.read (inbuf, in_batch_size);
-printf ("%d bytes read\n", (int) insize);
inpos = 0;
// Check whether the peer has closed the connection.
@@ -132,5 +131,5 @@ void zmq::zmq_engine_t::out_event ()
void zmq::zmq_engine_t::error ()
{
- zmq_assert (false);
+// zmq_assert (false);
}
diff --git a/src/zmq_init.cpp b/src/zmq_init.cpp
index fea1452..124622d 100644
--- a/src/zmq_init.cpp
+++ b/src/zmq_init.cpp
@@ -22,8 +22,8 @@
#include "session.hpp"
#include "err.hpp"
-zmq::zmq_init_t::zmq_init_t (io_thread_t *parent_, object_t *owner_, fd_t fd_,
- bool connected_, const options_t &options_) :
+zmq::zmq_init_t::zmq_init_t (io_thread_t *parent_, socket_base_t *owner_,
+ fd_t fd_, bool connected_, const options_t &options_) :
owned_t (parent_, owner_),
connected (connected_),
options (options_)
@@ -81,13 +81,15 @@ void zmq::zmq_init_t::flush ()
void zmq::zmq_init_t::process_plug ()
{
+ zmq_assert (engine);
engine->plug (this);
owned_t::process_plug ();
}
void zmq::zmq_init_t::process_unplug ()
{
- engine->unplug ();
+ if (engine)
+ engine->unplug ();
}
void zmq::zmq_init_t::create_session ()
diff --git a/src/zmq_init.hpp b/src/zmq_init.hpp
index 2e0910a..5eb289e 100644
--- a/src/zmq_init.hpp
+++ b/src/zmq_init.hpp
@@ -44,7 +44,7 @@ namespace zmq
// Set 'connected' to true if the connection was created by 'connect'
// function. If it was accepted from a listening socket, set it to
// false.
- zmq_init_t (class io_thread_t *parent_, object_t *owner_, fd_t fd_,
+ zmq_init_t (class io_thread_t *parent_, socket_base_t *owner_, fd_t fd_,
bool connected_, const options_t &options);
~zmq_init_t ();
diff --git a/src/zmq_listener.cpp b/src/zmq_listener.cpp
index c990468..49bbf61 100644
--- a/src/zmq_listener.cpp
+++ b/src/zmq_listener.cpp
@@ -22,8 +22,8 @@
#include "io_thread.hpp"
#include "err.hpp"
-zmq::zmq_listener_t::zmq_listener_t (io_thread_t *parent_, object_t *owner_,
- const options_t &options_) :
+zmq::zmq_listener_t::zmq_listener_t (io_thread_t *parent_,
+ socket_base_t *owner_, const options_t &options_) :
owned_t (parent_, owner_),
io_object_t (parent_),
options (options_)
diff --git a/src/zmq_listener.hpp b/src/zmq_listener.hpp
index f85ad5a..899aaf9 100644
--- a/src/zmq_listener.hpp
+++ b/src/zmq_listener.hpp
@@ -33,7 +33,7 @@ namespace zmq
{
public:
- zmq_listener_t (class io_thread_t *parent_, object_t *owner_,
+ zmq_listener_t (class io_thread_t *parent_, socket_base_t *owner_,
const options_t &options_);
// Set IP address to listen on.