summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@fastmq.commkdir>2009-08-06 12:51:32 +0200
committerMartin Sustrik <sustrik@fastmq.commkdir>2009-08-06 12:51:32 +0200
commit0b5cc026fbe7ccc6de66907be29471562a2d344d (patch)
treea6051f238152e2261ea48942f0c216a3984cc9fd /src
parentb8b4acef4c2ba1a169ce84c1fb4c70a5676ebba3 (diff)
clean up - session/socket/engine stuff removed
Diffstat (limited to 'src')
-rw-r--r--src/Makefile.am51
-rw-r--r--src/app_thread.cpp106
-rw-r--r--src/app_thread.hpp27
-rw-r--r--src/connecter.cpp189
-rw-r--r--src/connecter.hpp99
-rw-r--r--src/context.cpp139
-rw-r--r--src/context.hpp52
-rw-r--r--src/data_distributor.cpp155
-rw-r--r--src/data_distributor.hpp70
-rw-r--r--src/devpoll.cpp9
-rw-r--r--src/devpoll.hpp3
-rw-r--r--src/dummy_aggregator.cpp111
-rw-r--r--src/dummy_aggregator.hpp73
-rw-r--r--src/dummy_distributor.cpp85
-rw-r--r--src/dummy_distributor.hpp68
-rw-r--r--src/epoll.cpp10
-rw-r--r--src/epoll.hpp3
-rw-r--r--src/fair_aggregator.cpp143
-rw-r--r--src/fair_aggregator.hpp77
-rw-r--r--src/i_api.hpp42
-rw-r--r--src/i_demux.hpp57
-rw-r--r--src/i_engine.hpp53
-rw-r--r--src/i_mux.hpp60
-rw-r--r--src/i_poller.hpp7
-rw-r--r--src/i_session.hpp37
-rw-r--r--src/i_socket.hpp (renamed from src/req.cpp)21
-rw-r--r--src/i_thread.hpp38
-rw-r--r--src/io_object.cpp37
-rw-r--r--src/io_object.hpp51
-rw-r--r--src/io_thread.cpp31
-rw-r--r--src/io_thread.hpp27
-rw-r--r--src/kqueue.cpp9
-rw-r--r--src/kqueue.hpp3
-rw-r--r--src/listener.cpp170
-rw-r--r--src/listener.hpp110
-rw-r--r--src/load_balancer.cpp130
-rw-r--r--src/load_balancer.hpp73
-rw-r--r--src/object.cpp33
-rw-r--r--src/object.hpp8
-rw-r--r--src/p2p.cpp29
-rw-r--r--src/p2p.hpp42
-rw-r--r--src/pipe.cpp47
-rw-r--r--src/pipe.hpp23
-rw-r--r--src/pipe_reader.cpp118
-rw-r--r--src/pipe_reader.hpp89
-rw-r--r--src/pipe_writer.cpp120
-rw-r--r--src/pipe_writer.hpp88
-rw-r--r--src/poll.cpp13
-rw-r--r--src/poll.hpp3
-rw-r--r--src/pub.cpp38
-rw-r--r--src/pub.hpp45
-rw-r--r--src/rep.cpp29
-rw-r--r--src/rep.hpp42
-rw-r--r--src/req.hpp42
-rw-r--r--src/safe_object.cpp76
-rw-r--r--src/safe_object.hpp68
-rw-r--r--src/select.cpp13
-rw-r--r--src/select.hpp2
-rw-r--r--src/session.cpp273
-rw-r--r--src/session.hpp107
-rw-r--r--src/session_stub.cpp110
-rw-r--r--src/session_stub.hpp83
-rw-r--r--src/socket_base.cpp267
-rw-r--r--src/socket_base.hpp96
-rw-r--r--src/sub.cpp45
-rw-r--r--src/sub.hpp46
-rw-r--r--src/zmq.cpp2
-rw-r--r--src/zmq_decoder.cpp79
-rw-r--r--src/zmq_decoder.hpp57
-rw-r--r--src/zmq_encoder.cpp75
-rw-r--r--src/zmq_encoder.hpp54
-rw-r--r--src/zmq_tcp_engine.cpp185
-rw-r--r--src/zmq_tcp_engine.hpp92
73 files changed, 109 insertions, 4856 deletions
diff --git a/src/Makefile.am b/src/Makefile.am
index 27f4412..bde9c39 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -7,53 +7,30 @@ libzmq_la_SOURCES = \
atomic_ptr.hpp \
command.hpp \
config.hpp \
- connecter.hpp \
context.hpp \
- data_distributor.hpp \
decoder.hpp \
devpoll.hpp \
- dummy_aggregator.hpp \
- dummy_distributor.hpp \
encoder.hpp \
epoll.hpp \
err.hpp \
- fair_aggregator.hpp \
fd.hpp \
fd_signaler.hpp \
- io_object.hpp \
io_thread.hpp \
ip.hpp \
i_api.hpp \
- i_demux.hpp \
- i_mux.hpp \
i_poller.hpp \
i_poll_events.hpp \
- i_session.hpp \
i_signaler.hpp \
- i_engine.hpp \
- i_thread.hpp \
- listener.hpp \
+ i_socket.hpp \
kqueue.hpp \
- load_balancer.hpp \
msg.hpp \
mutex.hpp \
object.hpp \
- p2p.hpp \
pipe.hpp \
- pipe_reader.hpp \
- pipe_writer.hpp \
platform.hpp \
poll.hpp \
- pub.hpp \
- rep.hpp \
- req.hpp \
- safe_object.hpp \
select.hpp \
- session.hpp \
- session_stub.hpp \
simple_semaphore.hpp \
- socket_base.hpp \
- sub.hpp \
stdint.hpp \
tcp_connecter.hpp \
tcp_listener.hpp \
@@ -65,50 +42,24 @@ libzmq_la_SOURCES = \
ypipe.hpp \
ypollset.hpp \
yqueue.hpp \
- zmq_decoder.hpp \
- zmq_encoder.hpp \
- zmq_tcp_engine.hpp \
app_thread.cpp \
- connecter.cpp \
context.cpp \
- data_distributor.cpp \
devpoll.hpp \
- dummy_aggregator.cpp \
- dummy_distributor.cpp \
epoll.cpp \
err.cpp \
- fair_aggregator.cpp \
fd_signaler.cpp \
- io_object.cpp \
io_thread.cpp \
ip.cpp \
kqueue.cpp \
- listener.cpp \
- load_balancer.cpp \
object.cpp \
- p2p.cpp \
- pipe.cpp \
- pipe_reader.cpp \
- pipe_writer.cpp \
poll.cpp \
- pub.cpp \
- rep.cpp \
- req.cpp \
- safe_object.cpp \
select.cpp \
- session.cpp \
- session_stub.cpp \
- socket_base.cpp \
- sub.cpp \
tcp_connecter.cpp \
tcp_listener.cpp \
tcp_socket.cpp \
thread.cpp \
uuid.cpp \
ypollset.cpp \
- zmq_decoder.cpp \
- zmq_encoder.cpp \
- zmq_tcp_engine.cpp \
zmq.cpp
libzmq_la_LDFLAGS = -version-info 0:0:0
diff --git a/src/app_thread.cpp b/src/app_thread.cpp
index 9cc61c7..23a055a 100644
--- a/src/app_thread.cpp
+++ b/src/app_thread.cpp
@@ -28,20 +28,8 @@
#include "app_thread.hpp"
#include "context.hpp"
#include "err.hpp"
-#include "session.hpp"
#include "pipe.hpp"
#include "config.hpp"
-#include "i_api.hpp"
-#include "dummy_aggregator.hpp"
-#include "fair_aggregator.hpp"
-#include "dummy_distributor.hpp"
-#include "data_distributor.hpp"
-#include "load_balancer.hpp"
-#include "p2p.hpp"
-#include "pub.hpp"
-#include "sub.hpp"
-#include "req.hpp"
-#include "rep.hpp"
// If the RDTSC is available we use it to prevent excessive
// polling for commands. The nice thing here is that it will work on any
@@ -58,37 +46,15 @@ zmq::app_thread_t::app_thread_t (context_t *context_, int thread_slot_) :
{
}
-void zmq::app_thread_t::shutdown ()
-{
- // Deallocate all the sessions associated with the thread.
- while (!sessions.empty ())
- sessions [0]->shutdown ();
-
- delete this;
-}
-
zmq::app_thread_t::~app_thread_t ()
{
-}
+ // Ask all the sockets to start termination, then wait till it is complete.
+ for (sockets_t::iterator it = sockets.begin (); it != sockets.end (); it ++)
+ (*it)->stop ();
+ for (sockets_t::iterator it = sockets.begin (); it != sockets.end (); it ++)
+ delete *it;
-void zmq::app_thread_t::attach_session (session_t *session_)
-{
- session_->set_index (sessions.size ());
- sessions.push_back (session_);
-}
-
-void zmq::app_thread_t::detach_session (session_t *session_)
-{
- // O(1) removal of the session from the list.
- sessions_t::size_type i = session_->get_index ();
- sessions [i] = sessions [sessions.size () - 1];
- sessions [i]->set_index (i);
- sessions.pop_back ();
-}
-
-zmq::i_poller *zmq::app_thread_t::get_poller ()
-{
- zmq_assert (false);
+ delete this;
}
zmq::i_signaler *zmq::app_thread_t::get_signaler ()
@@ -98,76 +64,20 @@ zmq::i_signaler *zmq::app_thread_t::get_signaler ()
bool zmq::app_thread_t::is_current ()
{
- return !sessions.empty () && tid == getpid ();
+ return !sockets.empty () && tid == getpid ();
}
bool zmq::app_thread_t::make_current ()
{
// If there are object managed by this slot we cannot assign the slot
// to a different thread.
- if (!sessions.empty ())
+ if (!sockets.empty ())
return false;
tid = getpid ();
return true;
}
-zmq::i_api *zmq::app_thread_t::create_socket (int type_)
-{
- i_mux *mux = NULL;
- i_demux *demux = NULL;
- session_t *session = NULL;
- i_api *api = NULL;
-
- switch (type_) {
- case ZMQ_P2P:
- mux = new dummy_aggregator_t;
- zmq_assert (mux);
- demux = new dummy_distributor_t;
- zmq_assert (demux);
- session = new session_t (this, this, mux, demux, true, false);
- zmq_assert (session);
- api = new p2p_t (this, session);
- zmq_assert (api);
- break;
- case ZMQ_PUB:
- demux = new data_distributor_t;
- zmq_assert (demux);
- session = new session_t (this, this, mux, demux, true, false);
- zmq_assert (session);
- api = new pub_t (this, session);
- zmq_assert (api);
- break;
- case ZMQ_SUB:
- mux = new fair_aggregator_t;
- zmq_assert (mux);
- session = new session_t (this, this, mux, demux, true, false);
- zmq_assert (session);
- api = new sub_t (this, session);
- zmq_assert (api);
- break;
- case ZMQ_REQ:
- // TODO
- zmq_assert (false);
- api = new req_t (this, session);
- zmq_assert (api);
- break;
- case ZMQ_REP:
- // TODO
- zmq_assert (false);
- api = new rep_t (this, session);
- zmq_assert (api);
- break;
- default:
- errno = EINVAL;
- return NULL;
- }
-
- attach_session (session);
-
- return api;
-}
-
void zmq::app_thread_t::process_commands (bool block_)
{
ypollset_t::signals_t signals;
diff --git a/src/app_thread.hpp b/src/app_thread.hpp
index 8295c2f..31679b8 100644
--- a/src/app_thread.hpp
+++ b/src/app_thread.hpp
@@ -22,7 +22,7 @@
#include <vector>
-#include "i_thread.hpp"
+#include "i_socket.hpp"
#include "stdint.hpp"
#include "object.hpp"
#include "ypollset.hpp"
@@ -30,23 +30,18 @@
namespace zmq
{
- class app_thread_t : public object_t, public i_thread
+ class app_thread_t : public object_t
{
public:
app_thread_t (class context_t *context_, int thread_slot_);
- // To be called when the whole infrastrucure is being closed.
- void shutdown ();
+ ~app_thread_t ();
// Returns signaler associated with this application thread.
i_signaler *get_signaler ();
- // Create socket engine in this thread. Return false if the calling
- // thread doesn't match the thread handled by this app thread object.
- struct i_api *create_socket (int type_);
-
- // Nota bene: The following two functions are accessed from different
+ // Nota bene: Following two functions are accessed from different
// threads. The caller (context) is responsible for synchronisation
// of accesses.
@@ -61,25 +56,17 @@ namespace zmq
// set to true, returns only after at least one command was processed.
void process_commands (bool block_);
- // i_thread implementation.
- void attach_session (class session_t *session_);
- void detach_session (class session_t *session_);
- struct i_poller *get_poller ();
-
private:
- // Clean-up.
- ~app_thread_t ();
+ // All the sockets created from this application thread.
+ typedef std::vector <i_socket*> sockets_t;
+ sockets_t sockets;
// Thread ID associated with this slot.
// TODO: Virtualise pid_t!
// TODO: Check whether getpid returns unique ID for each thread.
int tid;
- // Vector of all sessionss associated with this app thread.
- typedef std::vector <class session_t*> sessions_t;
- sessions_t sessions;
-
// App thread's signaler object.
ypollset_t pollset;
diff --git a/src/connecter.cpp b/src/connecter.cpp
deleted file mode 100644
index 970dcf7..0000000
--- a/src/connecter.cpp
+++ /dev/null
@@ -1,189 +0,0 @@
-/*
- Copyright (c) 2007-2009 FastMQ Inc.
-
- This file is part of 0MQ.
-
- 0MQ is free software; you can redistribute it and/or modify it under
- the terms of the Lesser GNU General Public License as published by
- the Free Software Foundation; either version 3 of the License, or
- (at your option) any later version.
-
- 0MQ is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- Lesser GNU General Public License for more details.
-
- You should have received a copy of the Lesser GNU General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#include "connecter.hpp"
-#include "io_thread.hpp"
-#include "session.hpp"
-#include "err.hpp"
-#include "simple_semaphore.hpp"
-#include "zmq_tcp_engine.hpp"
-
-zmq::connecter_t::connecter_t (io_thread_t *thread_, const char *addr_,
- session_t *session_) :
- io_object_t (thread_),
- state (idle),
- poller (NULL),
- session (session_),
- addr (addr_),
- identity ("abcde"),
- engine (NULL)
-{
-}
-
-void zmq::connecter_t::terminate ()
-{
- delete this;
-}
-
-void zmq::connecter_t::shutdown ()
-{
- delete this;
-}
-
-zmq::connecter_t::~connecter_t ()
-{
-}
-
-void zmq::connecter_t::process_reg (simple_semaphore_t *smph_)
-{
- // Fet poller pointer for further use.
- zmq_assert (!poller);
- poller = get_poller ();
-
- // Ask the session to register itself with the I/O thread. Note that
- // the session is living in the same I/O thread, thus this results
- // in a synchronous call.
- session->inc_seqnum ();
- send_reg (session, NULL);
-
- // Unlock the application thread that created the connecter.
- if (smph_)
- smph_->post ();
-
- // Manually trigger timer event which will launch asynchronous connect.
- state = waiting;
- timer_event ();
-}
-
-void zmq::connecter_t::process_unreg (simple_semaphore_t *smph_)
-{
- // Unregister connecter/engine from the poller.
- zmq_assert (poller);
- if (state == connecting)
- poller->rm_fd (handle);
- else if (state == waiting)
- poller->cancel_timer (this);
- else if (state == sending)
- engine->terminate ();
-
- // Unlock the application thread closing the connecter.
- if (smph_)
- smph_->post ();
-}
-
-void zmq::connecter_t::in_event ()
-{
- // Error occured in asynchronous connect. Retry to connect after a while.
- if (state == connecting) {
- fd_t fd = tcp_connecter.connect ();
- zmq_assert (fd == retired_fd);
- poller->rm_fd (handle);
- poller->add_timer (this);
- state = waiting;
- return;
- }
-
- zmq_assert (false);
-}
-
-void zmq::connecter_t::out_event ()
-{
- if (state == connecting) {
-
- fd_t fd = tcp_connecter.connect ();
- if (fd == retired_fd) {
- poller->rm_fd (handle);
- poller->add_timer (this);
- state = waiting;
- return;
- }
-
- poller->rm_fd (handle);
- engine = new zmq_tcp_engine_t (fd);
- zmq_assert (engine);
- engine->attach (poller, this);
- state = sending;
- return;
- }
-
- zmq_assert (false);
-}
-
-void zmq::connecter_t::timer_event ()
-{
- zmq_assert (state == waiting);
-
- // Initiate async connect and start polling for its completion. If async
- // connect fails instantly, try to reconnect after a while.
- int rc = tcp_connecter.open (addr.c_str ());
- if (rc == 0) {
- state = connecting;
- in_event ();
- }
- else if (rc == 1) {
- handle = poller->add_fd (tcp_connecter.get_fd (), this);
- poller->set_pollout (handle);
- state = connecting;
- }
- else {
- poller->add_timer (this);
- state = waiting;
- }
-}
-
-void zmq::connecter_t::set_engine (struct i_engine *engine_)
-{
- engine = engine_;
-}
-
-bool zmq::connecter_t::read (zmq_msg *msg_)
-{
- zmq_assert (state == sending);
-
- // Deallocate old content of the message just in case.
- zmq_msg_close (msg_);
-
- // Send the identity.
- zmq_msg_init_size (msg_, identity.size ());
- memcpy (zmq_msg_data (msg_), identity.c_str (), identity.size ());
-
- // Ask engine to unregister from the poller.
- i_engine *e = engine;
- engine->detach ();
-
- // Attach the engine to the session. (Note that this is actually
- // a synchronous call.
- session->inc_seqnum ();
- send_engine (session, e);
-
- state = idle;
-
- return true;
-}
-
-bool zmq::connecter_t::write (struct zmq_msg *msg_)
-{
- // No incoming messages are accepted till identity is sent.
- return false;
-}
-
-void zmq::connecter_t::flush ()
-{
- // No incoming messages are accepted till identity is sent.
-}
diff --git a/src/connecter.hpp b/src/connecter.hpp
deleted file mode 100644
index 1f11c63..0000000
--- a/src/connecter.hpp
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- Copyright (c) 2007-2009 FastMQ Inc.
-
- This file is part of 0MQ.
-
- 0MQ is free software; you can redistribute it and/or modify it under
- the terms of the Lesser GNU General Public License as published by
- the Free Software Foundation; either version 3 of the License, or
- (at your option) any later version.
-
- 0MQ is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- Lesser GNU General Public License for more details.
-
- You should have received a copy of the Lesser GNU General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#ifndef __ZMQ_CONNECTER_HPP_INCLUDED__
-#define __ZMQ_CONNECTER_HPP_INCLUDED__
-
-#include <string>
-
-#include "../include/zmq.h"
-
-#include "i_poller.hpp"
-#include "io_object.hpp"
-#include "i_poll_events.hpp"
-#include "i_session.hpp"
-#include "tcp_connecter.hpp"
-
-namespace zmq
-{
-
- class connecter_t : public io_object_t, public i_poll_events,
- public i_session
- {
- public:
-
- connecter_t (class io_thread_t *thread_, const char *addr_,
- class session_t *session_);
-
- void terminate ();
- void shutdown ();
-
- void process_reg (class simple_semaphore_t *smph_);
- void process_unreg (class simple_semaphore_t *smph_);
-
- // i_poll_events implementation.
- void in_event ();
- void out_event ();
- void timer_event ();
-
- // i_session implementation
- void set_engine (struct i_engine *engine_);
- // void shutdown ();
- bool read (struct zmq_msg *msg_);
- bool write (struct zmq_msg *msg_);
- void flush ();
-
- private:
-
- // Clean-up.
- ~connecter_t ();
-
- enum {
- idle,
- waiting,
- connecting,
- sending
- } state;