summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@fastmq.commkdir>2009-08-06 12:51:32 +0200
committerMartin Sustrik <sustrik@fastmq.commkdir>2009-08-06 12:51:32 +0200
commit0b5cc026fbe7ccc6de66907be29471562a2d344d (patch)
treea6051f238152e2261ea48942f0c216a3984cc9fd
parentb8b4acef4c2ba1a169ce84c1fb4c70a5676ebba3 (diff)
clean up - session/socket/engine stuff removed
-rw-r--r--src/Makefile.am51
-rw-r--r--src/app_thread.cpp106
-rw-r--r--src/app_thread.hpp27
-rw-r--r--src/connecter.cpp189
-rw-r--r--src/connecter.hpp99
-rw-r--r--src/context.cpp139
-rw-r--r--src/context.hpp52
-rw-r--r--src/data_distributor.cpp155
-rw-r--r--src/data_distributor.hpp70
-rw-r--r--src/devpoll.cpp9
-rw-r--r--src/devpoll.hpp3
-rw-r--r--src/dummy_aggregator.cpp111
-rw-r--r--src/dummy_aggregator.hpp73
-rw-r--r--src/dummy_distributor.cpp85
-rw-r--r--src/dummy_distributor.hpp68
-rw-r--r--src/epoll.cpp10
-rw-r--r--src/epoll.hpp3
-rw-r--r--src/fair_aggregator.cpp143
-rw-r--r--src/fair_aggregator.hpp77
-rw-r--r--src/i_api.hpp42
-rw-r--r--src/i_demux.hpp57
-rw-r--r--src/i_engine.hpp53
-rw-r--r--src/i_mux.hpp60
-rw-r--r--src/i_poller.hpp7
-rw-r--r--src/i_session.hpp37
-rw-r--r--src/i_socket.hpp (renamed from src/req.cpp)21
-rw-r--r--src/i_thread.hpp38
-rw-r--r--src/io_object.cpp37
-rw-r--r--src/io_object.hpp51
-rw-r--r--src/io_thread.cpp31
-rw-r--r--src/io_thread.hpp27
-rw-r--r--src/kqueue.cpp9
-rw-r--r--src/kqueue.hpp3
-rw-r--r--src/listener.cpp170
-rw-r--r--src/listener.hpp110
-rw-r--r--src/load_balancer.cpp130
-rw-r--r--src/load_balancer.hpp73
-rw-r--r--src/object.cpp33
-rw-r--r--src/object.hpp8
-rw-r--r--src/p2p.cpp29
-rw-r--r--src/p2p.hpp42
-rw-r--r--src/pipe.cpp47
-rw-r--r--src/pipe.hpp23
-rw-r--r--src/pipe_reader.cpp118
-rw-r--r--src/pipe_reader.hpp89
-rw-r--r--src/pipe_writer.cpp120
-rw-r--r--src/pipe_writer.hpp88
-rw-r--r--src/poll.cpp13
-rw-r--r--src/poll.hpp3
-rw-r--r--src/pub.cpp38
-rw-r--r--src/pub.hpp45
-rw-r--r--src/rep.cpp29
-rw-r--r--src/rep.hpp42
-rw-r--r--src/req.hpp42
-rw-r--r--src/safe_object.cpp76
-rw-r--r--src/safe_object.hpp68
-rw-r--r--src/select.cpp13
-rw-r--r--src/select.hpp2
-rw-r--r--src/session.cpp273
-rw-r--r--src/session.hpp107
-rw-r--r--src/session_stub.cpp110
-rw-r--r--src/session_stub.hpp83
-rw-r--r--src/socket_base.cpp267
-rw-r--r--src/socket_base.hpp96
-rw-r--r--src/sub.cpp45
-rw-r--r--src/sub.hpp46
-rw-r--r--src/zmq.cpp2
-rw-r--r--src/zmq_decoder.cpp79
-rw-r--r--src/zmq_decoder.hpp57
-rw-r--r--src/zmq_encoder.cpp75
-rw-r--r--src/zmq_encoder.hpp54
-rw-r--r--src/zmq_tcp_engine.cpp185
-rw-r--r--src/zmq_tcp_engine.hpp92
73 files changed, 109 insertions, 4856 deletions
diff --git a/src/Makefile.am b/src/Makefile.am
index 27f4412..bde9c39 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -7,53 +7,30 @@ libzmq_la_SOURCES = \
atomic_ptr.hpp \
command.hpp \
config.hpp \
- connecter.hpp \
context.hpp \
- data_distributor.hpp \
decoder.hpp \
devpoll.hpp \
- dummy_aggregator.hpp \
- dummy_distributor.hpp \
encoder.hpp \
epoll.hpp \
err.hpp \
- fair_aggregator.hpp \
fd.hpp \
fd_signaler.hpp \
- io_object.hpp \
io_thread.hpp \
ip.hpp \
i_api.hpp \
- i_demux.hpp \
- i_mux.hpp \
i_poller.hpp \
i_poll_events.hpp \
- i_session.hpp \
i_signaler.hpp \
- i_engine.hpp \
- i_thread.hpp \
- listener.hpp \
+ i_socket.hpp \
kqueue.hpp \
- load_balancer.hpp \
msg.hpp \
mutex.hpp \
object.hpp \
- p2p.hpp \
pipe.hpp \
- pipe_reader.hpp \
- pipe_writer.hpp \
platform.hpp \
poll.hpp \
- pub.hpp \
- rep.hpp \
- req.hpp \
- safe_object.hpp \
select.hpp \
- session.hpp \
- session_stub.hpp \
simple_semaphore.hpp \
- socket_base.hpp \
- sub.hpp \
stdint.hpp \
tcp_connecter.hpp \
tcp_listener.hpp \
@@ -65,50 +42,24 @@ libzmq_la_SOURCES = \
ypipe.hpp \
ypollset.hpp \
yqueue.hpp \
- zmq_decoder.hpp \
- zmq_encoder.hpp \
- zmq_tcp_engine.hpp \
app_thread.cpp \
- connecter.cpp \
context.cpp \
- data_distributor.cpp \
devpoll.hpp \
- dummy_aggregator.cpp \
- dummy_distributor.cpp \
epoll.cpp \
err.cpp \
- fair_aggregator.cpp \
fd_signaler.cpp \
- io_object.cpp \
io_thread.cpp \
ip.cpp \
kqueue.cpp \
- listener.cpp \
- load_balancer.cpp \
object.cpp \
- p2p.cpp \
- pipe.cpp \
- pipe_reader.cpp \
- pipe_writer.cpp \
poll.cpp \
- pub.cpp \
- rep.cpp \
- req.cpp \
- safe_object.cpp \
select.cpp \
- session.cpp \
- session_stub.cpp \
- socket_base.cpp \
- sub.cpp \
tcp_connecter.cpp \
tcp_listener.cpp \
tcp_socket.cpp \
thread.cpp \
uuid.cpp \
ypollset.cpp \
- zmq_decoder.cpp \
- zmq_encoder.cpp \
- zmq_tcp_engine.cpp \
zmq.cpp
libzmq_la_LDFLAGS = -version-info 0:0:0
diff --git a/src/app_thread.cpp b/src/app_thread.cpp
index 9cc61c7..23a055a 100644
--- a/src/app_thread.cpp
+++ b/src/app_thread.cpp
@@ -28,20 +28,8 @@
#include "app_thread.hpp"
#include "context.hpp"
#include "err.hpp"
-#include "session.hpp"
#include "pipe.hpp"
#include "config.hpp"
-#include "i_api.hpp"
-#include "dummy_aggregator.hpp"
-#include "fair_aggregator.hpp"
-#include "dummy_distributor.hpp"
-#include "data_distributor.hpp"
-#include "load_balancer.hpp"
-#include "p2p.hpp"
-#include "pub.hpp"
-#include "sub.hpp"
-#include "req.hpp"
-#include "rep.hpp"
// If the RDTSC is available we use it to prevent excessive
// polling for commands. The nice thing here is that it will work on any
@@ -58,37 +46,15 @@ zmq::app_thread_t::app_thread_t (context_t *context_, int thread_slot_) :
{
}
-void zmq::app_thread_t::shutdown ()
-{
- // Deallocate all the sessions associated with the thread.
- while (!sessions.empty ())
- sessions [0]->shutdown ();
-
- delete this;
-}
-
zmq::app_thread_t::~app_thread_t ()
{
-}
+ // Ask all the sockets to start termination, then wait till it is complete.
+ for (sockets_t::iterator it = sockets.begin (); it != sockets.end (); it ++)
+ (*it)->stop ();
+ for (sockets_t::iterator it = sockets.begin (); it != sockets.end (); it ++)
+ delete *it;
-void zmq::app_thread_t::attach_session (session_t *session_)
-{
- session_->set_index (sessions.size ());
- sessions.push_back (session_);
-}
-
-void zmq::app_thread_t::detach_session (session_t *session_)
-{
- // O(1) removal of the session from the list.
- sessions_t::size_type i = session_->get_index ();
- sessions [i] = sessions [sessions.size () - 1];
- sessions [i]->set_index (i);
- sessions.pop_back ();
-}
-
-zmq::i_poller *zmq::app_thread_t::get_poller ()
-{
- zmq_assert (false);
+ delete this;
}
zmq::i_signaler *zmq::app_thread_t::get_signaler ()
@@ -98,76 +64,20 @@ zmq::i_signaler *zmq::app_thread_t::get_signaler ()
bool zmq::app_thread_t::is_current ()
{
- return !sessions.empty () && tid == getpid ();
+ return !sockets.empty () && tid == getpid ();
}
bool zmq::app_thread_t::make_current ()
{
// If there are object managed by this slot we cannot assign the slot
// to a different thread.
- if (!sessions.empty ())
+ if (!sockets.empty ())
return false;
tid = getpid ();
return true;
}
-zmq::i_api *zmq::app_thread_t::create_socket (int type_)
-{
- i_mux *mux = NULL;
- i_demux *demux = NULL;
- session_t *session = NULL;
- i_api *api = NULL;
-
- switch (type_) {
- case ZMQ_P2P:
- mux = new dummy_aggregator_t;
- zmq_assert (mux);
- demux = new dummy_distributor_t;
- zmq_assert (demux);
- session = new session_t (this, this, mux, demux, true, false);
- zmq_assert (session);
- api = new p2p_t (this, session);
- zmq_assert (api);
- break;
- case ZMQ_PUB:
- demux = new data_distributor_t;
- zmq_assert (demux);
- session = new session_t (this, this, mux, demux, true, false);
- zmq_assert (session);
- api = new pub_t (this, session);
- zmq_assert (api);
- break;
- case ZMQ_SUB:
- mux = new fair_aggregator_t;
- zmq_assert (mux);
- session = new session_t (this, this, mux, demux, true, false);
- zmq_assert (session);
- api = new sub_t (this, session);
- zmq_assert (api);
- break;
- case ZMQ_REQ:
- // TODO
- zmq_assert (false);
- api = new req_t (this, session);
- zmq_assert (api);
- break;
- case ZMQ_REP:
- // TODO
- zmq_assert (false);
- api = new rep_t (this, session);
- zmq_assert (api);
- break;
- default:
- errno = EINVAL;
- return NULL;
- }
-
- attach_session (session);
-
- return api;
-}
-
void zmq::app_thread_t::process_commands (bool block_)
{
ypollset_t::signals_t signals;
diff --git a/src/app_thread.hpp b/src/app_thread.hpp
index 8295c2f..31679b8 100644
--- a/src/app_thread.hpp
+++ b/src/app_thread.hpp
@@ -22,7 +22,7 @@
#include <vector>
-#include "i_thread.hpp"
+#include "i_socket.hpp"
#include "stdint.hpp"
#include "object.hpp"
#include "ypollset.hpp"
@@ -30,23 +30,18 @@
namespace zmq
{
- class app_thread_t : public object_t, public i_thread
+ class app_thread_t : public object_t
{
public:
app_thread_t (class context_t *context_, int thread_slot_);
- // To be called when the whole infrastrucure is being closed.
- void shutdown ();
+ ~app_thread_t ();
// Returns signaler associated with this application thread.
i_signaler *get_signaler ();
- // Create socket engine in this thread. Return false if the calling
- // thread doesn't match the thread handled by this app thread object.
- struct i_api *create_socket (int type_);
-
- // Nota bene: The following two functions are accessed from different
+ // Nota bene: Following two functions are accessed from different
// threads. The caller (context) is responsible for synchronisation
// of accesses.
@@ -61,25 +56,17 @@ namespace zmq
// set to true, returns only after at least one command was processed.
void process_commands (bool block_);
- // i_thread implementation.
- void attach_session (class session_t *session_);
- void detach_session (class session_t *session_);
- struct i_poller *get_poller ();
-
private:
- // Clean-up.
- ~app_thread_t ();
+ // All the sockets created from this application thread.
+ typedef std::vector <i_socket*> sockets_t;
+ sockets_t sockets;
// Thread ID associated with this slot.
// TODO: Virtualise pid_t!
// TODO: Check whether getpid returns unique ID for each thread.
int tid;
- // Vector of all sessionss associated with this app thread.
- typedef std::vector <class session_t*> sessions_t;
- sessions_t sessions;
-
// App thread's signaler object.
ypollset_t pollset;
diff --git a/src/connecter.cpp b/src/connecter.cpp
deleted file mode 100644
index 970dcf7..0000000
--- a/src/connecter.cpp
+++ /dev/null
@@ -1,189 +0,0 @@
-/*
- Copyright (c) 2007-2009 FastMQ Inc.
-
- This file is part of 0MQ.
-
- 0MQ is free software; you can redistribute it and/or modify it under
- the terms of the Lesser GNU General Public License as published by
- the Free Software Foundation; either version 3 of the License, or
- (at your option) any later version.
-
- 0MQ is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- Lesser GNU General Public License for more details.
-
- You should have received a copy of the Lesser GNU General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#include "connecter.hpp"
-#include "io_thread.hpp"
-#include "session.hpp"
-#include "err.hpp"
-#include "simple_semaphore.hpp"
-#include "zmq_tcp_engine.hpp"
-
-zmq::connecter_t::connecter_t (io_thread_t *thread_, const char *addr_,
- session_t *session_) :
- io_object_t (thread_),
- state (idle),
- poller (NULL),
- session (session_),
- addr (addr_),
- identity ("abcde"),
- engine (NULL)
-{
-}
-
-void zmq::connecter_t::terminate ()
-{
- delete this;
-}
-
-void zmq::connecter_t::shutdown ()
-{
- delete this;
-}
-
-zmq::connecter_t::~connecter_t ()
-{
-}
-
-void zmq::connecter_t::process_reg (simple_semaphore_t *smph_)
-{
- // Fet poller pointer for further use.
- zmq_assert (!poller);
- poller = get_poller ();
-
- // Ask the session to register itself with the I/O thread. Note that
- // the session is living in the same I/O thread, thus this results
- // in a synchronous call.
- session->inc_seqnum ();
- send_reg (session, NULL);
-
- // Unlock the application thread that created the connecter.
- if (smph_)
- smph_->post ();
-
- // Manually trigger timer event which will launch asynchronous connect.
- state = waiting;
- timer_event ();
-}
-
-void zmq::connecter_t::process_unreg (simple_semaphore_t *smph_)
-{
- // Unregister connecter/engine from the poller.
- zmq_assert (poller);
- if (state == connecting)
- poller->rm_fd (handle);
- else if (state == waiting)
- poller->cancel_timer (this);
- else if (state == sending)
- engine->terminate ();
-
- // Unlock the application thread closing the connecter.
- if (smph_)
- smph_->post ();
-}
-
-void zmq::connecter_t::in_event ()
-{
- // Error occured in asynchronous connect. Retry to connect after a while.
- if (state == connecting) {
- fd_t fd = tcp_connecter.connect ();
- zmq_assert (fd == retired_fd);
- poller->rm_fd (handle);
- poller->add_timer (this);
- state = waiting;
- return;
- }
-
- zmq_assert (false);
-}
-
-void zmq::connecter_t::out_event ()
-{
- if (state == connecting) {
-
- fd_t fd = tcp_connecter.connect ();
- if (fd == retired_fd) {
- poller->rm_fd (handle);
- poller->add_timer (this);
- state = waiting;
- return;
- }
-
- poller->rm_fd (handle);
- engine = new zmq_tcp_engine_t (fd);
- zmq_assert (engine);
- engine->attach (poller, this);
- state = sending;
- return;
- }
-
- zmq_assert (false);
-}
-
-void zmq::connecter_t::timer_event ()
-{
- zmq_assert (state == waiting);
-
- // Initiate async connect and start polling for its completion. If async
- // connect fails instantly, try to reconnect after a while.
- int rc = tcp_connecter.open (addr.c_str ());
- if (rc == 0) {
- state = connecting;
- in_event ();
- }
- else if (rc == 1) {
- handle = poller->add_fd (tcp_connecter.get_fd (), this);
- poller->set_pollout (handle);
- state = connecting;
- }
- else {
- poller->add_timer (this);
- state = waiting;
- }
-}
-
-void zmq::connecter_t::set_engine (struct i_engine *engine_)
-{
- engine = engine_;
-}
-
-bool zmq::connecter_t::read (zmq_msg *msg_)
-{
- zmq_assert (state == sending);
-
- // Deallocate old content of the message just in case.
- zmq_msg_close (msg_);
-
- // Send the identity.
- zmq_msg_init_size (msg_, identity.size ());
- memcpy (zmq_msg_data (msg_), identity.c_str (), identity.size ());
-
- // Ask engine to unregister from the poller.
- i_engine *e = engine;
- engine->detach ();
-
- // Attach the engine to the session. (Note that this is actually
- // a synchronous call.
- session->inc_seqnum ();
- send_engine (session, e);
-
- state = idle;
-
- return true;
-}
-
-bool zmq::connecter_t::write (struct zmq_msg *msg_)
-{
- // No incoming messages are accepted till identity is sent.
- return false;
-}
-
-void zmq::connecter_t::flush ()
-{
- // No incoming messages are accepted till identity is sent.
-}
diff --git a/src/connecter.hpp b/src/connecter.hpp
deleted file mode 100644
index 1f11c63..0000000
--- a/src/connecter.hpp
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- Copyright (c) 2007-2009 FastMQ Inc.
-
- This file is part of 0MQ.
-
- 0MQ is free software; you can redistribute it and/or modify it under
- the terms of the Lesser GNU General Public License as published by
- the Free Software Foundation; either version 3 of the License, or
- (at your option) any later version.
-
- 0MQ is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- Lesser GNU General Public License for more details.
-
- You should have received a copy of the Lesser GNU General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#ifndef __ZMQ_CONNECTER_HPP_INCLUDED__
-#define __ZMQ_CONNECTER_HPP_INCLUDED__
-
-#include <string>
-
-#include "../include/zmq.h"
-
-#include "i_poller.hpp"
-#include "io_object.hpp"
-#include "i_poll_events.hpp"
-#include "i_session.hpp"
-#include "tcp_connecter.hpp"
-
-namespace zmq
-{
-
- class connecter_t : public io_object_t, public i_poll_events,
- public i_session
- {
- public:
-
- connecter_t (class io_thread_t *thread_, const char *addr_,
- class session_t *session_);
-
- void terminate ();
- void shutdown ();
-
- void process_reg (class simple_semaphore_t *smph_);
- void process_unreg (class simple_semaphore_t *smph_);
-
- // i_poll_events implementation.
- void in_event ();
- void out_event ();
- void timer_event ();
-
- // i_session implementation
- void set_engine (struct i_engine *engine_);
- // void shutdown ();
- bool read (struct zmq_msg *msg_);
- bool write (struct zmq_msg *msg_);
- void flush ();
-
- private:
-
- // Clean-up.
- ~connecter_t ();
-
- enum {
- idle,
- waiting,
- connecting,
- sending
- } state;
-
- // Cached pointer to the poller.
- struct i_poller *poller;
-
- // Handle of the connecting socket.
- handle_t handle;
-
- // Associated session. It lives in the same I/O thread.
- class session_t *session;
-
- // Address to connect to.
- std::string addr;
-
- // Identity of the connection.
- std::string identity;
-
- tcp_connecter_t tcp_connecter;
-
- struct i_engine *engine;
-
- connecter_t (const connecter_t&);
- void operator = (const connecter_t&);
- };
-
-}
-
-#endif
diff --git a/src/context.cpp b/src/context.cpp
index ab4643e..6b071cf 100644
--- a/src/context.cpp
+++ b/src/context.cpp
@@ -20,15 +20,12 @@
#include "../include/zmq.h"
#include "context.hpp"
+#include "i_api.hpp"
#include "app_thread.hpp"
#include "io_thread.hpp"
#include "platform.hpp"
#include "err.hpp"
#include "pipe.hpp"
-#include "pipe_reader.hpp"
-#include "pipe_writer.hpp"
-#include "session.hpp"
-#include "i_api.hpp"
#if defined ZMQ_HAVE_WINDOWS
#include "windows.h"
@@ -72,37 +69,23 @@ zmq::context_t::context_t (int app_threads_, int io_threads_)
io_threads [i]->start ();
}
-void zmq::context_t::shutdown ()
-{
- delete this;
-}
-
zmq::context_t::~context_t ()
{
- // Ask I/O threads to terminate.
+ // Close all application theads, sockets, io_objects etc.
+ for (app_threads_t::size_type i = 0; i != app_threads.size (); i++)
+ delete app_threads [i];
+
+ // Ask I/O threads to terminate. If stop signal wasn't sent to I/O
+ // thread subsequent invocation of destructor would hang-up.
for (io_threads_t::size_type i = 0; i != io_threads.size (); i++)
io_threads [i]->stop ();
// Wait till I/O threads actually terminate.
for (io_threads_t::size_type i = 0; i != io_threads.size (); i++)
- io_threads [i]->join ();
-
- // At this point the current thread is the only thread with access to
- // our internal data. Deallocation will be done exclusively in this thread.
- for (app_threads_t::size_type i = 0; i != app_threads.size (); i++)
- app_threads [i]->shutdown ();
- for (io_threads_t::size_type i = 0; i != io_threads.size (); i++)
- io_threads [i]->shutdown ();
+ delete io_threads [i];
delete [] command_pipes;
- // Deallocate all the pipes, pipe readers and pipe writers.
- for (pipes_t::iterator it = pipes.begin (); it != pipes.end (); it++) {
- delete it->pipe;
- delete it->reader;
- delete it->writer;
- }
-
#ifdef ZMQ_HAVE_WINDOWS
// On Windows, uninitialise socket layer.
int rc = WSACleanup ();
@@ -123,7 +106,11 @@ zmq::i_api *zmq::context_t::create_socket (int type_)
threads_sync.unlock ();
return NULL;
}
- i_api *s = thread->create_socket (type_);
+
+ zmq_assert (false);
+ i_api *s = NULL;
+ //i_api *s = thread->create_socket (type_);
+
threads_sync.unlock ();
return s;
}
@@ -164,103 +151,3 @@ zmq::io_thread_t *zmq::context_t::choose_io_thread (uint64_t taskset_)
return io_threads [result];
}
-
-void zmq::context_t::create_pipe (object_t *reader_parent_,
- object_t *writer_parent_, uint64_t hwm_, uint64_t lwm_,
- pipe_reader_t **reader_, pipe_writer_t **writer_)
-{
- // Create the pipe, reader & writer triple.
- pipe_t *pipe = new pipe_t;
- zmq_assert (pipe);
- pipe_reader_t *reader = new pipe_reader_t (reader_parent_, pipe,
- hwm_, lwm_);
- zmq_assert (reader);
- pipe_writer_t *writer = new pipe_writer_t (writer_parent_, pipe, reader,
- hwm_, lwm_);
- zmq_assert (writer);
- reader->set_peer (writer);
-
- // Store the pipe in the repository.
- pipe_info_t info = {pipe, reader, writer};
- pipes_sync.lock ();
- pipe->set_index (pipes.size ());
- pipes.push_back (info);
- pipes_sync.unlock ();
-
- *reader_ = reader;
- *writer_ = writer;
-}
-
-void zmq::context_t::destroy_pipe (pipe_t *pipe_)
-{
- // Remove the pipe from the repository.
- pipe_info_t info;
- pipes_sync.lock ();
- pipes_t::size_type i = pipe_->get_index ();
- info = pipes [i];
- pipes [i] = pipes.back ();
- pipes.pop_back ();
- pipes_sync.unlock ();
-
- // Deallocate the pipe and associated pipe reader & pipe writer.
- zmq_assert (info.pipe == pipe_);
- delete info.pipe;
- delete info.reader;
- delete info.writer;
-}
-
-int zmq::context_t::register_inproc_endpoint (const char *endpoint_,
- session_t *session_)
-{
- inproc_endpoint_sync.lock ();
- inproc_endpoints_t::iterator it = inproc_endpoints.find (endpoint_);
-
- if (it != inproc_endpoints.end ()) {
- inproc_endpoint_sync.unlock ();
- errno = EADDRINUSE;
- return -1;
- }
-
- inproc_endpoints.insert (std::make_pair (endpoint_, session_));
-
- inproc_endpoint_sync.unlock ();
- return 0;
-}
-
-zmq::object_t *zmq::context_t::get_inproc_endpoint (const char *endpoint_)
-{
- inproc_endpoint_sync.lock ();
- inproc_endpoints_t::iterator it = inproc_endpoints.find (endpoint_);
-
- if (it == inproc_endpoints.end ()) {
- inproc_endpoint_sync.unlock ();
- errno = EADDRNOTAVAIL;
- return NULL;
- }
-
- it->second->inc_seqnum ();
- object_t *session = it->second;
-
- inproc_endpoint_sync.unlock ();
- return session;
-}
-
-void zmq::context_t::unregister_inproc_endpoints (session_t *session_)
-{
- inproc_endpoint_sync.lock ();
-
- // Remove the connection from the repository.
- // TODO: Yes, the algorithm has O(n^2) complexity. Should be O(log n).
- for (inproc_endpoints_t::iterator it = inproc_endpoints.begin ();
- it != inproc_endpoints.end ();) {
- if (it->second == session_) {
- inproc_endpoints.erase (it);
- it = inproc_endpoints.begin ();
- }
- else
- it++;
- }
-
- inproc_endpoint_sync.unlock ();
-}
-
diff --git a/src/context.hpp b/src/context.hpp
index 7701ef7..f2eab1c 100644
--- a/src/context.hpp
+++ b/src/context.hpp
@@ -52,9 +52,9 @@ namespace zmq
context_t (int app_threads_, int io_threads_);
// To be called to terminate the whole infrastructure (zmq_term).
- void shutdown ();
+ ~context_t ();
- // Create a socket engine.
+ // Create a socket.
struct i_api *create_socket (int type_);
// Returns number of thread slots in the context. To be used by
@@ -81,37 +81,12 @@ namespace zmq
destination_].read (command_);
}
- // Creates new pipe.
- void create_pipe (class object_t *reader_parent_,
- class object_t *writer_parent_, uint64_t hwm_, uint64_t lwm_,
- class pipe_reader_t **reader_, class pipe_writer_t **writer_);
-
- // Deallocates the pipe.
- void destroy_pipe (class pipe_t *pipe_);
-
- // Registers existing session object as an inproc endpoint.
- int register_inproc_endpoint (const char *endpoint_,
- class session_t *session_);
-
- // Retrieves an inproc endpoint. Increments the command sequence number
- // of the object by one. Caller is thus bound to send the command
- // to the connection after invoking this function. Returns NULL if
- // the endpoint doesn't exist.
- class object_t *get_inproc_endpoint (const char *endpoint_);
-
- // Removes all the inproc endpoints associated with the given session
- // object from the global repository.
- void unregister_inproc_endpoints (class session_t *session_);
-
// Returns the I/O thread that is the least busy at the moment.
// Taskset specifies which I/O threads are eligible (0 = all).
class io_thread_t *choose_io_thread (uint64_t taskset_);
private:
- // Clean-up.
- ~context_t ();
-
// Returns the app thread associated with the current thread.
// NULL if we are out of app thread slots.
class app_thread_t *choose_app_thread ();
@@ -137,29 +112,6 @@ namespace zmq
// Synchronisation of accesses to shared thread data.
mutex_t threads_sync;
- // Global repository of pipes. It's used only on terminal shutdown
- // to deallocate all the pipes irrespective of whether they are
- // referenced from pipe_reader, pipe_writer or both.
- struct pipe_info_t
- {
- class pipe_t *pipe;
- class pipe_reader_t *reader;
- class pipe_writer_t *writer;
- };
- typedef std::vector <pipe_info_t> pipes_t;
- pipes_t pipes;
-
- // Synchronisation of access to global repository of pipes.
- mutex_t pipes_sync;
-
- // Global repository of available inproc endpoints.
- typedef std::map <std::string, class session_t*> inproc_endpoints_t;
- inproc_endpoints_t inproc_endpoints;
-
- // Synchronisation of access to the global repository
- // of inproc endpoints.
- mutex_t inproc_endpoint_sync;
-
context_t (const context_t&);
void operator = (const context_t&);
};
diff --git a/src/data_distributor.cpp b/src/data_distributor.cpp
deleted file mode 100644
index 971edce..0000000
--- a/src/data_distributor.cpp
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- Copyright (c) 2007-2009 FastMQ Inc.
-
- This file is part of 0MQ.
-
- 0MQ is free software; you can redistribute it and/or modify it under
- the terms of the Lesser GNU General Public License as published by
- the Free Software Foundation; either version 3 of the License, or
- (at your option) any later version.
-
- 0MQ is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- Lesser GNU General Public License for more details.
-
- You should have received a copy of the Lesser GNU General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#include "../include/zmq.h"
-
-#include "data_distributor.hpp"
-#include "pipe_writer.hpp"
-#include "err.hpp"
-#include "session.hpp"
-#include "msg.hpp"
-
-zmq::data_distributor_t::data_distributor_t () :
- session (NULL)
-{
-}
-
-void zmq::data_distributor_t::set_session (session_t *session_)
-{
- zmq_assert (!session);
- session = session_;
-}
-
-void zmq::data_distributor_t::shutdown ()
-{
- // No need to deallocate pipes here. They'll be deallocated during the
- // shutdown of the dispatcher.
- delete this;
-}
-
-void zmq::data_distributor_t::terminate ()
-{
- // Pipe unregisters itself during the call to terminate, so the pipes
- // list shinks by one in each iteration.
- while (!pipes.empty ())
- pipes [0]->terminate ();
-
- delete this;
-}
-
-zmq::data_distributor_t::~data_distributor_t ()
-{
-}
-
-void zmq::data_distributor_t::attach_pipe (pipe_writer_t *pipe_)
-{
- // Associate demux with a new pipe.
- pipe_->set_demux (this);
- pipe_->set_index (pipes.size ());
- pipes.push_back (pipe_);
-}
-
-void zmq::data_distributor_t::detach_pipe (pipe_writer_t *pipe_)
-{
- // Release the reference to the pipe.
- int index = pipe_->get_index ();
- pipe_->set_index (-1);
- pipes [index] = pipes.back ();
- pipes [index]->set_index (index);
- pipes.pop_back ();
-}
-
-bool zmq::data_distributor_t::empty ()
-{
- return pipes.empty ();
-}
-
-bool zmq::data_distributor_t::send (zmq_msg *msg_)
-{
- int pipes_count = pipes.size ();
-
- // If there are no pipes available, simply drop the message.
- if (pipes_count == 0) {
- zmq_msg_close (msg_);
- zmq_msg_init (msg_);
- return true;
- }
-
- // TODO: ???
- // First check whether all pipes are available for writing.
-// for (pipes_t::iterator it = pipes.begin (); it != pipes.end (); it ++)
-// if (!(*it)->check_write (msg_))
-// return false;
-
- // For VSMs the copying is straighforward.
- if (msg_->content == (zmq_msg_content*) ZMQ_VSM) {
- for (pipes_t::iterator it = pipes.begin (); it != pipes.end (); it ++)
- write_to_pipe (*it, msg_);
- zmq_msg_init (msg_);
- return true;
- }
-
- // Optimisation for the case when there's only a single pipe
- // to send the message to - no refcount adjustment (i.e. atomic
- // operations) needed.
- if (pipes_count == 1) {
- write_to_pipe (*pipes.begin (), msg_);
- zmq_msg_init (msg_);
- return true;
- }
-
- // There are at least 2 destinations for the message. That means we have
- // to deal with reference counting. First add N-1 references to
- // the content (we are holding one reference anyway, that's why the -1).
- if (msg_->shared)
- msg_->content->refcnt.add (pipes_count - 1);
- else {
- msg_->shared = true;
- // TODO: Add memory barrier here.
- msg_->content->refcnt.set (pipes_count);
- }
-
- // Push the message to all destinations.
- for (pipes_t::iterator it = pipes.begin (); it != pipes.end (); it ++)
- write_to_pipe (*it, msg_);
-
- // Detach the original message from the data buffer.
- zmq_msg_init (msg_);
-
- return true;
-}
-
-void zmq::data_distributor_t::flush ()
-{
- // Flush all pipes. If there's large number of pipes, it can be pretty
- // inefficient (especially if there's new message only in a single pipe).
- // Can it be improved?
- for (pipes_t::iterator it = pipes.begin (); it != pipes.end (); it ++)
- (*it)->flush ();
-}
-
-void zmq::data_distributor_t::write_to_pipe (class pipe_writer_t *pipe_,
- struct zmq_msg *msg_)
-{
- if (!pipe_->write (msg_)) {
- // TODO: Push gap notification to the pipe.
- zmq_assert (false);
- }
-}
-
diff --git a/src/data_distributor.hpp b/src/data_distributor.hpp
deleted file mode 100644
index 5bde2e8..0000000
--- a/src/data_distributor.hpp
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- Copyright (c) 2007-2009 FastMQ Inc.
-
- This file is part of 0MQ.
-
- 0MQ is free software; you can redistribute it and/or modify it under
- the terms of the Lesser GNU General Public License as published by
- the Free Software Foundation; either version 3 of the License, or
- (at your option) any later version.
-
- 0MQ is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- Lesser GNU General Public License for more details.
-
- You should have received a copy of the Lesser GNU General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#ifndef __ZMQ_DATA_DISTRIBUTOR_HPP_INCLUDED__
-#define __ZMQ_DATA_DISTRIBUTOR_HPP_INCLUDED__
-
-#include <vector>
-
-#include <i_demux.hpp>
-
-namespace zmq
-{
-
- // Object to distribute messages to outbound pipes.
-
- class data_distributor_t : public i_demux
- {
- public:
-
- data_distributor_t ();
-
- // i_demux implementation.
- void set_session (class session_t *session_);
- void shutdown ();
- void terminate ();
- void attach_pipe (class pipe_writer_t *pipe_);
- void detach_pipe (class pipe_writer_t *pipe_);
- bool empty ();
- bool send (struct zmq_msg *msg_);
- void flush ();
-
- private:
-
- // Clean-up.
- ~data_distributor_t ();
-
- // Reference to the owner session object.
- class session_t *session;
-
- // Writes the message to the pipe if possible. If it isn't, writes
- // a gap notification to the pipe.
- void write_to_pipe (class pipe_writer_t *pipe_, struct zmq_msg *msg_);
-
- // The list of outbound pipes.
- typedef std::vector <class pipe_writer_t*> pipes_t;
- pipes_t pipes;
-
- data_distributor_t (const data_distributor_t&);
- void operator = (const data_distributor_t&);
- };
-
-}
-
-#endif
diff --git a/src/devpoll.cpp b/src/devpoll.cpp
index 8fb0877..b7d153c 100644
--- a/src/devpoll.cpp
+++ b/src/devpoll.cpp
@@ -52,6 +52,10 @@ zmq::devpoll_t::devpoll_t ()
zmq::devpoll_t::~devpoll_t ()
{
+ // Make sure there are no fds registered on shutdown.
+ zmq_assert (load.get () == 0);
+
+ worker.stop ();
close (devpoll_fd);
}
@@ -152,11 +156,6 @@ void zmq::devpoll_t::stop ()
stopping = true;
}
-void zmq::devpoll_t::join ()
-{
- worker.stop ();
-}
-
bool zmq::devpoll_t::loop ()
{
// According to the poll(7d) man page, we can retrieve
diff --git a/src/devpoll.hpp b/src/devpoll.hpp
index 28274c0..57f0156 100644
--- a/src/devpoll.hpp
+++ b/src/devpoll.hpp
@@ -42,7 +42,7 @@ namespace zmq
public:
devpoll_t ();
- virtual ~devpoll_t ();
+ ~devpoll_t ();
// i_poller implementation.
handle_t add_fd (fd_t fd_, i_poll_events *events_);
@@ -56,7 +56,6 @@ namespace zmq
int get_load ();
void start ();
void stop ();
- void join ();
private:
diff --git a/src/dummy_aggregator.cpp b/src/dummy_aggregator.cpp
deleted file mode 100644
index 0b27fab..0000000
--- a/src/dummy_aggregator.cpp
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- Copyright (c) 2007-2009 FastMQ Inc.
-
- This file is part of 0MQ.
-
- 0MQ is free software; you can redistribute it and/or modify it under
- the terms of the Lesser GNU General Public License as published by
- the Free Software Foundation; either version 3 of the License, or
- (at your option) any later version.
-
- 0MQ is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- Lesser GNU General Public License for more details.
-
- You should have received a copy of the Lesser GNU General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#include "../include/zmq.h"
-
-#include "dummy_aggregator.hpp"
-#include "err.hpp"
-#include "pipe_reader.hpp"
-#include "session.hpp"
-
-// Swaps pipes at specified indices.
-#define swap_pipes(i1_, i2_) \
- std::swap (pipes [i1_], pipes [i2_]);\
- pipes [i1_]->set_index (i1_);\
- pipes [i2_]->set_index (i2_);
-
-zmq::dummy_aggregator_t::dummy_aggregator_t () :
- session (NULL),
- pipe (NULL),
- active (false)
-{
-}
-
-void zmq::dummy_aggregator_t::set_session (session_t *session_)
-{
- zmq_assert (!session);
- session = session_;
-}
-
-void zmq::dummy_aggregator_t::shutdown ()
-{
- // No need to deallocate the pipe here. It'll be deallocated during the
- // shutdown of the dispatcher.
- delete this;
-}
-
-void zmq::dummy_aggregator_t::terminate ()
-{
- if (pipe)
- pipe->terminate ();
-
- delete this;
-}
-
-zmq::dummy_aggregator_t::~dummy_aggregator_t ()
-{
-}
-
-void zmq::dummy_aggregator_t::attach_pipe (pipe_reader_t *pipe_)
-{
- zmq_assert (!pipe);
- pipe = pipe_;
- active = true;
-
- // Associate new pipe with the mux object.
- pipe_->set_mux (this);
- session->revive ();
-}
-
-void zmq::dummy_aggregator_t::detach_pipe (pipe_reader_t *pipe_)
-{
- zmq_assert (pipe == pipe_);
- deactivate (pipe_);
- pipe = NULL;
-}
-
-bool zmq::dummy_aggregator_t::empty ()
-{
- return pipe == NULL;
-}
-
-bool zmq::dummy_aggregator_t::recv (zmq_msg *msg_)
-{
- // Deallocate old content of the message.
- zmq_msg_close (msg_);
-
- // Try to read from the pipe.
- if (pipe && pipe->read (msg_))
- return true;
-
- // No message is available. Initialise the output parameter
- // to be a 0-byte message.
- zmq_msg_init (msg_);
- return false;
-}
-
-void zmq::dummy_aggregator_t::deactivate (pipe_reader_t *pipe_)
-{
- active = false;
-}
-
-void zmq::dummy_aggregator_t::reactivate (pipe_reader_t *pipe_)
-{
- active = true;
-}
diff --git a/src/dummy_aggregator.hpp b/src/dummy_aggregator.hpp
deleted file mode 100644
index 6a9e9db..0000000
--- a/src/dummy_aggregator.hpp
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- Copyright (c) 2007-2009 FastMQ Inc.
-
- This file is part of 0MQ.
-
- 0MQ is free software; you can redistribute it and/or modify it under
- the terms of the Lesser GNU General Public License as published by
- the Free Software Foundation; either version 3 of the License, or
- (at your option) any later version.
-
- 0MQ is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- Lesser GNU General Public License for more details.
-
- You should have received a copy of the Lesser GNU General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#ifndef __ZMQ_DUMMY_AGGREGATOR_HPP_INCLUDED__
-#define __ZMQ_DUMMY_AGGREGATOR_HPP_INCLUDED__
-
-#include <vector>
-
-#include "i_mux.hpp"
-
-namespace zmq
-{
-
- // Fake message aggregator. There can be at most one pipe bound to it,
- // so there's no real aggregation going on. However, it is more efficient
- // than a real aggregator. It's intended to be used in the contexts
- // where business logic ensures there'll be at most one pipe bound.
-
- class dummy_aggregator_t : public i_mux
- {
- public:
-
- dummy_aggregator_t ();
-
- // i_mux interface implementation.
- void set_session (session_t *session_);
- void shutdown ();
- void terminate ();
- void attach_pipe (class pipe_reader_t *pipe_);
- void detach_pipe (class pipe_reader_t *pipe_);
- bool empty ();
- void deactivate (class pipe_reader_t *pipe_);
- void reactivate (class pipe_reader_t *pipe_);
- bool recv (struct zmq_msg *msg_);
-
-
- private:
-
- // Clean-up.
- ~dummy_aggregator_t ();
-
- // Reference to the owner session object.
- class session_t *session;
-
- // The single pipe bound.
- class pipe_reader_t *pipe;
-
- // If true, the pipe is active.
- bool active;
-
- dummy_aggregator_t (const dummy_aggregator_t&);
- void operator = (const dummy_aggregator_t&);
- };
-
-}
-
-#endif
diff --git a/src/dummy_distributor.cpp b/src/dummy_distributor.cpp
deleted file mode 100644
index 62e2b88..0000000
--- a/src/dummy_distributor.cpp
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- Copyright (c) 2007-2009 FastMQ Inc.
-
- This file is part of 0MQ.
-
- 0MQ is free software; you can redistribute it and/or modify it under
- the terms of the Lesser GNU General Public License as published by
- the Free Software Foundation; either version 3 of the License, or
- (at your option) any later version.
-
- 0MQ is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- Lesser GNU General Public License for more details.
-
- You should have received a copy of the Lesser GNU General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#include "../include/zmq.h"
-
-#include "dummy_distributor.hpp"
-#include "pipe_writer.hpp"
-#include "err.hpp"
-#include "session.hpp"
-#include "msg.hpp"
-
-zmq::dummy_distributor_t::dummy_distributor_t () :
- session (NULL)
-{
-}
-
-void zmq::dummy_distributor_t::set_session (session_t *session_)
-{
- zmq_assert (!session);
- session = session_;
-}
-
-void zmq::dummy_distributor_t::shutdown ()
-{
- // No need to deallocate pipe here. It'll be deallocated during the
- // shutdown of the dispatcher.
- delete this;
-}
-
-void zmq::dummy_distributor_t::terminate ()
-{
- if (pipe)
- pipe->terminate ();
-
- delete this;
-}
-
-zmq::dummy_distributor_t::~dummy_distributor_t ()
-{
-}
-
-void zmq::dummy_distributor_t::attach_pipe (pipe_writer_t *pipe_)
-{
- zmq_assert (!pipe);
- pipe = pipe_;
-}
-
-void zmq::dummy_distributor_t::detach_pipe (pipe_writer_t *pipe_)
-{
- zmq_assert (pipe == pipe_);
- pipe = NULL;
-}
-
-bool zmq::dummy_distributor_t::empty ()
-{
- return pipe == NULL;
-}
-
-bool zmq::dummy_distributor_t::send (zmq_msg *msg_)
-{
- return pipe && pipe->write (msg_);
-}
-
-void zmq::dummy_distributor_t::flush ()
-{
- if (pipe)
- pipe->flush ();
-}
-
diff --git a/src/dummy_distributor.hpp b/src/dummy_distributor.hpp
deleted file mode 100644
index a71cc49..0000000
--- a/src/dummy_distributor.hpp
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- Copyright (c) 2007-2009 FastMQ Inc.
-
- This file is part of 0MQ.
-
- 0MQ is free software; you can redistribute it and/or modify it under
- the terms of the Lesser GNU General Public License as published by
- the Free Software Foundation; either version 3 of the License, or
- (at your option) any later version.
-
- 0MQ is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- Lesser GNU General Public License for more details.
-
- You should have received a copy of the Lesser GNU General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#ifndef __ZMQ_DUMMY_DISTRIBUTOR_HPP_INCLUDED__
-#define __ZMQ_DUMMY_DISTRIBUTOR_HPP_INCLUDED__
-
-#include <vector>
-
-#include <i_demux.hpp>
-
-namespace zmq
-{
-
- // Fake message distributor. There can be only one pipe bound to it
- // so there no real distribution going on. However, it is more efficient
- // than a real distributor and should be used where business logic
- // ensures there'll be at most one pipe bound.
-
- class dummy_distributor_t : public i_demux
- {
- public:
-
- dummy_distributor_t ();
-
- // i_demux implementation.
- void set_session (class session_t *session_);
- void shutdown ();
- void terminate ();
- void attach_pipe (class pipe_writer_t *pipe_);
- void detach_pipe (class pipe_writer_t *pipe_);
- bool empty ();
- bool send (struct zmq_msg *msg_);
- void flush ();
-
- private:
-
- // Clean-up.
- ~dummy_distributor_t ();
-
- // Reference to the owner session object.
- class session_t *session;
-
- // The bound pipe.
- class pipe_writer_t *pipe;
-
- dummy_distributor_t (const dummy_distributor_t&);
- void operator = (const dummy_distributor_t&);
- };
-
-}
-
-#endif
diff --git a/src/epoll.cpp b/src/epoll.cpp
index c4c8fdb..15278c6 100644
--- a/src/epoll.cpp
+++ b/src/epoll.cpp
@@ -41,8 +41,11 @@ zmq::epoll_t::epoll_t () :
zmq::epoll_t::~epoll_t ()
{
- close (epoll_fd);
+ // Make sure there are no fds registered on shutdown.
+ zmq_assert (load.get () == 0);
+ worker.stop ();
+ close (epoll_fd);
for (retired_t::iterator it = retired.begin (); it != retired.end (); it ++)
delete *it;
}
@@ -144,11 +147,6 @@ void zmq::epoll_t::stop ()
stopping = true;
}
-void zmq::epoll_t::join ()
-{
- worker.stop ();
-}
-
void zmq::epoll_t::loop ()
{
epoll_event ev_buf [max_io_events];
diff --git a/src/epoll.hpp b/src/epoll.hpp
index aa363ee..619d4f3 100644
--- a/src/epoll.hpp
+++ b/src/epoll.hpp
@@ -44,7 +44,7 @@ namespace zmq
public:
epoll_t ();
- virtual ~epoll_t ();
+ ~epoll_t ();
// i_poller implementation.
handle_t add_fd (fd_t fd_, i_poll_events *events_);
@@ -58,7 +58,6 @@ namespace zmq
int get_load ();
void start ();
void stop ();
- void join ();
private:
diff --git a/src/fair_aggregator.cpp b/src/fair_aggregator.cpp
deleted file mode 100644
index 1e6937f..0000000
--- a/src/fair_aggregator.cpp
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- Copyright (c) 2007-2009 FastMQ Inc.
-
- This file is part of 0MQ.
-
- 0MQ is free software; you can redistribute it and/or modify it under
- the terms of the Lesser GNU General Public License as published by
- the Free Software Foundation; either version 3 of the License, or
- (at your option) any later version.
-
- 0MQ is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- Lesser GNU General Public License for more details.
-
- You should have received a copy of the Lesser GNU General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#include "../include/zmq.h"
-
-#include "fair_aggregator.hpp"
-#include "err.hpp"
-#include "pipe_reader.hpp"
-#include "session.hpp"
-
-// Swaps pipes at specified indices.
-#define swap_pipes(i1_, i2_) \
- std::swap (pipes [i1_], pipes [i2_]);\
- pipes [i1_]->set_index (i1_);\
- pipes [i2_]->set_index (i2_);
-
-zmq::fair_aggregator_t::fair_aggregator_t () :
- session (NULL),
- active (0),
- current (0)
-{
-}
-
-void zmq::fair_aggregator_t::set_session (session_t *session_)
-{
- zmq_assert (!session);
- session = session_;
-}
-
-void zmq::fair_aggregator_t::shutdown ()
-{
- // No need to deallocate pipes here. They'll be deallocated during the
- // shutdown of the dispatcher.
- delete this;
-}
-
-void zmq::fair_aggregator_t::terminate ()
-{
- // Pipe unregisters itself during the call to terminate, so the pipes
- // list shinks by one in each iteration.
- while (!pipes.empty ())
- pipes [0]->terminate ();
-
- delete this;
-}
-
-zmq::fair_aggregator_t::~fair_aggregator_t ()
-{
-}
-
-void zmq::fair_aggregator_t::attach_pipe (pipe_reader_t *pipe_)
-{
- // Associate new pipe with the mux object.
- pipe_->set_mux (this);
- pipes.push_back (pipe_);
- active++;
- if (pipes.size () > active)
- swap_pipes (pipes.size () - 1, active - 1);
- if (active == 1)
- session->revive ();
-}
-
-void zmq::fair_aggregator_t::detach_pipe (pipe_reader_t *pipe_)
-{
- // Move the pipe from the list of active pipes to the list of idle pipes.
- deactivate (pipe_);
-
- // Move the pipe to the end of the idle list and remove it.
- swap_pipes (pipe_->get_index (), pipes.size () - 1);
- pipes.pop_back ();
-}
-
-bool zmq::fair_aggregator_t::empty ()
-{
- return pipes.empty ();
-}
-
-bool zmq::fair_aggregator_t::recv (zmq_msg *msg_)
-{
- // Deallocate old content of the message.
- zmq_msg_close (msg_);
-
- // O(1) fair queueing. Round-robin over the active pipes to get
- // next message.
- for (pipes_t::size_type i = active; i != 0; i--) {
-
- // Update current.
- current = (current + 1) % active;
-
- // Try to read from current.
- if (pipes [current]->read (msg_))
- return true;
- }
-
- // No message is available. Initialise the output parameter
- // to be a 0-byte message.
- zmq_msg_init (msg_);
- return false;
-}
-
-void zmq::fair_aggregator_t::deactivate (pipe_reader_t *pipe_)
-{
- int index = pipe_->get_index ();
-
- // Suspend an active pipe.
- swap_pipes (index, active - 1);
- active--;
-
- // If the deactiveted pipe is the current one, shift the current one pipe
- // backwards so that the pipe that replaced the deactiveted one will be
- // processed immediately rather than skipped.
- if (index == (int) current) {
- index--;
- if (index == -1)
- index = active - 1;
- current = index;
- }
-}
-
-void zmq::fair_aggregator_t::reactivate (pipe_reader_t *pipe_)
-{
- // Revive an idle pipe.
- swap_pipes (pipe_->get_index (), active);
- active++;
- if (active == 1)
- session->revive ();
-}
diff --git a/src/fair_aggregator.hpp b/src/fair_aggregator.hpp
deleted file mode 100644
index 6ae1fc5..0000000
--- a/src/fair_aggregator.hpp
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- Copyright (c) 2007-2009 FastMQ Inc.
-
- This file is part of 0MQ.
-
- 0MQ is free software; you can redistribute it and/or modify it under
- the terms of the Lesser GNU General Public License as published by
- the Free Software Foundation; either version 3 of the License, or
- (at your option) any later version.
-
- 0MQ is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- Lesser GNU General Public License for more details.
-
- You should have received a copy of the Lesser GNU General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#ifndef __ZMQ_FAIR_AGGREGATOR_HPP_INCLUDED__
-#define __ZMQ_FAIR_AGGREGATOR_HPP_INCLUDED__
-
-#include <vector>
-
-#include "i_mux.hpp"
-
-namespace zmq
-{
-
- // Object to aggregate messages from inbound pipes.
-
- class fair_aggregator_t : public i_mux
- {
- public:
-
- fair_aggregator_t ();
-
- // i_mux interface implementation.
- void set_session (session_t *session_);
- void shutdown ();
- void terminate ();
- void attach_pipe (class pipe_reader_t *pipe_);
- void detach_pipe (class pipe_reader_t *pipe_);
- bool empty ();
- void deactivate (class pipe_reader_t *pipe_);
- void reactivate (class pipe_reader_t *pipe_);
- bool recv (struct zmq_msg *msg_);
-
-
- private:
-
- // Clean-up.
- ~fair_aggregator_t ();
-
- // Reference to the owner session object.
- class session_t *session;
-
- // The list of inbound pipes. The active pipes are occupying indices
- // from 0 to active-1. Suspended pipes occupy indices from 'active'
- // to the end of the array.
- typedef std::vector <class pipe_reader_t*> pipes_t;
- pipes_t pipes;
-
- // The number of active pipes.
- pipes_t::size_type active;
-
- // Pipe to retrieve next message from. The messages are retrieved
- // from the pipes in round-robin fashion (a.k.a. fair queueing).
- pipes_t::size_type current;
-
- fair_aggregator_t (const fair_aggregator_t&);
- void operator = (const fair_aggregator_t&);
- };
-
-}
-
-#endif
diff --git a/src/i_api.hpp b/src/i_api.hpp
index fc7275b..a87e41d 100644
--- a/src/i_api.hpp
+++ b/src/i_api.hpp
@@ -1,28 +1,28 @@
/*
- Copyright (c) 2007-2009 FastMQ Inc.
-
- This file is part of 0MQ.
-
- 0MQ is free software; you can redistribute it and/or modify it under
- the terms of the Lesser GNU General Public License as published by
- the Free Software Foundation; either version 3 of the License, or
- (at your option) any later version.
-
- 0MQ is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- Lesser GNU General Public License for more details.
-
- You should have received a copy of the Lesser GNU General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
+Copyright (c) 2007-2009 FastMQ Inc.
+
+This file is part of 0MQ.
+
+0MQ is free software; you can redistribute it and/or modify it under
+the terms of the Lesser GNU General Public License as published by
+the Free Software Foundation; either version 3 of the License, or
+(at your option) any later version.
+
+0MQ is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+Lesser GNU General Public License for more details.
+
+You should have received a copy of the Lesser GNU General Public License
+along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
-
+
#ifndef __ZMQ_I_API_HPP_INCLUDED__
#define __ZMQ_I_API_HPP_INCLUDED__
-
+
namespace zmq
{
-
+
struct i_api
{
virtual int bind (const char *addr_, struct zmq_opts *opts_) = 0;
@@ -33,7 +33,7 @@ namespace zmq
virtual int recv (struct zmq_msg *msg_, int flags_) = 0;
virtual int close () = 0;
};
-
+
}
-
+
#endif
diff --git a/src/i_demux.hpp b/src/i_demux.hpp
deleted file mode 100644
index c4755b5..0000000
--- a/src/i_demux.hpp
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- Copyright (c) 2007-2009 FastMQ Inc.
-
- This file is part of 0MQ.
-
- 0MQ is free software; you can redistribute it and/or modify it under
- the terms of the Lesser GNU General Public License as published by
- the Free Software Foundation; either version 3 of the License, or
- (at your option) any later version.
-
- 0MQ is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- Lesser GNU General Public License for more details.
-
- You should have received a copy of the Lesser GNU General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#ifndef __ZMQ_I_DEMUX_HPP_INCLUDED__
-#define __ZMQ_I_DEMUX_HPP_INCLUDED__
-
-namespace zmq
-{
-
- struct i_demux
- {
- // Attaches mux to a particular session.
- virtual void set_session (class session_t *session_) = 0;
-
- // To be called when the whole infrastrucure
- // is being closed (zmq_term).
- virtual void shutdown () = 0;
-
- // To be called when session is being closed.
- virtual void terminate () = 0;
-
- // Adds new pipe to the demux to send messages to.
- virtual void attach_pipe (class pipe_writer_t *pipe_) = 0;
-
- // Removes pipe from the demux.
- virtual void detach_pipe (class pipe_writer_t *pipe_) = 0;
-
- // Returns true if there's no pipe attached.
- virtual bool empty () = 0;
-
- // Sends the message. Returns false if the message cannot be sent
- // because the pipes are full.
- virtual bool send (struct zmq_msg *msg_) = 0;
-
- // Flushes messages downstream.
- virtual void flush () = 0;
- };
-
-}
-
-#endif
diff --git a/src/i_engine.hpp b/src/i_engine.hpp
deleted file mode 100644
index 8ca2007..0000000
--- a/src/i_engine.hpp
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- Copyright (c) 2007-2009 FastMQ Inc.
-
- This file is part of 0MQ.
-
- 0MQ is free software; you can redistribute it and/or modify it under
- the terms of the Lesser GNU General Public License as published by
- the Free Software Foundation; either version 3 of the License, or
- (at your option) any later version.
-
- 0MQ is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- Lesser GNU General Public License for more details.
-
- You should have received a copy of the Lesser GNU General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#ifndef __ZMQ_I_ENGINE_HPP_INCLUDED__
-#define __ZMQ_I_ENGINE_HPP_INCLUDED__
-
-namespace zmq
-{
-
- // Generic interface to access engines from MD objects.
-
- struct i_engine
- {
- // Attach the engine with specified context.
- virtual void attach (struct i_poller *poller_,
- struct i_session *session_) = 0;
-
- // Detach the engine from the current context.
- virtual void detach () = 0;
-
- // Notify the engine that new messages are available.
- virtual void revive () = 0;
-
- // Called by session when it decides the engine
- // should terminate itself.
- virtual void schedule_terminate () = 0;
-
- // Called by normal object termination process.
- virtual void terminate () = 0;
-
- // To be called by MD when terminal shutdown (zmq_term) is in progress.
- virtual void shutdown () = 0;
- };
-
-}
-
-#endif
diff --git a/src/i_mux.hpp b/src/i_mux.hpp
deleted file mode 100644
index 22e0a26..0000000
--- a/src/i_mux.hpp
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- Copyright (c) 2007-2009 FastMQ Inc.
-
- This file is part of 0MQ.
-
- 0MQ is free software; you can redistribute it and/or modify it under
- the terms of the Lesser GNU General Public License as published by
- the Free Software Foundation; either version 3 of the License, or
- (at your option) any later version.
-
- 0MQ is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- Lesser GNU General Public License for more details.
-
- You should have received a copy of the Lesser GNU General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#ifndef __ZMQ_I_MUX_HPP_INCLUDED__
-#define __ZMQ_I_MUX_HPP_INCLUDED__
-
-namespace zmq
-{
-
- struct i_mux
- {
- // Attaches mux to a particular session.
- virtual void set_session (class session_t *session_) = 0;
-
- // To be called when the whole infrastrucure
- // is being closed (zmq_term).
- virtual void shutdown () = 0;
-
- // To be called when session is being closed.
- virtual void terminate () = 0;
-
- // Adds new pipe to the mux to send messages to.
- virtual void attach_pipe (class pipe_reader_t *pipe_) = 0;
-
- // Removes pipe from the mux.
- virtual void detach_pipe (class pipe_reader_t *pipe_) = 0;
-
- // Returns true if there's no pipe attached.
- virtual bool empty () = 0;
-
- // Shifts the pipe from active to passive state and vice versa.
- // TODO: Check whether state transitions cannot be done by
- // mux object itself without a need for external APIs.
- virtual void deactivate (class pipe_reader_t *pipe_) = 0;
- virtual void reactivate (class pipe_reader_t *pipe_) = 0;
-
- // Receives a message. Returns false when there is no message
- // to receive.
- virtual bool recv (struct zmq_msg *msg_) = 0;
- };
-
-}
-
-#endif
diff --git a/src/i_poller.hpp b/src/i_poller.hpp
index 52ca095..2665e82 100644
--- a/src/i_poller.hpp
+++ b/src/i_poller.hpp
@@ -75,13 +75,8 @@ namespace zmq
// This method is called from a foreign thread.
virtual void start () = 0;
- // Ask underlying I/O thread to stop. This method is called from
- // underlying thread (callback from io_thread object).
+ // Ask underlying I/O thread to stop.
virtual void stop () = 0;
-
- // Wait for termination of undelying I/O thread.
- // This method is called from a foreign thread.
- virtual void join () = 0;
};
}
diff --git a/src/i_session.hpp b/src/i_session.hpp
deleted file mode 100644
index 21cdc0d..0000000
--- a/src/i_session.hpp
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- Copyright (c) 2007-2009 FastMQ Inc.
-
- This file is part of 0MQ.
-
- 0MQ is free software; you can redistribute it and/or modify it under
- the terms of the Lesser GNU General Public License as published by
- the Free Software Foundation; either version 3 of the License, or
- (at your option) any later version.
-
- 0MQ is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- Lesser GNU General Public License for more details.
-
- You should have received a copy of the Lesser GNU General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#ifndef __ZMQ_I_SESSION_HPP_INCLUDED__
-#define __ZMQ_I_SESSION_HPP_INCLUDED__
-
-namespace zmq
-{
-
- struct i_session
- {
- virtual void set_engine (struct i_engine *engine_) = 0;
- virtual void shutdown () = 0;
- virtual bool read (struct zmq_msg *msg_) = 0;
- virtual bool write (struct zmq_msg *msg_) = 0;
- virtual void flush () = 0;
- };
-
-}
-
-#endif
diff --git a/src/req.cpp b/src/i_socket.hpp
index 01018f5..99ade8a 100644
--- a/src/req.cpp
+++ b/src/i_socket.hpp
@@ -17,13 +17,20 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
-#include "../include/zmq.h"
+#ifndef __ZMQ_I_SOCKET_HPP_INCLUDED__
+#define __ZMQ_I_SOCKET_HPP_INCLUDED__
-#include "req.hpp"
-#include "app_thread.hpp"
-#include "session.hpp"
-
-zmq::req_t::req_t (app_thread_t *thread_, session_t *session_) :
- socket_base_t (thread_, session_)
+namespace zmq
{
+
+ struct i_socket
+ {
+ virtual ~i_socket () {};
+
+ // Start shutting down the socket.
+ virtual void stop () = 0;
+ };
+
}
+
+#endif
diff --git a/src/i_thread.hpp b/src/i_thread.hpp
deleted file mode 100644
index 9f31592..0000000
--- a/src/i_thread.hpp
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- Copyright (c) 2007-2009 FastMQ Inc.
-
- This file is part of 0MQ.
-
- 0MQ is free software; you can redistribute it and/or modify it under
- the terms of the Lesser GNU General Public License as published by
- the Free Software Foundation; either version 3 of the License, or
- (at your option) any later version.
-
- 0MQ is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- Lesser GNU General Public License for more details.
-
- You should have received a copy of the Lesser GNU General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#ifndef __ZMQ_I_THREAD_HPP_INCLUDED__
-#define __ZMQ_I_THREAD_HPP_INCLUDED__
-
-namespace zmq
-{
-
- // Interface used by session object to communicate with the thread
- // it belongs to.
-
- struct i_thread
- {
- virtual void attach_session (class session_t *session_) = 0;
- virtual void detach_session (class session_t *session_) = 0;
- virtual struct i_poller *get_poller () = 0;
- };
-
-}
-
-#endif
diff --git a/src/io_object.cpp b/src/io_object.cpp
deleted file mode 100644
index ad379cf..0000000
--- a/src/io_object.cpp
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- Copyright (c) 2007-2009 FastMQ Inc.
-
- This file is part of 0MQ.
-
- 0MQ is free software; you can redistribute it and/or modify it under
- the terms of the Lesser GNU General Public License as published by
- the Free Software Foundation; either version 3 of the License, or
- (at your option) any later version.
-
- 0MQ is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- Lesser GNU General Public License for more details.
-
- You should have received a copy of the Lesser GNU General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#include "io_object.hpp"
-#include "io_thread.hpp"
-#include "i_poller.hpp"
-
-zmq::io_object_t::io_object_t (io_thread_t *thread_) :
- object_t (thread_),
- thread (thread_)
-{
-}
-
-zmq::io_object_t::~io_object_t ()
-{
-}
-
-zmq::i_poller *zmq::io_object_t::get_poller ()
-{
- return thread->get_poller ();
-}
diff --git a/src/io_object.hpp b/src/io_object.hpp
deleted file mode 100644
index d3fa809..0000000
--- a/src/io_object.hpp
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- Copyright (c) 2007-2009 FastMQ Inc.
-
- This file is part of 0MQ.
-
- 0MQ is free software; you can redistribute it and/or modify it under
- the terms of the Lesser GNU General Public License as published by
- the Free Software Foundation; either version 3 of the License, or
- (at your option) any later version.
-
- 0MQ is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- Lesser GNU General Public License for more details.
-
- You should have received a copy of the Lesser GNU General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#ifndef __ZMQ_IO_OBJECT_HPP_INCLUDED__
-#define __ZMQ_IO_OBJECT_HPP_INCLUDED__
-
-#include "object.hpp"
-
-namespace zmq
-{
-
- // All objects running within the context of an I/O thread should be
- // derived from this class to allow owning application threads to
- // destroy them.
-
- class io_object_t : public object_t
- {
- public:
-
- io_object_t (class io_thread_t *thread_);
- ~io_object_t ();
-
- virtual void terminate () = 0;
- virtual void shutdown () = 0;
-
- struct i_poller *get_poller ();
-
- private:
-
- class io_thread_t *thread;
- };
-
-}
-
-#endif
diff --git a/src/io_thread.cpp b/src/io_thread.cpp
index 162ed4c..f5261a6 100644
--- a/src/io_thread.cpp
+++ b/src/io_thread.cpp
@@ -30,9 +30,7 @@
#include "devpoll.hpp"
#include "kqueue.hpp"
#include "context.hpp"
-#include "session.hpp"
#include "simple_semaphore.hpp"
-#include "session.hpp"
zmq::io_thread_t::io_thread_t (context_t *context_, int thread_slot_) :
object_t (context_, thread_slot_)
@@ -76,15 +74,6 @@ zmq::io_thread_t::io_thread_t (context_t *context_, int thread_slot_) :
poller->set_pollin (signaler_handle);
}
-void zmq::io_thread_t::shutdown ()
-{
- // Deallocate all the sessions associated with the thread.
- while (!sessions.empty ())
- sessions [0]->shutdown ();
-
- delete this;
-}
-
zmq::io_thread_t::~io_thread_t ()
{
delete poller;
@@ -101,11 +90,6 @@ void zmq::io_thread_t::stop ()
send_stop ();
}
-void zmq::io_thread_t::join ()
-{
- poller->join ();
-}
-
zmq::i_signaler *zmq::io_thread_t::get_signaler ()
{
return &signaler;
@@ -149,21 +133,6 @@ void zmq::io_thread_t::timer_event ()
zmq_assert (false);
}
-void zmq::io_thread_t::attach_session (session_t *session_)
-{
- session_->set_index (sessions.size ());
- sessions.push_back (session_);
-}
-
-void zmq::io_thread_t::detach_session (session_t *session_)
-{
- // O(1) removal of the session from the list.
- sessions_t::size_type i = session_->get_index ();
- sessions [i] = sessions [sessions.size () - 1];
- sessions [i]->set_index (i);
- sessions.pop_back ();
-}
-
zmq::i_poller *zmq::io_thread_t::get_poller ()
{
zmq_assert (poller);
diff --git a/src/io_thread.hpp b/src/io_thread.hpp
index 585a28b..43ee19e 100644
--- a/src/io_thread.hpp
+++ b/src/io_thread.hpp
@@ -23,7 +23,6 @@
#include <vector>
#include "object.hpp"
-#include "i_thread.hpp"
#include "i_poller.hpp"
#include "i_poll_events.hpp"
#include "fd_signaler.hpp"
@@ -34,26 +33,22 @@ namespace zmq
// Generic part of the I/O thread. Polling-mechanism-specific features
// are implemented in separate "polling objects".
- class io_thread_t : public object_t, public i_poll_events, public i_thread
+ class io_thread_t : public object_t, public i_poll_events
{
public:
io_thread_t (class context_t *context_, int thread_slot_);
+ // Clean-up. If the thread was started, it's neccessary to call 'stop'
+ // before invoking destructor. Otherwise the destructor would hang up.
+ ~io_thread_t ();
+
// Launch the physical thread.
void start ();
// Ask underlying thread to stop.
void stop ();
- // Wait till undelying thread terminates.
- void join ();
-
- // To be called when the whole infrastrucure is being closed (zmq_term).
- // It's vital to call the individual commands in this sequence:
- // stop, join, shutdown.
- void shutdown ();
-
// Returns signaler associated with this I/O thread.
i_signaler *get_signaler ();
@@ -62,9 +57,7 @@ namespace zmq
void out_event ();
void timer_event ();
- // i_thread implementation.
- void attach_session (class session_t *session_);
- void detach_session (class session_t *session_);
+ // ???
struct i_poller *get_poller ();
// Command handlers.
@@ -75,9 +68,6 @@ namespace zmq
private:
- // Clean-up.
- ~io_thread_t ();
-
// Poll thread gets notifications about incoming commands using
// this signaler.
fd_signaler_t signaler;
@@ -87,11 +77,6 @@ namespace zmq
// I/O multiplexing is performed using a poller object.
i_poller *poller;
-
- // Vector of all sessions associated with this app thread.
- typedef std::vector <class session_t*> sessions_t;
- sessions_t sessions;
-
};
}
diff --git a/src/kqueue.cpp b/src/kqueue.cpp
index 28c15de..f4c58a3 100644
--- a/src/kqueue.cpp
+++ b/src/kqueue.cpp
@@ -42,6 +42,10 @@ zmq::kqueue_t::kqueue_t ()
zmq::kqueue_t::~kqueue_t ()
{
+ // Make sure there are no fds registered on shutdown.
+ zmq_assert (load.get () == 0);
+
+ worker.stop ();
close (kqueue_fd);
}
@@ -144,11 +148,6 @@ void zmq::kqueue_t::stop ()
stopping = true;
}
-void zmq::kqueue_t::join ()
-{
- worker.stop ();
-}
-
void zmq::kqueue_t::loop ()
{
while (!stopping) {
diff --git a/src/kqueue.hpp b/src/kqueue.hpp
index 2fd6819..eeb6f09 100644
--- a/src/kqueue.hpp
+++ b/src/kqueue.hpp
@@ -42,7 +42,7 @@ namespace zmq
public:
kqueue_t ();
- virtual ~kqueue_t ();
+ ~kqueue_t ();
// i_poller implementation.
handle_t add_fd (fd_t fd_, i_poll_events *events_);
@@ -56,7 +56,6 @@ namespace zmq
int get_load ();
void start ();
void stop ();
- void join ();
private:
diff --git a/src/listener.cpp b/src/listener.cpp
deleted file mode 100644
index 823b21b..0000000
--- a/src/listener.cpp
+++ /dev/null
@@ -1,170 +0,0 @@
-/*
- Copyright (c) 2007-2009 FastMQ Inc.
-
- This file is part of 0MQ.
-
- 0MQ is free software; you can redistribute it and/or modify it under
- the terms of the Lesser GNU General Public License as published by
- the Free Software Foundation; either version 3 of the License, or
- (at your option) any later version.
-
- 0MQ is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- Lesser GNU General Public License for more details.
-
- You should have received a copy of the Lesser GNU General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#include "listener.hpp"
-#include "simple_semaphore.hpp"
-#include "zmq_tcp_engine.hpp"
-#include "io_thread.hpp"
-#include "session_stub.hpp"
-#include "session.hpp"
-#include "err.hpp"
-#include "dummy_aggregator.hpp"
-#include "dummy_distributor.hpp"
-
-zmq::listener_t::listener_t (io_thread_t *thread_, const char *addr_,
- session_t *peer_, bool has_in_, bool has_out_, uint64_t taskset_) :
- io_object_t (thread_),
- poller (NULL),
- addr (addr_),
- peer (peer_),
- taskset (taskset_),
- has_in (has_in_),
- has_out (has_out_)
-{
-}
-
-void zmq::listener_t::terminate ()
-{
- for (session_stubs_t::size_type i = 0; i != session_stubs.size (); i++)
- session_stubs [i]->terminate ();
- delete this;
-}
-
-void zmq::listener_t::shutdown ()
-{
- for (session_stubs_t::size_type i = 0; i != session_stubs.size (); i++)
- session_stubs [i]->shutdown ();
- delete this;
-}
-
-zmq::listener_t::~listener_t ()
-{
-}
-
-void zmq::listener_t::got_identity (session_stub_t *session_stub_,
- const char *identity_)
-{
- // Get the engine allready disconnected from the stub and poller.
- i_engine *engine = session_stub_->detach_engine ();
- zmq_assert (engine);
-
- // Find the corresponding session.
- session_t *session;
- sessions_t::iterator it = sessions.find (identity_);
-
- // Destroy the stub.
- int i = session_stub_->get_index ();
- session_stubs [i] = session_stubs [session_stubs.size () - 1];
- session_stubs [i]->set_index (i);
- session_stubs.pop_back ();
- session_stub_->terminate ();
-
- // If there's no session with the specified identity, create one.
- if (it != sessions.end ()) {
- session = it->second;
- session->inc_seqnum ();
- }
- else {
-
- // Choose an I/O thread with the least load to handle the new session.
- io_thread_t *io_thread = choose_io_thread (taskset);
-
- // Create the session and bind it to the I/O thread and peer. Make
- // sure that the peer session won't get deallocated till it processes
- // the subsequent bind command.
- i_mux *mux = new dummy_aggregator_t;
- zmq_assert (mux);
- i_demux *demux = new dummy_distributor_t;
- zmq_assert (demux);
- session = new session_t (io_thread, io_thread, mux, demux, false, true);
- zmq_assert (session);
- session->inc_seqnum ();
- session->inc_seqnum ();
- peer->inc_seqnum ();
- send_reg_and_bind (session, peer, has_in, has_out);
- }
-
- // Attach the engine to the session.
- send_engine (session, engine);
-}
-
-void zmq::listener_t::process_reg (simple_semaphore_t *smph_)
-{
- zmq_assert (!poller);
- poller = get_poller ();
-
- // Open the listening socket.
- int rc = tcp_listener.open (addr.c_str ());
- zmq_assert (rc == 0);
-
- // Unlock the application thread that created the listener.
- if (smph_)
- smph_->post ();
-
- // Start polling for incoming connections.
- handle = poller->add_fd (tcp_listener.get_fd (), this);
- poller->set_pollin (handle);
-}
-
-void zmq::listener_t::process_unreg (simple_semaphore_t *smph_)
-{
- // Disassociate listener from the poller.
- zmq_assert (poller);
- poller->rm_fd (handle);
- poller = NULL;
-
- // Unlock the application thread closing the listener.
- if (smph_)
- smph_->post ();
-}
-
-void zmq::listener_t::in_event ()
-{
- fd_t fd = tcp_listener.accept ();
-
- // If connection was reset by the peer in the meantime, just ignore it.
- // TODO: Handle specific errors like ENFILE/EMFILE etc.
- if (fd == retired_fd)
- return;
-
- // Create an session stub for the engine to take care for it till its
- // identity is retreived.
- session_stub_t *session_stub = new session_stub_t (this);
- zmq_assert (session_stub);
- session_stub->set_index (session_stubs.size ());
- session_stubs.push_back (session_stub);
-
- // Create an engine to encaspulate the socket. Engine will register itself
- // with the stub so the stub will be able to free it in case of shutdown.
- zmq_tcp_engine_t *engine = new zmq_tcp_engine_t (fd);
- zmq_assert (engine);
- engine->attach (poller, session_stub);
-}
-
-void zmq::listener_t::out_event ()
-{
- zmq_assert (false);
-}
-
-void zmq::listener_t::timer_event ()
-{
- zmq_assert (false);
-}
-
-
diff --git a/src/listener.hpp b/src/listener.hpp
deleted file mode 100644
index 2fe93db..0000000
--- a/src/listener.hpp
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- Copyright (c) 2007-2009 FastMQ Inc.
-
- This file is part of 0MQ.
-
- 0MQ is free software; you can redistribute it and/or modify it under
- the terms of the Lesser GNU General Public License as published by
- the Free Software Foundation; either version 3 of the License, or
- (at your option) any later version.
-
- 0MQ is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- Lesser GNU General Public License for more details.
-
- You should have received a copy of the Lesser GNU General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#ifndef __ZMQ_LISTENER_HPP_INCLUDED__
-#define __ZMQ_LISTENER_HPP_INCLUDED__
-
-#include <map>
-#include <vector>
-#include <string>
-
-#include "io_object.hpp"
-#include "tcp_listener.hpp"
-#include "i_poller.hpp"
-#include "i_poll_events.hpp"
-#include "stdint.hpp"
-
-namespace zmq
-{
-
- class listener_t : public io_object_t, public i_poll_events
- {
- public:
-
- listener_t (class io_thread_t *thread_, const char *addr_,
- class session_t *peer_, bool has_in_, bool has_out_,
- uint64_t taskset_);
-
- void terminate ();
- void shutdown ();
-
- // This function is called by session stub once the identity
- // is retrieved from the incoming connection.
- void got_identity (class session_stub_t *session_stub_,
- const char *identity_);
-
- void process_reg (class simple_semaphore_t *smph_);
- void process_unreg (class simple_semaphore_t *smph_);
-
- // i_poll_events implementation.
- void in_event ();
- void out_event ();
- void timer_event ();
-
- private:
-
- ~listener_t ();
-
- struct i_poller *poller;
-
- // Handle corresponding to the listening socket.
- handle_t handle;
-
- // Actual listening socket.
- tcp_listener_t tcp_listener;
-
- // Address to bind to.
- std::string addr;
-
- // Peer session. All the newly created connections should bind to
- // this session.
- session_t *peer;
-
- // Taskset specifies which I/O threads are to be use to handle
- // newly created connections (0 = all).
- uint64_t taskset;
-
- // Sessions created by this listener are stored in this map. They are
- // indexed by peer identities so that the same peer connects to the
- // same session after reconnection.
- // NB: Sessions are destroyed from other place and possibly later on,
- // so no need to care about them during listener object termination.
- typedef std::map <std::string, class session_t*> sessions_t;
- sessions_t sessions;
-
- // List of engines (bound to temorary session stubs) that we haven't
- // retrieved the identity from so far.
- typedef std::vector <class session_stub_t*> session_stubs_t;
- session_stubs_t session_stubs;
-
- // If true, create inbound pipe when binding new connection
- // to the peer.
- bool has_in;
-
- // If true, create outbound pipe when binding new connection
- // to the peer.
- bool has_out;
-
- listener_t (const listener_t&);
- void operator = (const listener_t&);
- };
-
-}
-
-#endif
diff --git a/src/load_balancer.cpp b/src/load_balancer.cpp
deleted file mode 100644
index 0d382a1..0000000
--- a/src/load_balancer.cpp
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- Copyright (c) 2007-2009 FastMQ Inc.
-
- This file is part of 0MQ.
-
- 0MQ is free software; you can redistribute it and/or modify it under
- the terms of the Lesser GNU General Public License as published by
- the Free Software Foundation; either version 3 of the License, or
- (at your option) any later version.
-
- 0MQ is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- Lesser GNU General Public License for more details.
-
- You should have received a copy of the Lesser GNU General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#include "../include/zmq.h"
-
-#include "load_balancer.hpp"
-#include "pipe_writer.hpp"
-#include "err.hpp"
-#include "session.hpp"
-#include "msg.hpp"
-
-zmq::load_balancer_t::load_balancer_t () :
- session (NULL),
- current (0)
-{
-}
-
-void zmq::load_balancer_t::set_session (session_t *session_)
-{
- zmq_assert (!session);
- session = session_;
-}
-
-void zmq::load_balancer_t::shutdown ()
-{
- // No need to deallocate pipes here. They'll be deallocated during the
- // shutdown of the dispatcher.
- delete this;
-}
-
-void zmq::load_balancer_t::terminate ()
-{
- // Pipe unregisters itself during the call to terminate, so the pipes
- // list shinks by one in each iteration.
- while (!pipes.empty ())
- pipes [0]->terminate ();
-
- delete this;
-}
-
-zmq::load_balancer_t::~load_balancer_t ()
-{
-}
-
-void zmq::load_balancer_t::attach_pipe (pipe_writer_t *pipe_)
-{
- // Associate demux with a new pipe.
- pipe_->set_demux (this);
- pipe_->set_index (pipes.size ());
- pipes.push_back (pipe_);
-}
-
-void zmq::load_balancer_t::detach_pipe (pipe_writer_t *pipe_)
-{
- // Release the reference to the pipe.
- int index = pipe_->get_index ();
- pipe_->set_index (-1);
- pipes [index] = pipes.back ();
- pipes [index]->set_index (index);
- pipes.pop_back ();
-}
-
-bool zmq::load_balancer_t::empty ()
-{
- return pipes.empty ();
-}
-
-bool zmq::load_balancer_t::send (zmq_msg *msg_)
-{
- // If there are no pipes, message cannot be sent.
- if (pipes.size () == 0)
- return false;
-
- // Find the first pipe that is ready to accept the message.
- bool found = false;
- for (pipes_t::size_type i = 0; !found && i < pipes.size (); i++) {
-// if (pipes [current]->check_write (msg))
- found = true;
-// else
-// current = (current + 1) % pipes.size ();
- }
-
- // Oops, no pipe is ready to accept the message.
- if (!found)
- return false;
-
- // Send the message to the selected pipe.
- write_to_pipe (pipes [current], msg_);
- current = (current + 1) % pipes.size ();
-
- // Detach the original message from the data buffer.
- zmq_msg_init (msg_);
-
- return true;
-}
-
-void zmq::load_balancer_t::flush ()
-{
- // Flush all pipes. If there's large number of pipes, it can be pretty
- // inefficient (especially if there's new message only in a single pipe).
- // Can it be improved?
- for (pipes_t::iterator it = pipes.begin (); it != pipes.end (); it ++)
- (*it)->flush ();
-}
-
-void zmq::load_balancer_t::write_to_pipe (class pipe_writer_t *pipe_,
- struct zmq_msg *msg_)
-{
- if (!pipe_->write (msg_)) {
- // TODO: Push gap notification to the pipe.
- zmq_assert (false);
- }
-}
-
diff --git a/src/load_balancer.hpp b/src/load_balancer.hpp
deleted file mode 100644
index 953ed3b..0000000
--- a/src/load_balancer.hpp
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- Copyright (c) 2007-2009 FastMQ Inc.
-
- This file is part of 0MQ.
-
- 0MQ is free software; you can redistribute it and/or modify it under
- the terms of the Lesser GNU General Public License as published by
- the Free Software Foundation; either version 3 of the License, or
- (at your option) any later version.
-
- 0MQ is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- Lesser GNU General Public License for more details.
-
- You should have received a copy of the Lesser GNU General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#ifndef __ZMQ_LOAD_BALANCER_HPP_INCLUDED__
-#define __ZMQ_LOAD_BALANCER_HPP_INCLUDED__
-
-#include <vector>
-
-#include <i_demux.hpp>
-
-namespace zmq
-{
-
- // Object to distribute messages to outbound pipes.
-
- class load_balancer_t : public i_demux
- {
- public:
-
- load_balancer_t ();
-
- // i_demux implementation.
- void set_session (class session_t *session_);
- void shutdown ();
- void terminate ();
- void attach_pipe (class pipe_writer_t *pipe_);
- void detach_pipe (class pipe_writer_t *pipe_);
- bool empty ();
- bool send (struct zmq_msg *msg_);
- void flush ();
-
- private:
-
- // Clean-up.
- ~load_balancer_t ();
-
- // Reference to the owner session object.
- class session_t *session;
-
- // Writes the message to the pipe if possible. If it isn't, writes
- // a gap notification to the pipe.
- void write_to_pipe (class pipe_writer_t *pipe_, struct zmq_msg *msg_);
-
- // The list of outbound pipes.
- typedef std::vector <class pipe_writer_t*> pipes_t;
- pipes_t pipes;
-
- // Current pipe to write next message to.
- pipes_t::size_type current;
-
- load_balancer_t (const load_balancer_t&);
- void operator = (const load_balancer_t&);
- };
-
-}
-
-#endif
diff --git a/src/object.cpp b/src/object.cpp
index 7c85212..36f3937 100644
--- a/src/object.cpp
+++ b/src/object.cpp
@@ -20,12 +20,8 @@
#include "object.hpp"
#include "context.hpp"
#include "err.hpp"
-#include "pipe_reader.hpp"
-#include "pipe_writer.hpp"
-#include "session.hpp"
#include "io_thread.hpp"
#include "simple_semaphore.hpp"
-#include "i_engine.hpp"
zmq::object_t::object_t (context_t *context_, int thread_slot_) :
context (context_),
@@ -103,35 +99,6 @@ void zmq::object_t::process_command (command_t &cmd_)
}
}
-void zmq::object_t::create_pipe (object_t *reader_parent_,
- object_t *writer_parent_, uint64_t hwm_, uint64_t lwm_,
- pipe_reader_t **reader_, pipe_writer_t **writer_)
-{
- context->create_pipe (reader_parent_, writer_parent_, hwm_, lwm_,
- reader_, writer_);
-}
-
-void zmq::object_t::destroy_pipe (pipe_t *pipe_)
-{
- context->destroy_pipe (pipe_);
-}
-
-int zmq::object_t::register_inproc_endpoint (const char *endpoint_,
- session_t *session_)
-{
- return context->register_inproc_endpoint (endpoint_, session_);
-}
-
-zmq::object_t *zmq::object_t::get_inproc_endpoint (const char *endpoint_)
-{
- return context->get_inproc_endpoint (endpoint_);
-}
-
-void zmq::object_t::unregister_inproc_endpoints (session_t *session_)
-{
- context->unregister_inproc_endpoints (session_);
-}
-
zmq::io_thread_t *zmq::object_t::choose_io_thread (uint64_t taskset_)
{
return context->choose_io_thread (taskset_);
diff --git a/src/object.hpp b/src/object.hpp
index 796e7fa..5851c68 100644
--- a/src/object.hpp
+++ b/src/object.hpp
@@ -44,14 +44,6 @@ namespace zmq
// Derived object can use following functions to interact with
// global repositories. See context.hpp for function details.
int thread_slot_count ();
- void create_pipe (class object_t *reader_parent_,
- class object_t *writer_parent_, uint64_t hwm_, uint64_t lwm_,
- class pipe_reader_t **reader_, class pipe_writer_t **writer_);
- void destroy_pipe (class pipe_t *pipe_);
- int register_inproc_endpoint (const char *endpoint_,
- class session_t *session_);
- class object_t *get_inproc_endpoint (const char *endpoint_);
- void unregister_inproc_endpoints (class session_t *session_);
class io_thread_t *choose_io_thread (uint64_t taskset_);
// Derived object can use these functions to send commands
diff --git a/src/p2p.cpp b/src/p2p.cpp
deleted file mode 100644
index c83d8b1..0000000
--- a/src/p2p.cpp
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- Copyright (c) 2007-2009 FastMQ Inc.
-
- This file is part of 0MQ.
-
- 0MQ is free software; you can redistribute it and/or modify it under
- the terms of the Lesser GNU General Public License as published by
- the Free Software Foundation; either version 3 of the License, or
- (at your option) any later version.
-
- 0MQ is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- Lesser GNU General Public License for more details.
-
- You should have received a copy of the Lesser GNU General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#include "../include/zmq.h"
-
-#include "p2p.hpp"
-#include "app_thread.hpp"
-#include "session.hpp"
-
-zmq::p2p_t::p2p_t (app_thread_t *thread_, session_t *session_) :
- socket_base_t (thread_, session_)
-{
-}
diff --git a/src/p2p.hpp b/src/p2p.hpp
deleted file mode 100644
index d3d9dc3..0000000
--- a/src/p2p.hpp
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- Copyright (c) 2007-2009 FastMQ Inc.
-
- This file is part of 0MQ.
-
- 0MQ is free software; you can redistribute it and/or modify it under
- the terms of the Lesser GNU General Public License as published by
- the Free Software Foundation; either version 3 of the License, or
- (at your option) any later version.
-
- 0MQ is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- Lesser GNU General Public License for more details.
-
- You should have received a copy of the Lesser GNU General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#ifndef __ZMQ_P2P_HPP_INCLUDED__
-#define __ZMQ_P2P_HPP_INCLUDED__
-
-#include "socket_base.hpp"
-
-namespace zmq
-{
-
- class p2p_t : public socket_base_t
- {
- public:
-
- p2p_t (class app_thread_t *thread_, class session_t *session_);
-
- private:
-
- p2p_t (const p2p_t&);
- void operator = (const p2p_t&);
- };
-
-}
-
-#endif
diff --git a/src/pipe.cpp b/src/pipe.cpp
deleted file mode 100644
index bf761b4..0000000
--- a/src/pipe.cpp
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- Copyright (c) 2007-2009 FastMQ Inc.
-
- This file is part of 0MQ.
-
- 0MQ is free software; you can redistribute it and/or modify it under
- the terms of the Lesser GNU General Public License as published by
- the Free Software Foundation; either version 3 of the License, or
- (at your option) any later version.
-
- 0MQ is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- Lesser GNU General Public License for more details.
-
- You should have received a copy of the Lesser GNU General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#include "pipe.hpp"
-
-zmq::pipe_t::pipe_t () :
- ypipe_t <zmq_msg, false, message_pipe_granularity> (false),
- index (-1)
-{
-}
-
-zmq::pipe_t::~pipe_t ()
-{
- // Flush any outstanding messages to the pipe.
- flush ();
-
- // Deallocate all the messages in the pipe.
- zmq_msg msg;
- while (read (&msg))
- zmq_msg_close (&msg);
-}
-
-void zmq::pipe_t::set_index (int index_)
-{
- index = index_;
-}
-
-int zmq::pipe_t::get_index ()
-{
- return index;
-}
diff --git a/src/pipe.hpp b/src/pipe.hpp
index 8894a22..d771120 100644
--- a/src/pipe.hpp
+++ b/src/pipe.hpp
@@ -28,29 +28,10 @@
namespace zmq
{
- // Message pipe. A simple wrapper on top of ypipe.
-
+ // Message pipe.
class pipe_t : public ypipe_t <zmq_msg, false, message_pipe_granularity>
{
- // Context is a friend so that it can create & destroy the pipes.
- // By making constructor & destructor private we are sure that nobody
- // except context messes with pipes.
- friend class context_t;
-
- private:
-
- pipe_t ();
- ~pipe_t ();
-
- void set_index (int index_);
- int get_index ();
-
- // Index of the pipe in context's array of pipes.
- int index;
-
- pipe_t (const pipe_t&);
- void operator = (const pipe_t&);
- };
+ };
}
diff --git a/src/pipe_reader.cpp b/src/pipe_reader.cpp
deleted file mode 100644
index 79dfe2e..0000000
--- a/src/pipe_reader.cpp
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- Copyright (c) 2007-2009 FastMQ Inc.
-
- This file is part of 0MQ.
-
- 0MQ is free software; you can redistribute it and/or modify it under
- the terms of the Lesser GNU General Public License as published by
- the Free Software Foundation; either version 3 of the License, or
- (at your option) any later version.
-
- 0MQ is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- Lesser GNU General Public License for more details.
-
- You should have received a copy of the Lesser GNU General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#include "../include/zmq.h"
-
-#include "pipe_reader.hpp"
-#include "pipe.hpp"
-#include "err.hpp"
-#include "i_mux.hpp"
-
-zmq::pipe_reader_t::pipe_reader_t (object_t *parent_, pipe_t *pipe_,
- uint64_t hwm_, uint64_t lwm_) :
- object_t (parent_),
- pipe (pipe_),
- peer (NULL),
- mux (NULL),
- index (-1),
- hwm (hwm_),
- lwm (lwm_),
- head (0),
- tail (0),
- last_sent_head (0)
-{
-}
-
-void zmq::pipe_reader_t::set_peer (object_t *peer_)
-{
- peer = peer_;
-}
-
-zmq::pipe_reader_t::~pipe_reader_t ()
-{
-}
-
-void zmq::pipe_reader_t::set_mux (i_mux *mux_)
-{
- mux = mux_;
-}
-
-void zmq::pipe_reader_t::set_index (int index_)
-{
- index = index_;
-}
-
-int zmq::pipe_reader_t::get_index ()
-{
- return index;
-}
-
-void zmq::pipe_reader_t::process_tail (uint64_t bytes_)
-{
- tail = bytes_;
- mux->reactivate (this);
-}
-
-bool zmq::pipe_reader_t::read (struct zmq_msg *msg_)
-{
- // Read a message.
- if (!pipe->read (msg_)) {
- mux->deactivate (this);
- return false;
- }
-
- // If successfull, adjust the head of the pipe.
- head += zmq_msg_size (msg_);
-
- // If pipe writer wasn't notified about the head position for long enough,
- // notify it.
- if (head - last_sent_head >= hwm - lwm) {
- send_head (peer, head);
- last_sent_head = head;
- }
-
- if (zmq_msg_type (msg_) == ZMQ_DELIMITER) {
-
- // Detach the pipe from the mux and send termination request to
- // the pipe writer.
- mux->detach_pipe (this);
- mux = NULL;
- send_terminate (peer);
- return false;
- }
-
- return true;
-}
-
-void zmq::pipe_reader_t::terminate ()
-{
- // Detach the pipe from the mux and send termination request to
- // the pipe writer.
- if (mux) {
- mux->detach_pipe (this);
- mux = NULL;
- }
- send_terminate (peer);
-}
-
-void zmq::pipe_reader_t::process_terminate_ack ()
-{
- // Ask context to deallocate the pipe.
- destroy_pipe (pipe);
-}
diff --git a/src/pipe_reader.hpp b/src/pipe_reader.hpp
deleted file mode 100644
index cf45bb4..0000000
--- a/src/pipe_reader.hpp
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- Copyright (c) 2007-2009 FastMQ Inc.
-
- This file is part of 0MQ.
-
- 0MQ is free software; you can redistribute it and/or modify it under
- the terms of the Lesser GNU General Public License as published by
- the Free Software Foundation; either version 3 of the License, or
- (at your option) any later version.
-
- 0MQ is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- Lesser GNU General Public License for more details.
-
- You should have received a copy of the Lesser GNU General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#ifndef __ZMQ_PIPE_READER_HPP_INCLUDED__
-#define __ZMQ_PIPE_READER_HPP_INCLUDED__
-
-#include "object.hpp"
-#include "stdint.hpp"
-
-namespace zmq
-{
-
- class pipe_reader_t : public object_t
- {
- // Context is a friend so that it can create & destroy the reader.
- // By making constructor & destructor private we are sure that nobody
- // except context messes with readers.
- friend class context_t;
-
- public:
-
- // Set & get index in the associated mux object.
- void set_mux (struct i_mux *mux_);
- void set_index (int index_);
- int get_index ();
-
- // Reads a message to the underlying pipe.
- bool read (struct zmq_msg *msg_);
-
- // Asks pipe to destroy itself.
- void terminate ();
-
- private:
-
- pipe_reader_t (class object_t *parent_, class pipe_t *pipe_,
- uint64_t hwm_, uint64_t lwm_);
- ~pipe_reader_t ();
-
- // Second step of reader construction. The parameter cannot be passed
- // in constructor as peer object doesn't yet exist at the time.
- void set_peer (class object_t *peer_);
-
- void process_tail (uint64_t bytes_);
- void process_terminate_ack ();
-
- // The underlying pipe.
- class pipe_t *pipe;
-
- // Pipe writer associated with the other side of the pipe.
- class object_t *peer;
-
- // Associated mux object.
- struct i_mux *mux;
-
- // Index in the associated mux object.
- int index;
-
- // High and low watermarks for in-memory storage (in bytes).
- uint64_t hwm;
- uint64_t lwm;
-
- // Positions of head and tail of the pipe (in bytes).
- uint64_t head;
- uint64_t tail;
- uint64_t last_sent_head;
-
- pipe_reader_t (const pipe_reader_t&);
- void operator = (const pipe_reader_t&);
- };
-
-}
-
-#endif
diff --git a/src/pipe_writer.cpp b/src/pipe_writer.cpp
deleted file mode 100644
index a54034b..0000000
--- a/src/pipe_writer.cpp
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- Copyright (c) 2007-2009 FastMQ Inc.
-
- This file is part of 0MQ.
-
- 0MQ is free software; you can redistribute it and/or modify it under
- the terms of the Lesser GNU General Public License as published by
- the Free Software Foundation; either version 3 of the License, or
- (at your option) any later version.
-
- 0MQ is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- Lesser GNU General Public License for more details.
-
- You should have received a copy of the Lesser GNU General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#include "../include/zmq.h"
-
-#include "pipe_writer.hpp"
-#include "pipe.hpp"
-#include "i_demux.hpp"
-
-zmq::pipe_writer_t::pipe_writer_t (object_t *parent_, pipe_t *pipe_,
- object_t *peer_, uint64_t hwm_, uint64_t lwm_) :
- object_t (parent_),
- pipe (pipe_),
- peer (peer_),
- demux (NULL),
- index (-1),
- hwm (hwm_),
- lwm (lwm_),
- head (0),
- tail (0)
-{
-}
-
-zmq::pipe_writer_t::~pipe_writer_t ()
-{
-}
-
-void zmq::pipe_writer_t::set_demux (i_demux *demux_)
-{
- demux = demux_;
-}
-
-void zmq::pipe_writer_t::set_index (int index_)
-{
- index = index_;
-}
-
-int zmq::pipe_writer_t::get_index ()
-{
- return index;
-}
-
-bool zmq::pipe_writer_t::write (zmq_msg *msg_)
-{
- size_t msg_size = zmq_msg_size (msg_);
-
- // If message won't fit into the in-memory pipe, there's no way
- // to pass it further.
- // TODO: It should be discarded and 'oversized' notification should be
- // placed into the pipe.
- zmq_assert (!hwm || msg_size <= hwm);
-
- // If there's not enough space in the pipe at the moment, return false.
- if (hwm && tail + msg_size - head > hwm)
- return false;
-
- // Write the message to the pipe and adjust tail position.
- pipe->write (*msg_);
- flush ();
- tail += msg_size;
-
- return true;
-}
-
-void zmq::pipe_writer_t::flush ()
-{
- if (!pipe->flush ())
- send_tail (peer, tail);
-}
-
-void zmq::pipe_writer_t::process_head (uint64_t bytes_)
-{
- head = bytes_;
-}
-
-void zmq::pipe_writer_t::terminate ()
-{
- // Disconnect from the associated demux.
- if (demux) {
- demux->detach_pipe (this);
- demux = NULL;
- }
-
- // Push the delimiter to the pipe. Delimiter is a notification for pipe
- // reader that there will be no more messages in the pipe.
- zmq_msg delimiter;
- delimiter.content = (zmq_msg_content*) ZMQ_DELIMITER;
- delimiter.shared = false;
- delimiter.vsm_size = 0;
- pipe->write (delimiter);
- flush ();
-}
-
-void zmq::pipe_writer_t::process_terminate ()
-{
- // Disconnect from the associated demux.
- if (demux) {
- demux->detach_pipe (this);
- demux = NULL;
- }
-
- // Send termination acknowledgement to the pipe reader.
- send_terminate_ack (peer);
-}
diff --git a/src/pipe_writer.hpp b/src/pipe_writer.hpp
deleted file mode 100644
index a727b1f..0000000
--- a/src/pipe_writer.hpp
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- Copyright (c) 2007-2009 FastMQ Inc.
-
- This file is part of 0MQ.
-
- 0MQ is free software; you can redistribute it and/or modify it under
- the terms of the Lesser GNU General Public License as published by
- the Free Software Foundation; either version 3 of the License, or
- (at your option) any later version.
-
- 0MQ is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- Lesser GNU General Public License for more details.
-
- You should have received a copy of the Lesser GNU General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#ifndef __ZMQ_PIPE_WRITER_HPP_INCLUDED__
-#define __ZMQ_PIPE_WRITER_HPP_INCLUDED__
-
-#include "object.hpp"
-#include "stdint.hpp"
-
-namespace zmq
-{
-
- class pipe_writer_t : public object_t
- {
- // Context is a friend so that it can create & destroy the writer.
- // By making constructor & destructor private we are sure that nobody
- // except context messes with writers.
- friend class context_t;
-
- public:
-
- // Set & get index in the associated demux object.
- void set_demux (struct i_demux *demux_);
- void set_index (int index_);
- int get_index ();
-
- // Writes a message to the underlying pipe. Returns false if the
- // message cannot be written to the pipe at the moment.
- bool write (struct zmq_msg *msg_);
-
- // Flush the messages downsteam.
- void flush ();
-
- // Asks pipe to destroy itself.
- void terminate ();
-
- private:
-
- pipe_writer_t (class object_t *parent_, class pipe_t *pipe_,
- class object_t *peer_, uint64_t hwm_, uint64_t lwm_);
- ~pipe_writer_t ();
-
- void process_head (uint64_t bytes_);
- void process_terminate ();
-
- // The underlying pipe.
- class pipe_t *pipe;
-
- // Pipe reader associated with the other side of the pipe.
- class object_t *peer;
-
- // Associated demux object.
- struct i_demux *demux;
-
- // Index in the associated demux object.
- int index;
-
- // High and low watermarks for in-memory storage (in bytes).
- uint64_t hwm;
- uint64_t lwm;
-
- // Positions of head and tail of the pipe (in bytes).
- uint64_t head;
- uint64_t tail;
-
- pipe_writer_t (const pipe_writer_t&);
- void operator = (const pipe_writer_t&);
- };
-
-}
-
-#endif
diff --git a/src/poll.cpp b/src/poll.cpp
index 864cfad..94e4fd4 100644
--- a/src/poll.cpp
+++ b/src/poll.cpp
@@ -50,6 +50,14 @@ zmq::poll_t::poll_t () :
fd_table [i].index = retired_fd;
}
+zmq::poll_t::~poll_t ()
+{
+ // Make sure there are no fds registered on shutdown.
+ zmq_assert (load.get () == 0);
+
+ worker.stop ();
+}
+
zmq::handle_t zmq::poll_t::add_fd (fd_t fd_, i_poll_events *events_)
{
pollfd pfd = {fd_, 0, 0};
@@ -132,11 +140,6 @@ void zmq::poll_t::stop ()
stopping = true;
}
-void zmq::poll_t::join ()
-{
- worker.stop ();
-}
-
void zmq::poll_t::loop ()
{
while (!stopping) {
diff --git a/src/poll.hpp b/src/poll.hpp
index dbfa776..9fe6067 100644
--- a/src/poll.hpp
+++ b/src/poll.hpp
@@ -47,7 +47,7 @@ namespace zmq
public:
poll_t ();
- virtual ~poll_t () {}
+ ~poll_t ();
// i_poller implementation.
handle_t add_fd (fd_t fd_, i_poll_events *events_);
@@ -61,7 +61,6 @@ namespace zmq
int get_load ();
void start ();
void stop ();
- void join ();
private:
diff --git a/src/pub.cpp b/src/pub.cpp
deleted file mode 100644
index 5dca0b8..0000000
--- a/src/pub.cpp
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- Copyright (c) 2007-2009 FastMQ Inc.
-
- This file is part of 0MQ.
-
- 0MQ is free software; you can redistribute it and/or modify it under
- the terms of the Lesser GNU General Public License as published by
- the Free Software Foundation; either version 3 of the License, or
- (at your option) any later version.
-
- 0MQ is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- Lesser GNU General Public License for more details.
-
- You should have received a copy of the Lesser GNU General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#include "../include/zmq.h"
-
-#include "pub.hpp"
-#include "app_thread.hpp"
-#include "session.hpp"
-#include "err.hpp"
-
-zmq::pub_t::pub_t (app_thread_t *thread_, session_t *session_) :
- socket_base_t (thread_, session_)
-{
- disable_in ();
-}
-
-int zmq::pub_t::recv (struct zmq_msg *msg_, int flags_)
-{
- // Publisher socket has no recv function.
- errno = ENOTSUP;
- return -1;
-}
diff --git a/src/pub.hpp b/src/pub.hpp
deleted file mode 100644
index 909e731..0000000
--- a/src/pub.hpp
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- Copyright (c) 2007-2009 FastMQ Inc.
-
- This file is part of 0MQ.
-
- 0MQ is free software; you can redistribute it and/or modify it under
- the terms of the Lesser GNU General Public License as published by
- the Free Software Foundation; either version 3 of the License, or
- (at your option) any later version.
-
- 0MQ is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- Lesser GNU General Public License for more details.
-
- You should have received a copy of the Lesser GNU General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#ifndef __ZMQ_PUB_HPP_INCLUDED__
-#define __ZMQ_PUB_HPP_INCLUDED__
-
-#include "socket_base.hpp"
-
-namespace zmq
-{
-
- class pub_t : public socket_base_t
- {
- public:
-
- pub_t (class app_thread_t *thread_, class session_t *session_);
-
- // i_api overloads.
- int recv (struct zmq_msg *msg_, int flags_);
-
- private:
-
- pub_t (const pub_t&);
- void operator = (const pub_t&);
- };
-
-}
-
-#endif
diff --git a/src/rep.cpp b/src/rep.cpp
deleted file mode 100644
index 60767e1..0000000
--- a/src/rep.cpp
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- Copyright (c) 2007-2009 FastMQ Inc.
-
- This file is part of 0MQ.
-
- 0MQ is free software; you can redistribute it and/or modify it under
- the terms of the Lesser GNU General Public License as published by
- the Free Software Foundation; either version 3 of the License, or
- (at your option) any later version.
-
- 0MQ is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- Lesser GNU General Public License for more details.
-
- You should have received a copy of the Lesser GNU General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#include "../include/zmq.h"
-
-#include "rep.hpp"
-#include "app_thread.hpp"
-#include "session.hpp"
-
-zmq::rep_t::rep_t (app_thread_t *thread_, session_t *session_) :
- socket_base_t (thread_, session_)
-{
-}
diff --git a/src/rep.hpp b/src/rep.hpp
deleted file mode 100644
index 92d2758..0000000
--- a/src/rep.hpp
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- Copyright (c) 2007-2009 FastMQ Inc.
-
- This file is part of 0MQ.
-
- 0MQ is free software; you can redistribute it and/or modify it under
- the terms of the Lesser GNU General Public License as published by
- the Free Software Foundation; either version 3 of the License, or
- (at your option) any later version.
-
- 0MQ is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- Lesser GNU General Public License for more details.
-
- You should have received a copy of the Lesser GNU General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#ifndef __ZMQ_REP_HPP_INCLUDED__
-#define __ZMQ_REP_HPP_INCLUDED__
-
-#include "socket_base.hpp"
-
-namespace zmq
-{
-
- class rep_t : public socket_base_t
- {
- public:
-
- rep_t (class app_thread_t *thread_, class session_t *session_);
-
- private:
-
- rep_t (const rep_t&);
- void operator = (const rep_t&);
- };
-
-}
-
-#endif
diff --git a/src/req.hpp b/src/req.hpp
deleted file mode 100644
index c279f0e..0000000
--- a/src/req.hpp
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- Copyright (c) 2007-2009 FastMQ Inc.
-
- This file is part of 0MQ.
-
- 0MQ is free software; you can redistribute it and/or modify it under
- the terms of the Lesser GNU General Public License as published by
- the Free Software Foundation; either version 3 of the License, or
- (at your option) any later version.
-
- 0MQ is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- Lesser GNU General Public License for more details.
-
- You should have received a copy of the Lesser GNU General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#ifndef __ZMQ_REQ_HPP_INCLUDED__
-#define __ZMQ_REQ_HPP_INCLUDED__
-
-#include "socket_base.hpp"
-
-namespace zmq
-{
-
- class req_t : public socket_base_t
- {
- public:
-
- req_t (class app_thread_t *thread_, class session_t *session_);
-
- private:
-
- req_t (const req_t&);
- void operator = (const req_t&);
- };
-
-}
-
-#endif
diff --git a/src/safe_object.cpp b/src/safe_object.cpp
deleted file mode 100644
index d4a92d7..0000000
--- a/src/safe_object.cpp
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- Copyright (c) 2007-2009 FastMQ Inc.
-
- This file is part of 0MQ.
-
- 0MQ is free software; you can redistribute it and/or modify it under
- the terms of the Lesser GNU General Public License as published by
- the Free Software Foundation; either version 3 of the License, or
- (at your option) any later version.
-
- 0MQ is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- Lesser GNU General Public License for more details.
-
- You should have received a copy of the Lesser GNU General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#include "safe_object.hpp"
-
-zmq::safe_object_t::safe_object_t (class context_t *context_,
- int thread_slot_) :
- object_t (context_, thread_slot_),
- processed_seqnum (0),
- terminating (false)
-{
-}
-
-zmq::safe_object_t::safe_object_t (object_t *parent_) :
- object_t (parent_),
- processed_seqnum (0),
- terminating (false)
-{
-}
-
-void zmq::safe_object_t::inc_seqnum ()
-{
- // This function is called from the sender thread to ensure that this
- // object will still exist when the command sent to it arrives in the
- // destination thread.
- sent_seqnum.add (1);
-}
-
-void zmq::safe_object_t::process_command (struct command_t &cmd_)
-{
- object_t::process_command (cmd_);
-
- // Adjust sequence number of the last processed command.
- processed_seqnum++;
-
- // If we are already in the termination phase and all commands sent to
- // this object are processed, it's safe to deallocate it.
- if (terminating && sent_seqnum.get () == processed_seqnum)
- delete this;
-}
-
-void zmq::safe_object_t::terminate ()
-{
- // Wait till all commands sent to this session are processed.
- terminating = true;
-
- // If there's no pending command we can deallocate the session
- // straight saway.
- if (sent_seqnum.get () == processed_seqnum)
- delete this;
-}
-
-bool zmq::safe_object_t::is_terminating ()
-{
- return terminating;
-}
-
-zmq::safe_object_t::~safe_object_t ()
-{
-}
diff --git a/src/safe_object.hpp b/src/safe_object.hpp
deleted file mode 100644
index b47db48..0000000
--- a/src/safe_object.hpp
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- Copyright (c) 2007-2009 FastMQ Inc.
-
- This file is part of 0MQ.
-
- 0MQ is free software; you can redistribute it and/or modify it under
- the terms of the Lesser GNU General Public License as published by
- the Free Software Foundation; either version 3 of the License, or
- (at your option) any later version.
-
- 0MQ is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- Lesser GNU General Public License for more details.
-
- You should have received a copy of the Lesser GNU General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#ifndef __ZMQ_SAFE_OBJECT_HPP_INCLUDED__
-#define __ZMQ_SAFE_OBJECT_HPP_INCLUDED__
-
-#include "object.hpp"
-#include "atomic_counter.hpp"
-
-namespace zmq
-{
-
- // Same as object_t with the exception of termination mechanism. While
- // object_t is destroyed immediately on terminate (assuming that the caller
- // have ensured that there are no more commands for the object on the
- // fly), safe_object_t switches into termination mode and waits for all
- // the on-the-fly commands to be delivered before it deallocates itself.
-
- class safe_object_t : public object_t
- {
- public:
-
- safe_object_t (class context_t *context_, int thread_slot_);
- safe_object_t (object_t *parent_);
-
- void inc_seqnum ();
- void process_command (struct command_t &cmd_);
-
- protected:
-
- void terminate ();
- bool is_terminating ();
-
- virtual ~safe_object_t ();
-
- private:
-
- // Sequence number of the last command sent to the object and last
- // command processed by the object. The former is an atomic counter
- // meaning that other threads can increment it safely.
- atomic_counter_t sent_seqnum;
- uint32_t processed_seqnum;
-
- bool terminating;
-
- safe_object_t (const safe_object_t&);
- void operator = (const safe_object_t&);
- };
-
-}
-
-#endif
diff --git a/src/select.cpp b/src/select.cpp
index 68ec9a0..f10acdc 100644
--- a/src/select.cpp
+++ b/src/select.cpp
@@ -51,6 +51,14 @@ zmq::select_t::select_t () :
FD_ZERO (&source_set_err);
}
+zmq::select_t::~select_t ()
+{
+ // Make sure there are no fds registered on shutdown.
+ zmq_assert (load.get () == 0);
+
+ worker.stop ();
+}
+
zmq::handle_t zmq::select_t::add_fd (fd_t fd_, i_poll_events *events_)
{
// Store the file descriptor.
@@ -156,11 +164,6 @@ void zmq::select_t::stop ()
stopping = true;
}
-void zmq::select_t::join ()
-{
- worker.stop ();
-}
-
void zmq::select_t::loop ()
{
while (!stopping) {
diff --git a/src/select.hpp b/src/select.hpp
index c1e72a7..c442477 100644
--- a/src/select.hpp
+++ b/src/select.hpp
@@ -50,6 +50,7 @@ namespace zmq
public:
select_t ();
+ ~select_t ();
// i_poller implementation.
handle_t add_fd (fd_t fd_, i_poll_events *events_);
@@ -63,7 +64,6 @@ namespace zmq
int get_load ();
void start ();
void stop ();
- void join ();
private:
diff --git a/src/session.cpp b/src/session.cpp
deleted file mode 100644
index b9a450d..0000000
--- a/src/session.cpp
+++ /dev/null
@@ -1,273 +0,0 @@
-/*
- Copyright (c) 2007-2009 FastMQ Inc.
-
- This file is part of 0MQ.
-
- 0MQ is free software; you can redistribute it and/or modify it under
- the terms of the Lesser GNU General Public License as published by
- the Free Software Foundation; either version 3 of the License, or
- (at your option) any later version.
-
- 0MQ is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- Lesser GNU General Public License for more details.
-
- You should have received a copy of the Lesser GNU General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#include "../include/zmq.h"
-
-#include "session.hpp"
-#include "i_engine.hpp"
-#include "i_thread.hpp"
-#include "i_mux.hpp"
-#include "i_demux.hpp"
-#include "err.hpp"
-#include "pipe.hpp"
-#include "pipe_reader.hpp"
-#include "pipe_writer.hpp"
-#include "simple_semaphore.hpp"
-
-zmq::session_t::session_t (object_t *parent_, i_thread *thread_,
- i_mux *mux_, i_demux *demux_,
- bool terminate_on_disconnect_, bool terminate_on_no_pipes_) :
- safe_object_t (parent_),
- mux (mux_),
- demux (demux_),
- thread (thread_),
- engine (NULL),
- terminate_on_disconnect (terminate_on_disconnect_),
- terminate_on_no_pipes (false),
- terminate_on_no_pipes_delayed (terminate_on_no_pipes_),
- index (-1)
-{
- // At least one way to terminate the session should be allowed. Otherwise
- // the session can be orphaned forever.
- zmq_assert (terminate_on_disconnect || terminate_on_no_pipes_delayed);
-
- // Give the mux and the demux callback pointer to ourselves.
- if (mux)
- mux->set_session (this);
- if (demux)
- demux->set_session (this);
-}
-
-void zmq::session_t::shutdown ()
-{
- // Session may live even without an associated engine, thus we have
- // to check if for NULL value.
- if (engine)
- engine->shutdown ();
-
- // Propagate the shutdown signal to both inbound and outbound pipes.
- if (mux)
- mux->shutdown ();
- if (demux)
- demux->shutdown ();
-
- delete this;
-}
-
-void zmq::session_t::disconnected ()
-{
- // It's engine who calls this function so there's no need to deallocate
- // the engine. Just drop the reference.
- engine = NULL;
-
- // Some sessions won't shut down because of disconnect. New engine will
- // attached to the session later on.
- if (!terminate_on_disconnect)
- return;
-
- terminate ();
-}
-
-void zmq::session_t::bind (object_t *peer_, bool in_, bool out_)
-{
- // Create the out pipe (if required).
- pipe_reader_t *pipe_reader = NULL;
- if (out_) {
- pipe_writer_t *pipe_writer;
- create_pipe (peer_, this, 0, 0, &pipe_reader, &pipe_writer);
- demux->attach_pipe (pipe_writer);
-
- // There's at least one pipe attached. We can deallocate the object
- // when there are no pipes (if required).
- terminate_on_no_pipes = terminate_on_no_pipes_delayed;
- }
-
- // Ask peer to attach to the out pipe (if one exists). If required, ask
- // it to create a pipe in opposite direction. It's assumed that peer's
- // seqnum was already incremented, so we don't need to care whether it's
- // alive at the moment.
- if (in_)
- inc_seqnum ();
- send_bind (peer_, pipe_reader, in_ ? this : NULL);
-}
-
-void zmq::session_t::revive ()
-{
- if (engine)
- engine->revive ();
-}
-
-void zmq::session_t::terminate ()
-{
- // Terminate is always called by engine, thus it'll terminate itself,
- // we just have to drop the pointer.
- engine = NULL;
-
- // Propagate the terminate signal to both inbound and outbound pipes.
- if (mux) {
- mux->terminate ();
- mux = NULL;
- }
- if (demux) {
- demux->terminate ();
- demux = NULL;
- }
-
- // Session cannot be deallocated at this point. There can still be
- // pending commands to process. Unregister session from global
- // repository thus ensuring that no new commands will be sent.
- unregister_inproc_endpoints (this);
-
- // Move to terminating state.
- safe_object_t::terminate ();
-}
-
-zmq::session_t::~session_t ()
-{
- // When session is actually deallocated it unregisters from its thread.
- // Unregistration cannot be done earlier as it would result in memory
- // leak if global shutdown happens in the middle of session termination.
- thread->detach_session (this);
-}
-
-void zmq::session_t::set_engine (i_engine *engine_)
-{
- zmq_assert (!engine || !engine_);
- engine = engine_;
-}
-
-void zmq::session_t::set_index (int index_)
-{
- index = index_;
-}
-
-int zmq::session_t::get_index ()
-{
- return index;
-}
-
-bool zmq::session_t::write (zmq_msg *msg_)
-{
- return demux->send (msg_);
-}
-
-void zmq::session_t::flush ()
-{
- demux->flush ();
-}
-
-bool zmq::session_t::read (zmq_msg *msg_)
-{
- bool retrieved = mux->recv (msg_);
- if (terminate_on_no_pipes && mux->empty () && demux->empty ()) {
- zmq_assert (engine);
- engine->schedule_terminate ();
- terminate ();
- }
- return retrieved;
-}
-
-void zmq::session_t::process_bind (pipe_reader_t *reader_, session_t *peer_)
-{
- if (is_terminating ()) {
-
- // If session is already in termination phase, we'll ask newly arrived
- // pipe reader & writer to terminate straight away.
- if (reader_)
- reader_->terminate ();
-
- // Peer session has already incremented its seqnum. We have to send
- // a dummy command to avoid a memory leak.
- if (peer_)
- send_bind (peer_, NULL, NULL);
-
- return;
- }
-
- // If inbound pipe is provided, bind it to the mux.
- if (reader_) {
- mux->attach_pipe (reader_);
-
- // There's at least one pipe attached. We can deallocate the object
- // when there are no pipes (if required).
- terminate_on_no_pipes = terminate_on_no_pipes_delayed;
- }
-
- // If peer wants to get messages from ourselves, we'll bind to it.
- if (peer_) {
- pipe_reader_t *pipe_reader;
- pipe_writer_t *pipe_writer;
- create_pipe (peer_, this, 0, 0, &pipe_reader, &pipe_writer);
- demux->attach_pipe (pipe_writer);
- send_bind (peer_, pipe_reader, NULL);
-
- // There's at least one pipe attached. We can deallocate the object
- // when there are no pipes (if required).
- terminate_on_no_pipes = terminate_on_no_pipes_delayed;
- }
-}
-
-void zmq::session_t::process_reg (simple_semaphore_t *smph_)
-{
- zmq_assert (!is_terminating ());
-
- // Add the session to the list of sessions associated with this I/O thread.
- // This way the session will be deallocated on the terminal shutdown.
- thread->attach_session (this);
-
- // Release calling thead (if required).
- if (smph_)
- smph_->post ();
-}
-
-void zmq::session_t::process_reg_and_bind (session_t *peer_,
- bool flow_in_, bool flow_out_)
-{
- zmq_assert (!is_terminating ());
-
- // Add the session to the list of sessions associated with this I/O thread.
- // This way the session will be deallocated on the terminal shutdown.
- thread->attach_session (this);
-
- // Bind to the peer. Note that caller have already incremented command
- // sequence number of the peer so we are sure it still exists.
- pipe_reader_t *pipe_reader = NULL;
- if (flow_out_) {
- pipe_writer_t *pipe_writer;
- create_pipe (peer_, this, 0, 0, &pipe_reader, &pipe_writer);
- demux->attach_pipe (pipe_writer);
-
- // There's at least one pipe attached. We can deallocate the object
- // when there are no pipes (if required).
- terminate_on_no_pipes = terminate_on_no_pipes_delayed;
- }
- send_bind (peer_, pipe_reader, flow_in_ ? this : NULL);
-}
-
-void zmq::session_t::process_engine (i_engine *engine_)
-{
- if (is_terminating ()) {
-
- // Kill the engine. It won't be needed anymore.
- engine_->terminate ();
- return;
- }
-
- engine_->attach (thread->get_poller (), this);
-}
diff --git a/src/session.hpp b/src/session.hpp
deleted file mode 100644
index 855dd1d..0000000
--- a/src/session.hpp
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- Copyright (c) 2007-2009 FastMQ Inc.
-
- This file is part of 0MQ.
-
- 0MQ is free software; you can redistribute it and/or modify it under
- the terms of the Lesser GNU General Public License as published by
- the Free Software Foundation; either version 3 of the License, or
- (at your option) any later version.
-
- 0MQ is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- Lesser GNU General Public License for more details.
-
- You should have received a copy of the Lesser GNU General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#ifndef __ZMQ_SESSION_HPP_INCLUDED__
-#define __ZMQ_SESSION_HPP_INCLUDED__
-
-#include "i_session.hpp"
-#include "safe_object.hpp"
-#include "stdint.hpp"
-#include "atomic_counter.hpp"
-
-namespace zmq
-{
-
- // Object that encapsulates both mux and demux.
-
- class session_t : public safe_object_t, public i_session
- {
- public:
-
- // Creates the session object.
- session_t (struct object_t *parent_, struct i_thread *thread_,
- struct i_mux *mux_, struct i_demux *demux_,
- bool terminate_on_disconnect_, bool terminate_on_no_pipes_);
-
- // i_session implementation
- void set_engine (struct i_engine *engine_);
- void shutdown ();
- bool read (struct zmq_msg *msg_);
- bool write (struct zmq_msg *msg_);
- void flush ();
-
- // Called by the engine when it is being closed.
- void disconnected ();
-
- // Creates a message flow between this session and the peer session.
- // If in_ is true, the messages can flow from the peer to ourselves.
- // If out_ is true, messages can flow from ourselves to the peer.
- // It's assumed that peer's seqnum was already incremented.
- void bind (class object_t *peer_, bool in_, bool out_);
-
- // Called by mux if new messages are available.
- void revive ();
-
- // Functions to set & retrieve index of this MD in thread's array
- // of session objects.
- void set_index (int index_);
- int get_index ();
-
- private:
-
- // Clean-up.
- ~session_t ();
-
- // Terminate is private here. It is called by either when disconnected
- // or no_pipes event occurs.
- void terminate ();
-
- void process_bind (class pipe_reader_t *reader_,
- class session_t *peer_);
- void process_reg (class simple_semaphore_t *smph_);
- void process_reg_and_bind (class session_t *peer_,
- bool flow_in_, bool flow_out_);
- void process_engine (i_engine *engine_);
-
- struct i_mux *mux;
- struct i_demux *demux;
-
- struct i_thread *thread;
- struct i_engine *engine;
-
- // If true termination of the session can be triggered by engine
- // disconnect/close.
- bool terminate_on_disconnect;
-
- // If true termination of the session can be triggered when the last
- // pipe detaches from it.
- bool terminate_on_no_pipes;
-
- // If true, terminate_on_no_pipes should be set when at least one
- // pipe was bound.
- bool terminate_on_no_pipes_delayed;
-
- // Index in thread's session array.
- int index;
- };
-
-}
-
-#endif
-
diff --git a/src/session_stub.cpp b/src/session_stub.cpp
deleted file mode 100644
index 152b9fb..0000000
--- a/src/session_stub.cpp
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- Copyright (c) 2007-2009 FastMQ Inc.
-
- This file is part of 0MQ.
-
- 0MQ is free software; you can redistribute it and/or modify it under
- the terms of the Lesser GNU General Public License as published by
- the Free Software Foundation; either version 3 of the License, or
- (at your option) any later version.
-
- 0MQ is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- Lesser GNU General Public License for more details.
-
- You should have received a copy of the Lesser GNU General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#include <string>
-
-#include "../include/zmq.h"
-
-#include "session_stub.hpp"
-#include "i_engine.hpp"
-#include "listener.hpp"
-#include "err.hpp"
-
-zmq::session_stub_t::session_stub_t (listener_t *listener_) :
- state (reading_identity),
- engine (NULL),
- listener (listener_),
- index (-1)
-{
-}
-
-void zmq::session_stub_t::terminate ()
-{
- if (engine)
- engine->terminate ();
- delete this;
-}
-
-void zmq::session_stub_t::shutdown ()
-{
- if (engine)
- engine->shutdown ();
- delete this;
-}
-
-zmq::session_stub_t::~session_stub_t ()
-{
-}
-
-void zmq::session_stub_t::set_engine (i_engine *engine_)
-{
- zmq_assert (!engine_ || !engine);
- engine = engine_;
-}
-
-bool zmq::session_stub_t::read (struct zmq_msg *msg_)
-{
- // No messages are sent to the connecting peer.
- return false;
-}
-
-bool zmq::session_stub_t::write (struct zmq_msg *msg_)
-{
- // The first message arrived is the connection identity.
- if (state == reading_identity) {
- identity = std::string ((const char*) zmq_msg_data (msg_),
- zmq_msg_size (msg_));
- state = has_identity;
- return true;
- }
-
- // We are not interested in any subsequent messages.
- return false;
-}
-
-void zmq::session_stub_t::flush ()
-{
- // We have the identity. At this point we can find the correct session and
- // attach it to the connection.
- if (state == has_identity) {
-
- // At this point the stub will be deleted. Return immediately without
- // touching 'this' pointer.
- listener->got_identity (this, identity.c_str ());
- return;
- }
-}
-
-zmq::i_engine *zmq::session_stub_t::detach_engine ()
-{
- // Ask engine to unregister from the poller.
- i_engine *e = engine;
- engine->detach ();
- return e;
-}
-
-void zmq::session_stub_t::set_index (int index_)
-{
- index = index_;
-}
-
-int zmq::session_stub_t::get_index ()
-{
- return index;
-}
diff --git a/src/session_stub.hpp b/src/session_stub.hpp
deleted file mode 100644
index 4499e45..0000000
--- a/src/session_stub.hpp
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- Copyright (c) 2007-2009 FastMQ Inc.
-
- This file is part of 0MQ.
-
- 0MQ is free software; you can redistribute it and/or modify it under
- the terms of the Lesser GNU General Public License as published by
- the Free Software Foundation; either version 3 of the License, or
- (at your option) any later version.
-
- 0MQ is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- Lesser GNU General Public License for more details.
-
- You should have received a copy of the Lesser GNU General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#ifndef __ZMQ_SESSION_STUB_HPP_INCLUDED__
-#define __ZMQ_SESSION_STUB_HPP_INCLUDED__
-
-#include <string>
-
-#include "i_session.hpp"
-
-namespace zmq
-{
-
- // This class is used instead of regular session till the identity of
- // incomming connection is established and connection is attached
- // to corresponding session.
-
- class session_stub_t : public i_session
- {
- public:
-
- session_stub_t (class listener_t *listener_);
-
- // i_session implementation.
- void set_engine (struct i_engine *engine_);
- void terminate ();
- void shutdown ();
- bool read (struct zmq_msg *msg_);
- bool write (struct zmq_msg *msg_);
- void flush ();
-
- // Detaches engine from the stub. Returns it to the caller.
- struct i_engine *detach_engine ();
-
- // Manipulate stubs's index in listener's array of stubs.
- void set_index (int index_);
- int get_index ();
-
- private:
-
- // Clean-up.
- virtual ~session_stub_t ();
-
- enum {
- reading_identity,
- has_identity
- } state;
-
- // Reference to the associated engine.
- i_engine *engine;
-
- // Reference to the listener object that owns this stub.
- class listener_t *listener;
-
- // Index of the stub in listener's array of stubs.
- int index;
-
- // Identity of the connection.
- std::string identity;
-
- session_stub_t (const session_stub_t&);
- void operator = (const session_stub_t&);
- };
-
-}
-
-#endif
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
deleted file mode 100644
index 6718244..0000000
--- a/src/socket_base.cpp
+++ /dev/null
@@ -1,267 +0,0 @@
-/*
- Copyright (c) 2007-2009 FastMQ Inc.
-
- This file is part of 0MQ.
-
- 0MQ is free software; you can redistribute it and/or modify it under
- the terms of the Lesser GNU General Public License as published by
- the Free Software Foundation; either version 3 of the License, or
- (at your option) any later version.
-
- 0MQ is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- Lesser GNU General Public License for more details.
-
- You should have received a copy of the Lesser GNU General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#include <string>
-
-#include "../include/zmq.h"
-
-#include "socket_base.hpp"
-#include "app_thread.hpp"
-#include "err.hpp"
-#include "listener.hpp"
-#include "connecter.hpp"
-#include "simple_semaphore.hpp"
-#include "io_thread.hpp"
-#include "io_object.hpp"
-#include "session.hpp"
-#include "dummy_aggregator.hpp"
-#include "dummy_distributor.hpp"
-
-zmq::socket_base_t::socket_base_t (app_thread_t *thread_, session_t *session_) :
- object_t (thread_),
- thread (thread_),
- session (session_),
- has_in (true),
- has_out (true)
-{
- session->set_engine (this);
-}
-
-void zmq::socket_base_t::shutdown ()
-{
- // Destroy all the I/O objects created from this socket.
- for (io_objects_t::size_type i = 0; i != io_objects.size (); i++)
- io_objects [i]->shutdown ();
-
- delete this;
-}
-
-void zmq::socket_base_t::schedule_terminate ()
-{
- // Terminate is never scheduled on socket engines.
- zmq_assert (false);
-}
-
-void zmq::socket_base_t::terminate ()
-{
- // Destroy all the I/O objects created from this socket.
- // First unregister the object from I/O thread, then terminate it in
- // this application thread.
- simple_semaphore_t smph;
- for (io_objects_t::size_type i = 0; i != io_objects.size (); i++) {
- send_unreg (io_objects [i], &smph);
- smph.wait ();
- io_objects [i]->terminate ();
- }
-
- zmq_assert (session);
- session->disconnected ();
-
- delete this;
-}
-
-zmq::socket_base_t::~socket_base_t ()
-{
-}
-
-void zmq::socket_base_t::disable_in ()
-{
- has_in = false;
-}
-
-void zmq::socket_base_t::disable_out ()
-{
- has_out = false;
-}
-
-int zmq::socket_base_t::bind (const char *addr_, zmq_opts *opts_)
-{
- thread->process_commands (false);
-
- std::string addr (addr_);
- std::string::size_type pos = addr.find ("://");
- if (pos == std::string::npos || addr.substr (0, pos) == "zmq.tcp") {
-
- // Choose the I/O thread with the least load, create the listener.
- // Note that same taskset is used to choose the I/O thread to handle
- // the listening socket and newly created connections.
- // Note that has_in and has_out are twisted at this place - listener
- // is going to create peer objects, so the message flows are viewed
- // from the opposite direction.
- io_thread_t *io_thread = choose_io_thread (opts_ ? opts_->taskset : 0);
- listener_t *listener = new listener_t (io_thread, addr_, session,
- has_out, has_in, opts_ ? opts_->taskset : 0);
-
- // Ask it to start interacting with the I/O thread.
- simple_semaphore_t smph;
- send_reg (listener, &smph);
-
- // Store the reference to the listener so that it can be terminated
- // when the socket is closed.
- io_objects.push_back (listener);
-
- // Wait while listener is actually registered with the I/O thread.
- smph.wait ();
-
- return 0;
- }
- else if (addr.substr (0, pos) == "inproc") {
-
- // For inproc transport the only thing we have to do is to register
- // this socket as an inproc endpoint with the supplied name.
- return register_inproc_endpoint (addr.substr (pos + 3).c_str (),
- session);
- }
- else {
-
- // Unknown protocol requested.
- errno = EINVAL;
- return -1;
- }
-}
-
-int zmq::socket_base_t::connect (const char *addr_, zmq_opts *opts_)
-{
- thread->process_commands (false);
-
- std::string addr (addr_);
- std::string::size_type pos = addr.find ("://");
- if (pos == std::string::npos || addr.substr (0, pos) == "zmq.tcp") {
-
- // Choose the I/O thread with the least load, create the connecter and
- // session.
- io_thread_t *io_thread = choose_io_thread (opts_ ? opts_->taskset : 0);
- i_mux *mux = new dummy_aggregator_t;
- zmq_assert (mux);
- i_demux *demux = new dummy_distributor_t;
- zmq_assert (demux);
- session_t *peer = new session_t (io_thread, io_thread, mux, demux,
- false, true);
- zmq_assert (peer);
- connecter_t *connecter = new connecter_t (io_thread, addr_, peer);
- zmq_assert (connecter);
-
- // Increment session's command sequence number so that it won't get
- // deallocated till the subsequent bind command arrives.
- peer->inc_seqnum ();
-
- // Register the connecter (and session) with its I/O thread.
- simple_semaphore_t smph;
- send_reg (connecter, &smph);
-
- // Store the reference to the connecter so that it can be terminated
- // when the socket is closed.
- io_objects.push_back (connecter);
-
- // Wait till registration succeeds.
- smph.wait ();
-
- // Bind local session with the connecter's session so that messages
- // can flow in both directions.
- session->bind (peer, has_in, has_out);
-
- return 0;
- }
- else if (addr.substr (0, pos) == "inproc") {
-
- // Get the MD responsible for the address. In case of invalid address
- // return error.
- object_t *peer = get_inproc_endpoint (addr.substr (pos + 3).c_str ());
- if (!peer) {
- errno = EADDRNOTAVAIL;
- return -1;
- }
-
- // Create bidirectional message pipes between this session and
- // the peer session.
- session->bind (peer, has_in, has_out);
-
- return 0;
- }
- else {
-
- // Unknown protocol requested.
- errno = EINVAL;
- return -1;
- }
-}
-
-int zmq::socket_base_t::subscribe (const char *criteria_)
-{
- // No implementation at the moment...
- errno = ENOTSUP;
- return -1;
-}
-
-int zmq::socket_base_t::send (zmq_msg *msg_, int flags_)
-{
- thread->process_commands (false);
- while (true) {
- if (session->write (msg_))
- return 0;
- if (flags_ & ZMQ_NOBLOCK) {
- errno = EAGAIN;
- return -1;
- }
- thread->process_commands (true);
- }
-}
-
-int zmq::socket_base_t::flush ()
-{
- thread->process_commands (false);
- session->flush ();
- return 0;
-}
-
-int zmq::socket_base_t::recv (zmq_msg *msg_, int flags_)
-{
- thread->process_commands (false);
- while (true) {
- if (session->read (msg_))
- return 0;
- if (flags_ & ZMQ_NOBLOCK) {
- errno = EAGAIN;
- return -1;
- }
- thread->process_commands (true);
- }
-}
-
-int zmq::socket_base_t::close ()
-{
- terminate ();
- return 0;
-}
-
-void zmq::socket_base_t::attach (struct i_poller *poller_, i_session *session_)
-{
- zmq_assert (false);
-}
-
-void zmq::socket_base_t::detach ()
-{
- zmq_assert (false);
-}
-
-void zmq::socket_base_t::revive ()
-{
- // We can ignore the event safely here.
-}
-
diff --git a/src/socket_base.hpp b/src/socket_base.hpp
deleted file mode 100644
index c1de8e6..0000000
--- a/src/socket_base.hpp
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- Copyright (c) 2007-2009 FastMQ Inc.
-
- This file is part of 0MQ.
-
- 0MQ is free software; you can redistribute it and/or modify it under
- the terms of the Lesser GNU General Public License as published by
- the Free Software Foundation; either version 3 of the License, or
- (at your option) any later version.
-
- 0MQ is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- Lesser GNU General Public License for more details.
-
- You should have received a copy of the Lesser GNU General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#ifndef __ZMQ_SOCKET_BASE_HPP_INCLUDED__
-#define __ZMQ_SOCKET_BASE_HPP_INCLUDED__
-
-#include <vector>
-
-#include "i_engine.hpp"
-#include "i_api.hpp"
-#include "object.hpp"
-
-namespace zmq
-{
-
- class socket_base_t : public object_t, public i_engine, public i_api
- {
- public:
-
- // TODO: Possibly, session can be attached to the engine using
- // attach function.
- socket_base_t (class app_thread_t *thread_, class session_t *session_);
-
- // i_engine interface implementation.
- void attach (struct i_poller *poller_, struct i_session *session_);
- void detach ();
- void revive ();
- void schedule_terminate ();
- void terminate ();
- void shutdown ();
-
- // i_api interface implementation.
- int bind (const char *addr_, struct zmq_opts *opts_);
- int connect (const char *addr_, struct zmq_opts *opts_);
- int subscribe (const char *criteria_);
- int send (struct zmq_msg *msg_, int flags_);
- int flush ();
- int recv (struct zmq_msg *msg_, int flags_);
- int close ();
-
- protected:
-
- // Clean-up. The function has to be protected rather than private,
- // otherwise auto-generated destructor in derived classes
- // cannot be compiled. It has to be virtual so that socket_base_t's
- // terminate & shutdown functions deallocate correct type of object.
- virtual ~socket_base_t ();
-
- // By default, socket is able to pass messages in both inward and
- // outward directions. By calling these functions, particular
- // socket type is able to eliminate one direction.
- void disable_in ();
- void disable_out ();
-
- private:
-
- // Pointer to the application thread the socket belongs to.
- class app_thread_t *thread;
-
- // Pointer to the associated session object.
- class session_t *session;
-
- // List of I/O object created via this socket. These have to be shut
- // down when the socket is closed.
- typedef std::vector <class io_object_t*> io_objects_t;
- io_objects_t io_objects;
-
- // If true, socket creates inbound pipe when binding to an engine.
- bool has_in;
-
- // If true, socket creates outbound pipe when binding to an engine.
- bool has_out;
-
- socket_base_t (const socket_base_t&);
- void operator = (const socket_base_t&);
- };
-
-}
-
-#endif
diff --git a/src/sub.cpp b/src/sub.cpp
deleted file mode 100644
index 3d1d578..0000000
--- a/src/sub.cpp
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- Copyright (c) 2007-2009 FastMQ Inc.
-
- This file is part of 0MQ.
-
- 0MQ is free software; you can redistribute it and/or modify it under
- the terms of the Lesser GNU General Public License as published by
- the Free Software Foundation; either version 3 of the License, or
- (at your option) any later version.
-
- 0MQ is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- Lesser GNU General Public License for more details.
-
- You should have received a copy of the Lesser GNU General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#include "../include/zmq.h"
-
-#include "sub.hpp"
-#include "app_thread.hpp"
-#include "session.hpp"
-#include "err.hpp"
-
-zmq::sub_t::sub_t (app_thread_t *thread_, session_t *session_) :
- socket_base_t (thread_, session_)
-{
- disable_out ();
-}
-
-int zmq::sub_t::send (struct zmq_msg *msg_, int flags_)
-{
- // Subscriber socket has no send function.
- errno = ENOTSUP;
- return -1;
-}
-
-int zmq::sub_t::flush ()
-{
- // Subscriber socket has no flush function.
- errno = ENOTSUP;
- return -1;
-}
diff --git a/src/sub.hpp b/src/sub.hpp
deleted file mode 100644
index f3e23c1..0000000
--- a/src/sub.hpp
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- Copyright (c) 2007-2009 FastMQ Inc.
-
- This file is part of 0MQ.
-
- 0MQ is free software; you can redistribute it and/or modify it under
- the terms of the Lesser GNU General Public License as published by
- the Free Software Foundation; either version 3 of the License, or
- (at your option) any later version.
-
- 0MQ is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- Lesser GNU General Public License for more details.
-
- You should have received a copy of the Lesser GNU General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#ifndef __ZMQ_SUB_HPP_INCLUDED__
-#define __ZMQ_SUB_HPP_INCLUDED__
-
-#include "socket_base.hpp"
-
-namespace zmq
-{
-
- class sub_t : public socket_base_t
- {
- public:
-
- sub_t (class app_thread_t *thread_, class session_t *session_);
-
- // i_api overloads.
- int send (struct zmq_msg *msg_, int flags_);
- int flush ();
-
- private:
-
- sub_t (const sub_t&);
- void operator = (const sub_t&);
- };
-
-}
-
-#endif
diff --git a/src/zmq.cpp b/src/zmq.cpp
index 0fb6fe1..d19b229 100644
--- a/src/zmq.cpp
+++ b/src/zmq.cpp
@@ -176,7 +176,7 @@ void *zmq_init (int app_threads_, int io_threads_)
int zmq_term (void *context_)
{
- ((zmq::context_t*) context_)->shutdown ();
+ delete (zmq::context_t*) context_;
return 0;
}
diff --git a/src/zmq_decoder.cpp b/src/zmq_decoder.cpp
deleted file mode 100644
index 0c491ea..0000000
--- a/src/zmq_decoder.cpp
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- Copyright (c) 2007-2009 FastMQ Inc.
-
- This file is part of 0MQ.
-
- 0MQ is free software; you can redistribute it and/or modify it under
- the terms of the Lesser GNU General Public License as published by
- the Free Software Foundation; either version 3 of the License, or
- (at your option) any later version.
-
- 0MQ is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- Lesser GNU General Public License for more details.
-
- You should have received a copy of the Lesser GNU General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#include "zmq_decoder.hpp"
-#include "i_session.hpp"
-#include "wire.hpp"
-
-zmq::zmq_decoder_t::zmq_decoder_t () :
- destination (NULL)
-{
- zmq_msg_init (&in_progress);
-
- // At the beginning, read one byte and go to one_byte_size_ready state.
- next_step (tmpbuf, 1, &zmq_decoder_t::one_byte_size_ready);
-}
-
-zmq::zmq_decoder_t::~zmq_decoder_t ()
-{
- zmq_msg_close (&in_progress);
-}
-
-void zmq::zmq_decoder_t::set_session (i_session *destination_)
-{
- destination = destination_;
-}
-
-bool zmq::zmq_decoder_t::one_byte_size_ready ()
-{
- // First byte of size is read. If it is 0xff read 8-byte size.
- // Otherwise allocate the buffer for message data and read the
- // message data into it.
- if (*tmpbuf == 0xff)
- next_step (tmpbuf, 8, &zmq_decoder_t::eight_byte_size_ready);
- else {
- zmq_msg_init_size (&in_progress, *tmpbuf);
- next_step (zmq_msg_data (&in_progress), *tmpbuf,
- &zmq_decoder_t::message_ready);
- }
- return true;
-}
-
-bool zmq::zmq_decoder_t::eight_byte_size_ready ()
-{
- // 8-byte size is read. Allocate the buffer for message body and
- // read the message data into it.
- size_t size = (size_t) get_uint64 (tmpbuf);
- zmq_msg_init_size (&in_progress, size);
- next_step (zmq_msg_data (&in_progress), size,
- &zmq_decoder_t::message_ready);
- return true;
-}
-
-bool zmq::zmq_decoder_t::message_ready ()
-{
- // Message is completely read. Push it further and start reading
- // new message.
- if (!destination->write (&in_progress))
- return false;
-
- next_step (tmpbuf, 1, &zmq_decoder_t::one_byte_size_ready);
- return true;
-}
-
diff --git a/src/zmq_decoder.hpp b/src/zmq_decoder.hpp
deleted file mode 100644
index f648819..0000000
--- a/src/zmq_decoder.hpp
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- Copyright (c) 2007-2009 FastMQ Inc.
-
- This file is part of 0MQ.
-
- 0MQ is free software; you can redistribute it and/or modify it under
- the terms of the Lesser GNU General Public License as published by
- the Free Software Foundation; either version 3 of the License, or
- (at your option) any later version.
-
- 0MQ is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- Lesser GNU General Public License for more details.
-
- You should have received a copy of the Lesser GNU General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-
-#ifndef __ZMQ_ZMQ_DECODER_HPP_INCLUDED__
-#define __ZMQ_ZMQ_DECODER_HPP_INCLUDED__
-
-#include "../include/zmq.h"
-
-#include "decoder.hpp"
-
-namespace zmq
-{
- // Decoder for 0MQ backend protocol. Converts data batches into messages.
-
- class zmq_decoder_t : public decoder_t <zmq_decoder_t>
- {
- public:
-
- zmq_decoder_t ();
- ~zmq_decoder_t ();
-
- void set_session (struct i_session *destination_);
-
- private:
-
- bool one_byte_size_ready ();
- bool eight_byte_size_ready ();
- bool message_ready ();
-
- struct i_session *destination;
- unsigned char tmpbuf [8];
- ::zmq_msg in_progress;
-
- zmq_decoder_t (const zmq_decoder_t&);
- void operator = (const zmq_decoder_t&);
- };
-
-}
-
-#endif
diff --git a/src/zmq_encoder.cpp b/src/zmq_encoder.cpp
deleted file mode 100644
index 8a713cf..0000000
--- a/src/zmq_encoder.cpp
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- Copyright (c) 2007-2009 FastMQ Inc.
-
- This file is part of 0MQ.
-
- 0MQ is free software; you can redistribute it and/or modify it under
- the terms of the Lesser GNU General Public License as published by
- the Free Software Foundation; either version 3 of the License, or
- (at your option) any later version.
-
- 0MQ is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- Lesser GNU General Public License for more details.
-
- You should have received a copy of the Lesser GNU General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#include "zmq_encoder.hpp"
-#include "i_session.hpp"
-#include "wire.hpp"
-
-zmq::zmq_encoder_t::zmq_encoder_t () :
- source (NULL)
-{
- zmq_msg_init (&in_progress);
-
- // Write 0 bytes to the batch and go to message_ready state.
- next_step (NULL, 0, &zmq_encoder_t::message_ready, true);
-}
-
-zmq::zmq_encoder_t::~zmq_encoder_t ()
-{
- zmq_msg_close (&in_progress);
-}
-
-void zmq::zmq_encoder_t::set_session (i_session *source_)
-{
- source = source_;
-}
-
-bool zmq::zmq_encoder_t::size_ready ()
-{
- // Write message body into the buffer.
- next_step (zmq_msg_data (&in_progress), zmq_msg_size (&in_progress),
- &zmq_encoder_t::message_ready, false);
- return true;
-}
-
-bool zmq::zmq_encoder_t::message_ready ()
-{
- // Read new message from the dispatcher. If there is none, return false.
- // Note that new state is set only if write is successful. That way
- // unsuccessful write will cause retry on the next state machine
- // invocation.
- if (!source->read (&in_progress)) {
- return false;
- }
- size_t size = zmq_msg_size (&in_progress);
-
- // For messages less than 255 bytes long, write one byte of message size.
- // For longer messages write 0xff escape character followed by 8-byte
- // message size.
- if (size < 255) {
- tmpbuf [0] = (unsigned char) size;
- next_step (tmpbuf, 1, &zmq_encoder_t::size_ready, true);
- }
- else {
- tmpbuf [0] = 0xff;
- put_uint64 (tmpbuf + 1, size);
- next_step (tmpbuf, 9, &zmq_encoder_t::size_ready, true);
- }
- return true;
-}
diff --git a/src/zmq_encoder.hpp b/src/zmq_encoder.hpp
deleted file mode 100644
index 829fd4b..0000000
--- a/src/zmq_encoder.hpp
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- Copyright (c) 2007-2009 FastMQ Inc.
-
- This file is part of 0MQ.
-
- 0MQ is free software; you can redistribute it and/or modify it under
- the terms of the Lesser GNU General Public License as published by
- the Free Software Foundation; either version 3 of the License, or
- (at your option) any later version.
-
- 0MQ is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- Lesser GNU General Public License for more details.
-
- You should have received a copy of the Lesser GNU General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#ifndef __ZMQ_ZMQ_ENCODER_HPP_INCLUDED__
-#define __ZMQ_ZMQ_ENCODER_HPP_INCLUDED__
-
-#include "../include/zmq.h"
-
-#include "encoder.hpp"
-
-namespace zmq
-{
- // Encoder for 0MQ backend protocol. Converts messages into data batches.
-
- class zmq_encoder_t : public encoder_t <zmq_encoder_t>
- {
- public:
-
- zmq_encoder_t ();
- ~zmq_encoder_t ();
-
- void set_session (struct i_session *source_);
-
- private:
-
- bool size_ready ();
- bool message_ready ();
-
- struct i_session *source;
- ::zmq_msg in_progress;
- unsigned char tmpbuf [9];
-
- zmq_encoder_t (const zmq_encoder_t&);
- void operator = (const zmq_encoder_t&);
- };
-}
-
-#endif
diff --git a/src/zmq_tcp_engine.cpp b/src/zmq_tcp_engine.cpp
deleted file mode 100644
index 6091d80..0000000
--- a/src/zmq_tcp_engine.cpp
+++ /dev/null
@@ -1,185 +0,0 @@
-/*
- Copyright (c) 2007-2009 FastMQ Inc.
-
- This file is part of 0MQ.
-
- 0MQ is free software; you can redistribute it and/or modify it under
- the terms of the Lesser GNU General Public License as published by
- the Free Software Foundation; either version 3 of the License, or
- (at your option) any later version.
-
- 0MQ is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- Lesser GNU General Public License for more details.
-
- You should have received a copy of the Lesser GNU General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#include "zmq_tcp_engine.hpp"
-#include "config.hpp"
-#include "i_session.hpp"
-#include "err.hpp"
-
-zmq::zmq_tcp_engine_t::zmq_tcp_engine_t (fd_t fd_) :
- poller (NULL),
- session (NULL),
- terminating (false),
- insize (0),
- inpos (0),
- outsize (0),
- outpos (0)
-{
- // Allocate read & write buffer.
- inbuf = new unsigned char [in_batch_size];
- zmq_assert (inbuf);
- outbuf = new unsigned char [out_batch_size];
- zmq_assert (outbuf);
-
- // Attach the socket.
- int rc = socket.open (fd_);
- zmq_assert (rc == 0);
-}
-
-void zmq::zmq_tcp_engine_t::attach (i_poller *poller_, i_session *session_)
-{
- zmq_assert (!poller);
- poller = poller_;
- zmq_assert (!session);
- session = session_;
- encoder.set_session (session);
- decoder.set_session (session);
-
- // Let session know we are here.
- session->set_engine (this);
-
- // Register the engine with the polling thread.
- handle = poller->add_fd (socket.get_fd (), this);
- poller->set_pollin (handle);
- poller->set_pollout (handle);
-
- // Flush any pending inbound messages.
- in_event ();
-}
-
-void zmq::zmq_tcp_engine_t::detach ()
-{
- zmq_assert (poller);
- poller->rm_fd (handle);
- poller = NULL;
- zmq_assert (session);
- session->set_engine (NULL);
- session = NULL;
- encoder.set_session (NULL);
- decoder.set_session (NULL);
-}
-
-void zmq::zmq_tcp_engine_t::revive ()
-{
- zmq_assert (poller);
- poller->set_pollout (handle);
-}
-
-void zmq::zmq_tcp_engine_t::schedule_terminate ()
-{
- terminating = true;
-}
-
-void zmq::zmq_tcp_engine_t::terminate ()
-{
- delete this;
-}
-
-void zmq::zmq_tcp_engine_t::shutdown ()
-{
- delete this;
-}
-
-zmq::zmq_tcp_engine_t::~zmq_tcp_engine_t ()
-{
- detach ();
- delete [] outbuf;
- delete [] inbuf;
-}
-
-void zmq::zmq_tcp_engine_t::in_event ()
-{
- // If there's no data to process in the buffer, read new data.
- if (inpos == insize) {
-
- // Read as much data as possible to the read buffer.
- insize = socket.read (inbuf, in_batch_size);
- inpos = 0;
-
- // Check whether the peer has closed the connection.
- if (insize == -1) {
- insize = 0;
- error ();
- return;
- }
- }
-
- // Following code should be executed even if there's not a single byte in
- // the buffer. There still can be a decoded messages stored in the decoder.
-
- // Push the data to the decoder.
- int nbytes = decoder.write (inbuf + inpos, insize - inpos);
-
- // Adjust read position. Stop polling for input if we got stuck.
- inpos += nbytes;
- if (inpos < insize)
- poller->reset_pollin (handle);
-
- // If at least one byte was processed, flush all messages the decoder
- // may have produced. If engine is disconnected from session, no need
- // to flush the messages. It's important that flush is called at the
- // very end of in_event as it may invoke in_event itself.
- if (nbytes > 0 && session)
- session->flush ();
-}
-
-void zmq::zmq_tcp_engine_t::out_event ()
-{
- // If write buffer is empty, try to read new data from the encoder.
- if (outpos == outsize) {
-
- outsize = encoder.read (outbuf, out_batch_size);
- outpos = 0;
-
- // If there are no more pipes, engine can be deallocated.
- if (terminating) {
- terminate ();
- return;
- }
-
- // If there is no data to send, stop polling for output.
- if (outsize == 0)
- poller->reset_pollout (handle);
- }
-
- // If there are any data to write in write buffer, write as much as
- // possible to the socket.
- if (outpos < outsize) {
- int nbytes = socket.write (outbuf + outpos, outsize - outpos);
-
- // Handle problems with the connection.
- if (nbytes == -1) {
- error ();
- return;
- }
-
- outpos += nbytes;
- }
-}
-
-void zmq::zmq_tcp_engine_t::timer_event ()
-{
- zmq_assert (false);
-}
-
-void zmq::zmq_tcp_engine_t::error ()
-{
- zmq_assert (false);
-}
-
diff --git a/src/zmq_tcp_engine.hpp b/src/zmq_tcp_engine.hpp
deleted file mode 100644
index 6a83cec..0000000
--- a/src/zmq_tcp_engine.hpp
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- Copyright (c) 2007-2009 FastMQ Inc.
-
- This file is part of 0MQ.
-
- 0MQ is free software; you can redistribute it and/or modify it under
- the terms of the Lesser GNU General Public License as published by
- the Free Software Foundation; either version 3 of the License, or
- (at your option) any later version.
-
- 0MQ is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- Lesser GNU General Public License for more details.
-
- You should have received a copy of the Lesser GNU General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#ifndef __ZMQ_ZMQ_TCP_ENGINE_HPP_INCLUDED__
-#define __ZMQ_ZMQ_TCP_ENGINE_HPP_INCLUDED__
-
-#include "i_engine.hpp"
-#include "i_poller.hpp"
-#include "i_poll_events.hpp"
-#include "fd.hpp"
-#include "tcp_socket.hpp"
-#include "zmq_encoder.hpp"
-#include "zmq_decoder.hpp"
-
-namespace zmq
-{
-
- class zmq_tcp_engine_t : public i_engine, public i_poll_events
- {
- public:
-
- zmq_tcp_engine_t (fd_t fd_);
-
- // i_engine implementation.
- void attach (struct i_poller *poller_, struct i_session *session_);
- void detach ();
- void revive ();
- void schedule_terminate ();
- void terminate ();
- void shutdown ();
-
- // i_poll_events implementation.
- void in_event ();
- void out_event ();
- void timer_event ();
-
- private:
-
- void error ();
-
- // Clean-up.
- ~zmq_tcp_engine_t ();
-
- // The underlying TCP socket.
- tcp_socket_t socket;
-
- // Handle associated with the socket.
- handle_t handle;
-
- // I/O thread that the engine runs in.
- i_poller *poller;
-
- // Reference to the associated session object.
- i_session *session;
-
- // If true, engine should terminate itself as soon as possible.
- bool terminating;
-
- unsigned char *inbuf;
- int insize;
- int inpos;
-
- unsigned char *outbuf;
- int outsize;
- int outpos;
-
- zmq_encoder_t encoder;
- zmq_decoder_t decoder;
-
- zmq_tcp_engine_t (const zmq_tcp_engine_t&);
- void operator = (const zmq_tcp_engine_t&);
- };
-
-}
-
-#endif