summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/Makefile.am14
-rw-r--r--src/app_thread.cpp37
-rw-r--r--src/app_thread.hpp13
-rw-r--r--src/command.hpp59
-rw-r--r--src/dispatcher.cpp (renamed from src/context.cpp)22
-rw-r--r--src/dispatcher.hpp (renamed from src/context.hpp)20
-rw-r--r--src/err.hpp6
-rw-r--r--src/i_api.hpp34
-rw-r--r--src/io_object.cpp41
-rw-r--r--src/io_object.hpp62
-rw-r--r--src/io_thread.cpp8
-rw-r--r--src/io_thread.hpp2
-rw-r--r--src/mutex.hpp30
-rw-r--r--src/object.cpp158
-rw-r--r--src/object.hpp42
-rw-r--r--src/socket_base.cpp129
-rw-r--r--src/socket_base.hpp72
-rw-r--r--src/zmq.cpp19
-rw-r--r--src/zmq_listener.cpp (renamed from src/i_socket.hpp)23
-rw-r--r--src/zmq_listener.hpp46
20 files changed, 569 insertions, 268 deletions
diff --git a/src/Makefile.am b/src/Makefile.am
index bde9c39..47037a2 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -7,14 +7,15 @@ libzmq_la_SOURCES = \
atomic_ptr.hpp \
command.hpp \
config.hpp \
- context.hpp \
decoder.hpp \
devpoll.hpp \
+ dispatcher.hpp \
encoder.hpp \
epoll.hpp \
err.hpp \
fd.hpp \
fd_signaler.hpp \
+ io_object.hpp \
io_thread.hpp \
ip.hpp \
i_api.hpp \
@@ -31,6 +32,7 @@ libzmq_la_SOURCES = \
poll.hpp \
select.hpp \
simple_semaphore.hpp \
+ socket_base.hpp \
stdint.hpp \
tcp_connecter.hpp \
tcp_listener.hpp \
@@ -42,25 +44,29 @@ libzmq_la_SOURCES = \
ypipe.hpp \
ypollset.hpp \
yqueue.hpp \
+ zmq_listener.hpp \
app_thread.cpp \
- context.cpp \
- devpoll.hpp \
+ devpoll.cpp \
+ dispatcher.cpp \
epoll.cpp \
err.cpp \
fd_signaler.cpp \
+ io_object.cpp \
io_thread.cpp \
ip.cpp \
kqueue.cpp \
object.cpp \
poll.cpp \
select.cpp \
+ socket_base.cpp \
tcp_connecter.cpp \
tcp_listener.cpp \
tcp_socket.cpp \
thread.cpp \
uuid.cpp \
ypollset.cpp \
- zmq.cpp
+ zmq.cpp \
+ zmq_listener.cpp
libzmq_la_LDFLAGS = -version-info 0:0:0
libzmq_la_CXXFLAGS = -Wall -pedantic -Werror @ZMQ_EXTRA_CXXFLAGS@
diff --git a/src/app_thread.cpp b/src/app_thread.cpp
index 23a055a..3f76970 100644
--- a/src/app_thread.cpp
+++ b/src/app_thread.cpp
@@ -17,6 +17,8 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
+#include <algorithm>
+
#include "../include/zmq.h"
#if defined ZMQ_HAVE_WINDOWS
@@ -26,10 +28,12 @@
#endif
#include "app_thread.hpp"
-#include "context.hpp"
+#include "i_api.hpp"
+#include "dispatcher.hpp"
#include "err.hpp"
#include "pipe.hpp"
#include "config.hpp"
+#include "socket_base.hpp"
// If the RDTSC is available we use it to prevent excessive
// polling for commands. The nice thing here is that it will work on any
@@ -39,8 +43,8 @@
#define ZMQ_DELAY_COMMANDS
#endif
-zmq::app_thread_t::app_thread_t (context_t *context_, int thread_slot_) :
- object_t (context_, thread_slot_),
+zmq::app_thread_t::app_thread_t (dispatcher_t *dispatcher_, int thread_slot_) :
+ object_t (dispatcher_, thread_slot_),
tid (0),
last_processing_time (0)
{
@@ -48,13 +52,9 @@ zmq::app_thread_t::app_thread_t (context_t *context_, int thread_slot_) :
zmq::app_thread_t::~app_thread_t ()
{
- // Ask all the sockets to start termination, then wait till it is complete.
- for (sockets_t::iterator it = sockets.begin (); it != sockets.end (); it ++)
- (*it)->stop ();
+ // Destroy all the sockets owned by this application thread.
for (sockets_t::iterator it = sockets.begin (); it != sockets.end (); it ++)
delete *it;
-
- delete this;
}
zmq::i_signaler *zmq::app_thread_t::get_signaler ()
@@ -123,9 +123,28 @@ void zmq::app_thread_t::process_commands (bool block_)
for (int i = 0; i != thread_slot_count (); i++) {
if (signals & (ypollset_t::signals_t (1) << i)) {
command_t cmd;
- while (context->read (i, get_thread_slot (), &cmd))
+ while (dispatcher->read (i, get_thread_slot (), &cmd))
cmd.destination->process_command (cmd);
}
}
}
}
+
+zmq::i_api *zmq::app_thread_t::create_socket (int type_)
+{
+ // TODO: type is ignored for the time being.
+ socket_base_t *s = new socket_base_t (this);
+ zmq_assert (s);
+ sockets.push_back (s);
+ return s;
+}
+
+void zmq::app_thread_t::remove_socket (i_api *socket_)
+{
+ // TODO: To speed this up we can possibly use the system where each socket
+ // holds its index (see I/O scheduler implementation).
+ sockets_t::iterator it = std::find (sockets.begin (), sockets.end (),
+ socket_);
+ zmq_assert (it != sockets.end ());
+ sockets.erase (it);
+}
diff --git a/src/app_thread.hpp b/src/app_thread.hpp
index 31679b8..59e4a25 100644
--- a/src/app_thread.hpp
+++ b/src/app_thread.hpp
@@ -22,7 +22,6 @@
#include <vector>
-#include "i_socket.hpp"
#include "stdint.hpp"
#include "object.hpp"
#include "ypollset.hpp"
@@ -34,7 +33,7 @@ namespace zmq
{
public:
- app_thread_t (class context_t *context_, int thread_slot_);
+ app_thread_t (class dispatcher_t *dispatcher_, int thread_slot_);
~app_thread_t ();
@@ -42,7 +41,7 @@ namespace zmq
i_signaler *get_signaler ();
// Nota bene: Following two functions are accessed from different
- // threads. The caller (context) is responsible for synchronisation
+ // threads. The caller (dispatcher) is responsible for synchronisation
// of accesses.
// Returns true is current thread is associated with the app thread.
@@ -56,10 +55,16 @@ namespace zmq
// set to true, returns only after at least one command was processed.
void process_commands (bool block_);
+ // Create a socket of a specified type.
+ struct i_api *create_socket (int type_);
+
+ // Unregister the socket from the app_thread (called by socket itself).
+ void remove_socket (struct i_api *socket_);
+
private:
// All the sockets created from this application thread.
- typedef std::vector <i_socket*> sockets_t;
+ typedef std::vector <struct i_api*> sockets_t;
sockets_t sockets;
// Thread ID associated with this slot.
diff --git a/src/command.hpp b/src/command.hpp
index 69c4e57..de94ca3 100644
--- a/src/command.hpp
+++ b/src/command.hpp
@@ -35,60 +35,49 @@ namespace zmq
enum type_t
{
stop,
+ plug,
+ own,
bind,
- head,
- tail,
- reg,
- reg_and_bind,
- unreg,
- engine,
- terminate,
- terminate_ack
+ term_req,
+ term,
+ term_ack
+
} type;
union {
+ // Sent to I/O thread to let it know that it should
+ // terminate itself.
struct {
} stop;
+ // Sent to I/O object to make it register with its I/O thread.
struct {
- class pipe_reader_t *reader;
- class session_t *peer;
- } bind;
+ } plug;
+ // Sent to socket to let it know about the newly created object.
struct {
- uint64_t bytes;
- } tail;
+ class object_t *object;
+ } own;
+ // Sent between objects to establish pipe(s) between them.
struct {
- uint64_t bytes;
- } head;
-
- struct {
- class simple_semaphore_t *smph;
- } reg;
-
- struct {
- class session_t *peer;
- bool flow_in;
- bool flow_out;
- } reg_and_bind;
-
- struct {
- class simple_semaphore_t *smph;
- } unreg;
+ } bind;
- // TODO: Engine object won't be deallocated on terminal shutdown
- // while the command is still on the fly!
+ // Sent by I/O object ot the socket to request the shutdown of
+ // the I/O object.
struct {
- class i_engine *engine;
- } engine;
+ class object_t *object;
+ } term_req;
+ // Sent by socket to I/O object to start its shutdown.
struct {
- } terminate;
+ } term;
+ // Sent by I/O object to the socket to acknowledge it has
+ // shut down.
struct {
- } terminate_ack;
+ } term_ack;
} args;
};
diff --git a/src/context.cpp b/src/dispatcher.cpp
index 6b071cf..0b68880 100644
--- a/src/context.cpp
+++ b/src/dispatcher.cpp
@@ -19,7 +19,7 @@
#include "../include/zmq.h"
-#include "context.hpp"
+#include "dispatcher.hpp"
#include "i_api.hpp"
#include "app_thread.hpp"
#include "io_thread.hpp"
@@ -31,7 +31,7 @@
#include "windows.h"
#endif
-zmq::context_t::context_t (int app_threads_, int io_threads_)
+zmq::dispatcher_t::dispatcher_t (int app_threads_, int io_threads_)
{
#ifdef ZMQ_HAVE_WINDOWS
// Intialise Windows sockets. Note that WSAStartup can be called multiple
@@ -69,7 +69,7 @@ zmq::context_t::context_t (int app_threads_, int io_threads_)
io_threads [i]->start ();
}
-zmq::context_t::~context_t ()
+zmq::dispatcher_t::~dispatcher_t ()
{
// Close all application theads, sockets, io_objects etc.
for (app_threads_t::size_type i = 0; i != app_threads.size (); i++)
@@ -93,12 +93,12 @@ zmq::context_t::~context_t ()
#endif
}
-int zmq::context_t::thread_slot_count ()
+int zmq::dispatcher_t::thread_slot_count ()
{
return signalers.size ();
}
-zmq::i_api *zmq::context_t::create_socket (int type_)
+zmq::i_api *zmq::dispatcher_t::create_socket (int type_)
{
threads_sync.lock ();
app_thread_t *thread = choose_app_thread ();
@@ -106,16 +106,12 @@ zmq::i_api *zmq::context_t::create_socket (int type_)
threads_sync.unlock ();
return NULL;
}
-
- zmq_assert (false);
- i_api *s = NULL;
- //i_api *s = thread->create_socket (type_);
-
threads_sync.unlock ();
- return s;
+
+ return thread->create_socket (type_);
}
-zmq::app_thread_t *zmq::context_t::choose_app_thread ()
+zmq::app_thread_t *zmq::dispatcher_t::choose_app_thread ()
{
// Check whether thread ID is already assigned. If so, return it.
for (app_threads_t::size_type i = 0; i != app_threads.size (); i++)
@@ -132,7 +128,7 @@ zmq::app_thread_t *zmq::context_t::choose_app_thread ()
return NULL;
}
-zmq::io_thread_t *zmq::context_t::choose_io_thread (uint64_t taskset_)
+zmq::io_thread_t *zmq::dispatcher_t::choose_io_thread (uint64_t taskset_)
{
zmq_assert (io_threads.size () > 0);
diff --git a/src/context.hpp b/src/dispatcher.hpp
index f2eab1c..08ffab1 100644
--- a/src/context.hpp
+++ b/src/dispatcher.hpp
@@ -17,8 +17,8 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
-#ifndef __ZMQ_CONTEXT_HPP_INCLUDED__
-#define __ZMQ_CONTEXT_HPP_INCLUDED__
+#ifndef __ZMQ_DISPATCHER_HPP_INCLUDED__
+#define __ZMQ_DISPATCHER_HPP_INCLUDED__
#include <vector>
#include <map>
@@ -37,27 +37,27 @@ namespace zmq
// Dispatcher implements bidirectional thread-safe passing of commands
// between N threads. It consists of a ypipes to pass commands and
// signalers to wake up the receiver thread when new commands are
- // available. Note that context is inefficient for passing messages
+ // available. Note that dispatcher is inefficient for passing messages
// within a thread (sender thread = receiver thread). The optimisation is
// not part of the class and should be implemented by individual threads
// (presumably by calling the command handling function directly).
- class context_t
+ class dispatcher_t
{
public:
- // Create the context object. Matrix of pipes to communicate between
+ // Create the dispatcher object. Matrix of pipes to communicate between
// each socket and each I/O thread is created along with appropriate
// signalers.
- context_t (int app_threads_, int io_threads_);
+ dispatcher_t (int app_threads_, int io_threads_);
// To be called to terminate the whole infrastructure (zmq_term).
- ~context_t ();
+ ~dispatcher_t ();
// Create a socket.
struct i_api *create_socket (int type_);
- // Returns number of thread slots in the context. To be used by
+ // Returns number of thread slots in the dispatcher. To be used by
// individual threads to find out how many distinct signals can be
// received.
int thread_slot_count ();
@@ -112,8 +112,8 @@ namespace zmq
// Synchronisation of accesses to shared thread data.
mutex_t threads_sync;
- context_t (const context_t&);
- void operator = (const context_t&);
+ dispatcher_t (const dispatcher_t&);
+ void operator = (const dispatcher_t&);
};
}
diff --git a/src/err.hpp b/src/err.hpp
index fdfce01..3854d8a 100644
--- a/src/err.hpp
+++ b/src/err.hpp
@@ -80,6 +80,12 @@ namespace zmq
abort ();\
}} while (false)
+// Provides convenient way to check for POSIX errors.
+#define posix_assert(x) do {\
+fprintf (stderr, "%s (%s:%d)\n", strerror (x), __FILE__, __LINE__);\
+abort ();\
+} while (false)
+
// Provides convenient way to check for errors from getaddrinfo.
#define gai_assert(x) do { if (x) {\
const char *errstr = gai_strerror (x);\
diff --git a/src/i_api.hpp b/src/i_api.hpp
index a87e41d..36afcea 100644
--- a/src/i_api.hpp
+++ b/src/i_api.hpp
@@ -1,20 +1,20 @@
/*
-Copyright (c) 2007-2009 FastMQ Inc.
-
-This file is part of 0MQ.
-
-0MQ is free software; you can redistribute it and/or modify it under
-the terms of the Lesser GNU General Public License as published by
-the Free Software Foundation; either version 3 of the License, or
-(at your option) any later version.
-
-0MQ is distributed in the hope that it will be useful,
-but WITHOUT ANY WARRANTY; without even the implied warranty of
-MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-Lesser GNU General Public License for more details.
-
-You should have received a copy of the Lesser GNU General Public License
-along with this program. If not, see <http://www.gnu.org/licenses/>.
+ Copyright (c) 2007-2009 FastMQ Inc.
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the Lesser GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ Lesser GNU General Public License for more details.
+
+ You should have received a copy of the Lesser GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_I_API_HPP_INCLUDED__
@@ -25,6 +25,8 @@ namespace zmq
struct i_api
{
+ virtual ~i_api () {}
+
virtual int bind (const char *addr_, struct zmq_opts *opts_) = 0;
virtual int connect (const char *addr_, struct zmq_opts *opts_) = 0;
virtual int subscribe (const char *criteria_) = 0;
diff --git a/src/io_object.cpp b/src/io_object.cpp
new file mode 100644
index 0000000..41e4717
--- /dev/null
+++ b/src/io_object.cpp
@@ -0,0 +1,41 @@
+/*
+ Copyright (c) 2007-2009 FastMQ Inc.
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the Lesser GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ Lesser GNU General Public License for more details.
+
+ You should have received a copy of the Lesser GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "io_object.hpp"
+
+zmq::io_object_t::io_object_t (object_t *parent_, object_t *owner_) :
+ object_t (parent_),
+ owner (owner_)
+{
+}
+
+zmq::io_object_t::~io_object_t ()
+{
+}
+
+void zmq::io_object_t::term ()
+{
+ send_term_req (owner, this);
+}
+
+void zmq::io_object_t::process_term ()
+{
+ send_term_ack (owner);
+ delete this;
+}
diff --git a/src/io_object.hpp b/src/io_object.hpp
new file mode 100644
index 0000000..5ed1830
--- /dev/null
+++ b/src/io_object.hpp
@@ -0,0 +1,62 @@
+/*
+ Copyright (c) 2007-2009 FastMQ Inc.
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the Lesser GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ Lesser GNU General Public License for more details.
+
+ You should have received a copy of the Lesser GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __ZMQ_IO_OBJECT_HPP_INCLUDED__
+#define __ZMQ_IO_OBJECT_HPP_INCLUDED__
+
+#include "object.hpp"
+
+namespace zmq
+{
+
+ class io_object_t : public object_t
+ {
+ public:
+
+ // I/O object will live in the thread inherited from the parent.
+ // However, it's lifetime is managed by the owner.
+ io_object_t (object_t *parent_, object_t *owner_);
+
+ protected:
+
+ // Ask owner socket to terminate this I/O object. This may not happen
+ void term ();
+
+ // I/O object destroys itself. No point in allowing others to invoke
+ // the destructor. At the same time, it has to be virtual so that
+ // generic io_object deallocation mechanism destroys specific type
+ // of I/O object correctly.
+ virtual ~io_object_t ();
+
+ private:
+
+ // Handlers for incoming commands.
+ void process_term ();
+
+ // Socket owning this I/O object. It is responsible for destroying
+ // it when it's being closed.
+ object_t *owner;
+
+ io_object_t (const io_object_t&);
+ void operator = (const io_object_t&);
+ };
+
+}
+
+#endif
diff --git a/src/io_thread.cpp b/src/io_thread.cpp
index f5261a6..1d85292 100644
--- a/src/io_thread.cpp
+++ b/src/io_thread.cpp
@@ -29,11 +29,11 @@
#include "select.hpp"
#include "devpoll.hpp"
#include "kqueue.hpp"
-#include "context.hpp"
+#include "dispatcher.hpp"
#include "simple_semaphore.hpp"
-zmq::io_thread_t::io_thread_t (context_t *context_, int thread_slot_) :
- object_t (context_, thread_slot_)
+zmq::io_thread_t::io_thread_t (dispatcher_t *dispatcher_, int thread_slot_) :
+ object_t (dispatcher_, thread_slot_)
{
#if defined ZMQ_FORCE_SELECT
poller = new select_t;
@@ -115,7 +115,7 @@ void zmq::io_thread_t::in_event ()
// Read all the commands from particular thread.
command_t cmd;
- while (context->read (source_thread_slot, thread_slot, &cmd))
+ while (dispatcher->read (source_thread_slot, thread_slot, &cmd))
cmd.destination->process_command (cmd);
}
}
diff --git a/src/io_thread.hpp b/src/io_thread.hpp
index 43ee19e..6f25627 100644
--- a/src/io_thread.hpp
+++ b/src/io_thread.hpp
@@ -37,7 +37,7 @@ namespace zmq
{
public:
- io_thread_t (class context_t *context_, int thread_slot_);
+ io_thread_t (class dispatcher_t *dispatcher_, int thread_slot_);
// Clean-up. If the thread was started, it's neccessary to call 'stop'
// before invoking destructor. Otherwise the destructor would hang up.
diff --git a/src/mutex.hpp b/src/mutex.hpp
index 9b51955..e233c9e 100644
--- a/src/mutex.hpp
+++ b/src/mutex.hpp
@@ -72,43 +72,47 @@ namespace zmq
namespace zmq
{
-
+
class mutex_t
{
public:
inline mutex_t ()
{
int rc = pthread_mutex_init (&mutex, NULL);
- errno_assert (rc == 0);
+ if (rc)
+ posix_assert (rc);
}
-
+
inline ~mutex_t ()
{
int rc = pthread_mutex_destroy (&mutex);
- errno_assert (rc == 0);
+ if (rc)
+ posix_assert (rc);
}
-
+
inline void lock ()
{
int rc = pthread_mutex_lock (&mutex);
- errno_assert (rc == 0);
+ if (rc)
+ posix_assert (rc);
}
-
+
inline void unlock ()
{
int rc = pthread_mutex_unlock (&mutex);
- errno_assert (rc == 0);
+ if (rc)
+ posix_assert (rc);
}
-
+
private:
-
+
pthread_mutex_t mutex;
-
- // Disable copy construction and assignment.
+
+ // Disable copy construction and assignment.
mutex_t (const mutex_t&);
void operator = (const mutex_t&);
};
-
+
}
#endif
diff --git a/src/object.cpp b/src/object.cpp
index 36f3937..e2267d6 100644
--- a/src/object.cpp
+++ b/src/object.cpp
@@ -18,19 +18,19 @@
*/
#include "object.hpp"
-#include "context.hpp"
+#include "dispatcher.hpp"
#include "err.hpp"
#include "io_thread.hpp"
#include "simple_semaphore.hpp"
-zmq::object_t::object_t (context_t *context_, int thread_slot_) :
- context (context_),
+zmq::object_t::object_t (dispatcher_t *dispatcher_, int thread_slot_) :
+ dispatcher (dispatcher_),
thread_slot (thread_slot_)
{
}
zmq::object_t::object_t (object_t *parent_) :
- context (parent_->context),
+ dispatcher (parent_->dispatcher),
thread_slot (parent_->thread_slot)
{
}
@@ -41,7 +41,7 @@ zmq::object_t::~object_t ()
int zmq::object_t::thread_slot_count ()
{
- return context->thread_slot_count ();
+ return dispatcher->thread_slot_count ();
}
int zmq::object_t::get_thread_slot ()
@@ -53,45 +53,32 @@ void zmq::object_t::process_command (command_t &cmd_)
{
switch (cmd_.type) {
- case command_t::head:
- process_head (cmd_.args.head.bytes);
+ case command_t::stop:
+ process_stop ();
break;
- case command_t::tail:
- process_tail (cmd_.args.tail.bytes);
- break;
+ case command_t::plug:
+ process_plug ();
+ return;
- case command_t::engine:
- process_engine (cmd_.args.engine.engine);
- break;
+ case command_t::own:
+ process_own (cmd_.args.own.object);
+ return;
case command_t::bind:
- process_bind (cmd_.args.bind.reader, cmd_.args.bind.peer);
- break;
-
- case command_t::reg:
- process_reg (cmd_.args.reg.smph);
- break;
-
- case command_t::reg_and_bind:
- process_reg_and_bind (cmd_.args.reg_and_bind.peer,
- cmd_.args.reg_and_bind.flow_in, cmd_.args.reg_and_bind.flow_out);
- break;
-
- case command_t::unreg:
- process_unreg (cmd_.args.unreg.smph);
- break;
-
- case command_t::terminate:
- process_terminate ();
- break;
+ process_bind ();
+ return;
- case command_t::terminate_ack:
- process_terminate_ack ();
- break;
+ case command_t::term_req:
+ process_term_req (cmd_.args.term_req.object);
+ return;
+
+ case command_t::term:
+ process_term ();
+ return;
- case command_t::stop:
- process_stop ();
+ case command_t::term_ack:
+ process_term_ack ();
return;
default:
@@ -101,7 +88,7 @@ void zmq::object_t::process_command (command_t &cmd_)
zmq::io_thread_t *zmq::object_t::choose_io_thread (uint64_t taskset_)
{
- return context->choose_io_thread (taskset_);
+ return dispatcher->choose_io_thread (taskset_);
}
void zmq::object_t::send_stop ()
@@ -111,91 +98,56 @@ void zmq::object_t::send_stop ()
command_t cmd;
cmd.destination = this;
cmd.type = command_t::stop;
- context->write (thread_slot, thread_slot, cmd);
+ dispatcher->write (thread_slot, thread_slot, cmd);
}
-void zmq::object_t::send_bind (object_t *destination_, pipe_reader_t *reader_,
- session_t *peer_)
+void zmq::object_t::send_plug (object_t *destination_)
{
command_t cmd;
cmd.destination = destination_;
- cmd.type = command_t::bind;
- cmd.args.bind.reader = reader_;
- cmd.args.bind.peer = peer_;
+ cmd.type = command_t::plug;
send_command (cmd);
}
-void zmq::object_t::send_head (object_t *destination_, uint64_t bytes_)
+void zmq::object_t::send_own (object_t *destination_, object_t *object_)
{
command_t cmd;
cmd.destination = destination_;
- cmd.type = command_t::head;
- cmd.args.head.bytes = bytes_;
+ cmd.type = command_t::own;
+ cmd.args.own.object = object_;
send_command (cmd);
}
-void zmq::object_t::send_tail (object_t *destination_, uint64_t bytes_)
+void zmq::object_t::send_bind (object_t *destination_)
{
command_t cmd;
cmd.destination = destination_;
- cmd.type = command_t::tail;
- cmd.args.tail.bytes = bytes_;
- send_command (cmd);
-}
-
-void zmq::object_t::send_reg (object_t *destination_, simple_semaphore_t *smph_)
-{
- command_t cmd;
- cmd.destination = destination_;
- cmd.type = command_t::reg;
- cmd.args.reg.smph = smph_;
- send_command (cmd);
-}
-
-void zmq::object_t::send_reg_and_bind (object_t *destination_,
- session_t *peer_, bool flow_in_, bool flow_out_)
-{
- command_t cmd;
- cmd.destination = destination_;
- cmd.type = command_t::reg_and_bind;
- cmd.args.reg_and_bind.peer = peer_;
- cmd.args.reg_and_bind.flow_in = flow_in_;
- cmd.args.reg_and_bind.flow_out = flow_out_;
- send_command (cmd);
-}
-
-void zmq::object_t::send_unreg (object_t *destination_,
- simple_semaphore_t *smph_)
-{
- command_t cmd;
- cmd.destination = destination_;
- cmd.type = command_t::unreg;
- cmd.args.unreg.smph = smph_;
+ cmd.type = command_t::bind;
send_command (cmd);
}
-void zmq::object_t::send_engine (object_t *destination_, i_engine *engine_)
+void zmq::object_t::send_term_req (object_t *destination_, object_t *object_)
{
command_t cmd;
cmd.destination = destination_;
- cmd.type = command_t::engine;
- cmd.args.engine.engine = engine_;
+ cmd.type = command_t::term_req;
+ cmd.args.term_req.object = object_;
send_command (cmd);
}
-void zmq::object_t::send_terminate (object_t *destination_)
+void zmq::object_t::send_term (object_t *destination_)
{
command_t cmd;
cmd.destination = destination_;
- cmd.type = command_t::terminate;
+ cmd.type = command_t::term;
send_command (cmd);
}
-void zmq::object_t::send_terminate_ack (object_t *destination_)
+void zmq::object_t::send_term_ack (object_t *destination_)
{
command_t cmd;
cmd.destination = destination_;
- cmd.type = command_t::terminate_ack;
+ cmd.type = command_t::term_ack;
send_command (cmd);
}
@@ -204,48 +156,32 @@ void zmq::object_t::process_stop ()
zmq_assert (false);
}
-void zmq::object_t::process_bind (pipe_reader_t *reader_, session_t *peer_)
-{
- zmq_assert (false);
-}
-
-void zmq::object_t::process_head (uint64_t bytes_)
-{
- zmq_assert (false);
-}
-
-void zmq::object_t::process_tail (uint64_t bytes_)
-{
- zmq_assert (false);
-}
-
-void zmq::object_t::process_reg (simple_semaphore_t *smph_)
+void zmq::object_t::process_plug ()
{
zmq_assert (false);
}
-void zmq::object_t::process_reg_and_bind (session_t *session_,
- bool flow_in_, bool flow_out_)
+void zmq::object_t::process_own (object_t *object_)
{
zmq_assert (false);
}
-void zmq::object_t::process_unreg (simple_semaphore_t *smph_)
+void zmq::object_t::process_bind ()
{
zmq_assert (false);
}
-void zmq::object_t::process_engine (i_engine *engine_)
+void zmq::object_t::process_term_req (object_t *object_)
{
zmq_assert (false);
}
-void zmq::object_t::process_terminate ()
+void zmq::object_t::process_term ()
{
zmq_assert (false);
}
-void zmq::object_t::process_terminate_ack ()
+void zmq::object_t::process_term_ack ()
{
zmq_assert (false);
}
@@ -256,6 +192,6 @@ void zmq::object_t::send_command (command_t &cmd_)
if (destination_thread_slot == thread_slot)
cmd_.destination->process_command (cmd_);
else
- context->write (thread_slot, destination_thread_slot, cmd_);
+ dispatcher->write (thread_slot, destination_thread_slot, cmd_);
}
diff --git a/src/object.hpp b/src/object.hpp
index 5851c68..7357549 100644
--- a/src/object.hpp
+++ b/src/object.hpp
@@ -32,7 +32,7 @@ namespace zmq
{
public:
- object_t (class context_t *context_, int thread_slot_);
+ object_t (class dispatcher_t *dispatcher_, int thread_slot_);
object_t (object_t *parent_);
~object_t ();
@@ -42,44 +42,32 @@ namespace zmq
protected:
// Derived object can use following functions to interact with
- // global repositories. See context.hpp for function details.
+ // global repositories. See dispatcher.hpp for function details.
int thread_slot_count ();
class io_thread_t *choose_io_thread (uint64_t taskset_);
// Derived object can use these functions to send commands
// to other objects.
void send_stop ();
- void send_bind (object_t *destination_, class pipe_reader_t *reader_,
- class session_t *peer_);
- void send_head (object_t *destination_, uint64_t bytes_);
- void send_tail (object_t *destination_, uint64_t bytes_);
- void send_reg (object_t *destination_,
- class simple_semaphore_t *smph_);
- void send_reg_and_bind (object_t *destination_, class session_t *peer_,
- bool flow_in_, bool flow_out_);
- void send_unreg (object_t *destination_,
- class simple_semaphore_t *smph_);
- void send_engine (object_t *destination_, struct i_engine *engine_);
- void send_terminate (object_t *destination_);
- void send_terminate_ack (object_t *destination_);
+ void send_plug (object_t *destination_);
+ void send_own (object_t *destination_, object_t *object_);
+ void send_bind (object_t *destination_);
+ void send_term_req (object_t *destination_, object_t *object_);
+ void send_term (object_t *destination_);
+ void send_term_ack (object_t *destination_);
// These handlers can be overloaded by the derived objects. They are
// called when command arrives from another thread.
virtual void process_stop ();
- virtual void process_bind (class pipe_reader_t *reader_,
- class session_t *peer_);
- virtual void process_head (uint64_t bytes_);
- virtual void process_tail (uint64_t bytes_);
- virtual void process_reg (class simple_semaphore_t *smph_);
- virtual void process_reg_and_bind (class session_t *peer_,
- bool flow_in_, bool flow_out_);
- virtual void process_unreg (class simple_semaphore_t *smph_);
- virtual void process_engine (struct i_engine *engine_);
- virtual void process_terminate ();
- virtual void process_terminate_ack ();
+ virtual void process_plug ();
+ virtual void process_own (object_t *object_);
+ virtual void process_bind ();
+ virtual void process_term_req (object_t *object_);
+ virtual void process_term ();
+ virtual void process_term_ack ();
// Pointer to the root of the infrastructure.
- class context_t *context;
+ class dispatcher_t *dispatcher;
// Slot ID of the thread the object belongs to.
int thread_slot;
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
new file mode 100644
index 0000000..3737410
--- /dev/null
+++ b/src/socket_base.cpp
@@ -0,0 +1,129 @@
+/*
+ Copyright (c) 2007-2009 FastMQ Inc.
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the Lesser GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ Lesser GNU General Public License for more details.
+
+ You should have received a copy of the Lesser GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include <algorithm>
+
+#include "../include/zmq.h"
+
+#include "socket_base.hpp"
+#include "app_thread.hpp"
+#include "err.hpp"
+#include "zmq_listener.hpp"
+#include "io_thread.hpp"
+
+zmq::socket_base_t::socket_base_t (app_thread_t *parent_) :
+ object_t (parent_),
+ pending_term_acks (0),
+ app_thread (parent_)
+{
+}
+
+zmq::socket_base_t::~socket_base_t ()
+{
+ while (true) {
+
+ // On third pass of the loop there should be no more I/O objects
+ // because all connecters and listerners were destroyed during
+ // the first pass and all engines delivered by delayed 'own' commands
+ // are destroyed during the second pass.
+ if (io_objects.empty () && !pending_term_acks)
+ break;
+
+ // Send termination request to all associated I/O objects.
+ for (io_objects_t::size_type i = 0; i != io_objects.size (); i++)
+ send_term (io_objects [i]);
+
+ // Move the objects to the list of pending term acks.
+ pending_term_acks += io_objects.size ();
+ io_objects.clear ();
+
+ // Process commands till we get all the termination acknowledgements.
+ while (pending_term_acks)
+ app_thread->process_commands (true);
+ }
+}
+
+int zmq::socket_base_t::bind (const char *addr_, struct zmq_opts *opts_)
+{
+ uint64_t taskset = opts_ ? opts_->taskset : 0;
+ object_t *listener = new zmq_listener_t (choose_io_thread (taskset), this);
+ send_plug (listener);
+ send_own (this, listener);
+ return 0;
+}
+
+int zmq::socket_base_t::connect (const char *addr_, struct zmq_opts *opts_)
+{
+ zmq_assert (false);
+}
+
+int zmq::socket_base_t::subscribe (const char *criteria_)
+{
+ zmq_assert (false);
+}
+
+int zmq::socket_base_t::send (struct zmq_msg *msg_, int flags_)
+{
+ zmq_assert (false);
+}
+
+int zmq::socket_base_t::flush ()
+{
+ zmq_assert (false);
+}
+
+int zmq::socket_base_t::recv (struct zmq_msg *msg_, int flags_)
+{
+ zmq_assert (false);
+}
+
+int zmq::socket_base_t::close ()
+{
+ app_thread->remove_socket (this);
+ delete this;
+ return 0;
+}
+
+void zmq::socket_base_t::process_own (object_t *object_)
+{
+ io_objects.push_back (object_);
+}
+
+void zmq::socket_base_t::process_term_req (object_t *object_)
+{
+ // If I/O object is well and alive ask it to terminate.
+ // TODO: Following find may produce an unacceptable jitter in
+ // C10K-style applications. If so, use set instead of vector.
+ io_objects_t::iterator it = std::find (io_objects.begin (),
+ io_objects.end (), object_);
+ if (it != io_objects.end ()) {
+ pending_term_acks++;
+ io_objects.erase (it);
+ send_term (object_);
+ }
+
+ // If not found, we assume that termination request was already sent to
+ // the object so we can sagely ignore the request.
+}
+
+void zmq::socket_base_t::process_term_ack ()
+{
+ zmq_assert (pending_term_acks);
+ pending_term_acks--;
+}
diff --git a/src/socket_base.hpp b/src/socket_base.hpp
new file mode 100644
index 0000000..633f003
--- /dev/null
+++ b/src/socket_base.hpp
@@ -0,0 +1,72 @@
+/*
+ Copyright (c) 2007-2009 FastMQ Inc.
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the Lesser GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ Lesser GNU General Public License for more details.
+
+ You should have received a copy of the Lesser GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __ZMQ_SOCKET_BASE_HPP_INCLUDED__
+#define __ZMQ_SOCKET_BASE_HPP_INCLUDED__
+
+#include <vector>
+
+#include "i_api.hpp"
+#include "object.hpp"
+
+namespace zmq
+{
+
+ class socket_base_t : public object_t, public i_api
+ {
+ public:
+
+ socket_base_t (class app_thread_t *parent_);
+ ~socket_base_t ();
+
+ // i_api interface implementation.
+ int bind (const char *addr_, struct zmq_opts *opts_);
+ int connect (const char *addr_, struct zmq_opts *opts_);
+ int subscribe (const char *criteria_);
+ int send (struct zmq_msg *msg_, int flags_);
+ int flush ();
+ int recv (struct zmq_msg *msg_, int flags_);
+ int close ();
+
+ private:
+
+ // Handlers for incoming commands.
+ void process_own (object_t *object_);
+ void process_term_req (object_t *object_);
+ void process_term_ack ();
+
+ // List of all I/O objects owned by this socket. The socket is
+ // responsible for deallocating them before it quits.
+ typedef std::vector <object_t*> io_objects_t;
+ io_objects_t io_objects;
+
+ // Number of I/O objects that were already asked to terminate
+ // but haven't acknowledged it yet.
+ int pending_term_acks;
+
+ // Application thread the socket lives in.
+ class app_thread_t *app_thread;
+
+ socket_base_t (const socket_base_t&);
+ void operator = (const socket_base_t&);
+ };
+
+}
+
+#endif
diff --git a/src/zmq.cpp b/src/zmq.cpp
index d19b229..149a7e2 100644
--- a/src/zmq.cpp
+++ b/src/zmq.cpp
@@ -25,7 +25,7 @@
#include "i_api.hpp"
#include "err.hpp"
-#include "context.hpp"
+#include "dispatcher.hpp"
#include "msg.hpp"
int zmq_msg_init (zmq_msg *msg_)
@@ -162,27 +162,28 @@ int zmq_msg_type (zmq_msg *msg_)
void *zmq_init (int app_threads_, int io_threads_)
{
- // There should be at least a single thread managed by the context.
+ // There should be at least a single thread managed by the dispatcher.
if (app_threads_ < 0 || io_threads_ < 0 ||
app_threads_ + io_threads_ == 0) {
errno = EINVAL;
return NULL;
}
- zmq::context_t *context = new zmq::context_t (app_threads_, io_threads_);
- zmq_assert (context);
- return (void*) context;
+ zmq::dispatcher_t *dispatcher = new zmq::dispatcher_t (app_threads_,
+ io_threads_);
+ zmq_assert (dispatcher);
+ return (void*) dispatcher;
}
-int zmq_term (void *context_)
+int zmq_term (void *dispatcher_)
{
- delete (zmq::context_t*) context_;
+ delete (zmq::dispatcher_t*) dispatcher_;
return 0;
}
-void *zmq_socket (void *context_, int type_)
+void *zmq_socket (void *dispatcher_, int type_)
{
- return (void*) (((zmq::context_t*) context_)->create_socket (type_));
+ return (void*) (((zmq::dispatcher_t*) dispatcher_)->create_socket (type_));
}
int zmq_close (void *s_)
diff --git a/src/i_socket.hpp b/src/zmq_listener.cpp
index 99ade8a..1f1e012 100644
--- a/src/i_socket.hpp
+++ b/src/zmq_listener.cpp
@@ -17,20 +17,19 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
-#ifndef __ZMQ_I_SOCKET_HPP_INCLUDED__
-#define __ZMQ_I_SOCKET_HPP_INCLUDED__
+#include "zmq_listener.hpp"
+#include "err.hpp"
-namespace zmq
+zmq::zmq_listener_t::zmq_listener_t (object_t *parent_, object_t *owner_) :
+ io_object_t (parent_, owner_)
{
+}
- struct i_socket
- {
- virtual ~i_socket () {};
-
- // Start shutting down the socket.
- virtual void stop () = 0;
- };
-
+zmq::zmq_listener_t::~zmq_listener_t ()
+{
}
-#endif
+void zmq::zmq_listener_t::process_plug ()
+{
+ // TODO: Register with the I/O thread here.
+}
diff --git a/src/zmq_listener.hpp b/src/zmq_listener.hpp
new file mode 100644
index 0000000..12192b2
--- /dev/null
+++ b/src/zmq_listener.hpp
@@ -0,0 +1,46 @@
+/*
+ Copyright (c) 2007-2009 FastMQ Inc.
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the Lesser GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ Lesser GNU General Public License for more details.
+
+ You should have received a copy of the Lesser GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __ZMQ_ZMQ_LISTENER_HPP_INCLUDED__
+#define __ZMQ_ZMQ_LISTENER_HPP_INCLUDED__
+
+#include "io_object.hpp"
+
+namespace zmq
+{
+
+ class zmq_listener_t : public io_object_t
+ {
+ public:
+
+ zmq_listener_t (object_t *parent_, object_t *owner_);
+ ~zmq_listener_t ();
+
+ private:
+
+ // Handlers for incoming commands.
+ void process_plug ();
+
+ zmq_listener_t (const zmq_listener_t&);
+ void operator = (const zmq_listener_t&);
+ };
+
+}
+
+#endif