summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/Makefile.am43
-rw-r--r--src/app_thread.cpp195
-rw-r--r--src/app_thread.hpp88
-rw-r--r--src/array.hpp (renamed from src/yarray.hpp)66
-rw-r--r--src/command.hpp17
-rw-r--r--src/config.hpp9
-rw-r--r--src/connect_session.cpp111
-rw-r--r--src/connect_session.hpp60
-rw-r--r--src/ctx.cpp285
-rw-r--r--src/ctx.hpp109
-rw-r--r--src/decoder.cpp (renamed from src/zmq_decoder.cpp)30
-rw-r--r--src/decoder.hpp47
-rw-r--r--src/encoder.cpp (renamed from src/zmq_encoder.cpp)24
-rw-r--r--src/encoder.hpp39
-rw-r--r--src/fq.cpp56
-rw-r--r--src/fq.hpp26
-rw-r--r--src/i_engine.hpp17
-rw-r--r--src/i_inout.hpp21
-rw-r--r--src/i_terminate_events.hpp (renamed from src/zmq_encoder.hpp)35
-rw-r--r--src/io_object.cpp22
-rw-r--r--src/io_object.hpp10
-rw-r--r--src/io_thread.cpp5
-rw-r--r--src/io_thread.hpp2
-rw-r--r--src/lb.cpp36
-rw-r--r--src/lb.hpp24
-rw-r--r--src/named_session.cpp88
-rw-r--r--src/named_session.hpp (renamed from src/zmq_decoder.hpp)41
-rw-r--r--src/object.cpp118
-rw-r--r--src/object.hpp48
-rw-r--r--src/own.cpp203
-rw-r--r--src/own.hpp135
-rw-r--r--src/owned.cpp71
-rw-r--r--src/owned.hpp89
-rw-r--r--src/pair.cpp87
-rw-r--r--src/pair.hpp29
-rw-r--r--src/pgm_receiver.cpp14
-rw-r--r--src/pgm_receiver.hpp13
-rw-r--r--src/pgm_sender.cpp12
-rw-r--r--src/pgm_sender.hpp11
-rw-r--r--src/pipe.cpp407
-rw-r--r--src/pipe.hpp147
-rw-r--r--src/pub.cpp77
-rw-r--r--src/pub.hpp29
-rw-r--r--src/pull.cpp50
-rw-r--r--src/pull.hpp21
-rw-r--r--src/push.cpp52
-rw-r--r--src/push.hpp21
-rw-r--r--src/rep.cpp238
-rw-r--r--src/rep.hpp43
-rw-r--r--src/req.cpp234
-rw-r--r--src/req.hpp48
-rw-r--r--src/semaphore.hpp186
-rw-r--r--src/session.cpp232
-rw-r--r--src/session.hpp93
-rw-r--r--src/socket_base.cpp749
-rw-r--r--src/socket_base.hpp171
-rw-r--r--src/sub.cpp42
-rw-r--r--src/sub.hpp22
-rw-r--r--src/swap.cpp (renamed from src/msg_store.cpp)33
-rw-r--r--src/swap.hpp (renamed from src/msg_store.hpp)28
-rw-r--r--src/thread.cpp20
-rw-r--r--src/thread.hpp9
-rw-r--r--src/transient_session.cpp36
-rw-r--r--src/transient_session.hpp (renamed from src/i_endpoint.hpp)30
-rw-r--r--src/trie.cpp (renamed from src/prefix_tree.cpp)36
-rw-r--r--src/trie.hpp (renamed from src/prefix_tree.hpp)17
-rw-r--r--src/xrep.cpp84
-rw-r--r--src/xrep.hpp30
-rw-r--r--src/xreq.cpp40
-rw-r--r--src/xreq.hpp19
-rw-r--r--src/yarray_item.hpp64
-rw-r--r--src/zmq.cpp368
-rw-r--r--src/zmq_connecter.cpp50
-rw-r--r--src/zmq_connecter.hpp25
-rw-r--r--src/zmq_engine.cpp55
-rw-r--r--src/zmq_engine.hpp23
-rw-r--r--src/zmq_init.cpp140
-rw-r--r--src/zmq_init.hpp33
-rw-r--r--src/zmq_listener.cpp24
-rw-r--r--src/zmq_listener.hpp13
80 files changed, 3220 insertions, 3155 deletions
diff --git a/src/Makefile.am b/src/Makefile.am
index 19a80d0..86a6fbd 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -49,16 +49,17 @@ endif
nodist_libzmq_la_SOURCES = $(pgm_sources)
-libzmq_la_SOURCES = app_thread.hpp \
+libzmq_la_SOURCES = \
+ array.hpp \
atomic_counter.hpp \
atomic_ptr.hpp \
blob.hpp \
command.hpp \
config.hpp \
+ connect_session.hpp \
ctx.hpp \
decoder.hpp \
devpoll.hpp \
- push.hpp \
encoder.hpp \
epoll.hpp \
err.hpp \
@@ -69,18 +70,18 @@ libzmq_la_SOURCES = app_thread.hpp \
io_object.hpp \
io_thread.hpp \
ip.hpp \
- i_endpoint.hpp \
i_engine.hpp \
i_poll_events.hpp \
+ i_terminate_events.hpp \
kqueue.hpp \
lb.hpp \
likely.hpp \
msg_content.hpp \
- msg_store.hpp \
mutex.hpp \
+ named_session.hpp \
object.hpp \
options.hpp \
- owned.hpp \
+ own.hpp \
pgm_receiver.hpp \
pgm_sender.hpp \
pgm_socket.hpp \
@@ -89,43 +90,44 @@ libzmq_la_SOURCES = app_thread.hpp \
poll.hpp \
poller.hpp \
pair.hpp \
- prefix_tree.hpp \
pub.hpp \
+ pull.hpp \
+ push.hpp \
queue.hpp \
rep.hpp \
req.hpp \
select.hpp \
+ semaphore.hpp \
session.hpp \
signaler.hpp \
socket_base.hpp \
stdint.hpp \
streamer.hpp \
sub.hpp \
+ swap.hpp \
tcp_connecter.hpp \
tcp_listener.hpp \
tcp_socket.hpp \
thread.hpp \
- pull.hpp \
+ transient_session.hpp \
+ trie.hpp \
uuid.hpp \
windows.hpp \
wire.hpp \
xrep.hpp \
xreq.hpp \
- yarray.hpp \
- yarray_item.hpp \
ypipe.hpp \
yqueue.hpp \
zmq_connecter.hpp \
- zmq_decoder.hpp \
- zmq_encoder.hpp \
zmq_engine.hpp \
zmq_init.hpp \
zmq_listener.hpp \
- app_thread.cpp \
command.cpp \
ctx.cpp \
+ connect_session.cpp \
+ decoder.cpp \
devpoll.cpp \
- push.cpp \
+ encoder.cpp \
epoll.cpp \
err.cpp \
forwarder.cpp \
@@ -135,17 +137,18 @@ libzmq_la_SOURCES = app_thread.hpp \
ip.cpp \
kqueue.cpp \
lb.cpp \
- msg_store.cpp \
+ named_session.cpp \
object.cpp \
options.cpp \
- owned.cpp \
+ own.cpp \
+ pair.cpp \
pgm_receiver.cpp \
pgm_sender.cpp \
pgm_socket.cpp \
- pair.cpp \
- prefix_tree.cpp \
pipe.cpp \
poll.cpp \
+ pull.cpp \
+ push.cpp \
pub.cpp \
queue.cpp \
rep.cpp \
@@ -156,18 +159,18 @@ libzmq_la_SOURCES = app_thread.hpp \
socket_base.cpp \
streamer.cpp \
sub.cpp \
+ swap.cpp \
tcp_connecter.cpp \
tcp_listener.cpp \
tcp_socket.cpp \
thread.cpp \
- pull.cpp \
+ transient_session.cpp \
+ trie.cpp \
uuid.cpp \
xrep.cpp \
xreq.cpp \
zmq.cpp \
zmq_connecter.cpp \
- zmq_decoder.cpp \
- zmq_encoder.cpp \
zmq_engine.cpp \
zmq_init.cpp \
zmq_listener.cpp
diff --git a/src/app_thread.cpp b/src/app_thread.cpp
deleted file mode 100644
index ac59464..0000000
--- a/src/app_thread.cpp
+++ /dev/null
@@ -1,195 +0,0 @@
-/*
- Copyright (c) 2007-2010 iMatix Corporation
-
- This file is part of 0MQ.
-
- 0MQ is free software; you can redistribute it and/or modify it under
- the terms of the Lesser GNU General Public License as published by
- the Free Software Foundation; either version 3 of the License, or
- (at your option) any later version.
-
- 0MQ is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- Lesser GNU General Public License for more details.
-
- You should have received a copy of the Lesser GNU General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#include <new>
-#include <algorithm>
-
-#include "../include/zmq.h"
-
-#include "platform.hpp"
-
-#if defined ZMQ_HAVE_WINDOWS
-#include "windows.hpp"
-#if defined _MSC_VER
-#include <intrin.h>
-#endif
-#else
-#include <unistd.h>
-#endif
-
-#include "app_thread.hpp"
-#include "ctx.hpp"
-#include "err.hpp"
-#include "pipe.hpp"
-#include "config.hpp"
-#include "socket_base.hpp"
-#include "pair.hpp"
-#include "pub.hpp"
-#include "sub.hpp"
-#include "req.hpp"
-#include "rep.hpp"
-#include "xreq.hpp"
-#include "xrep.hpp"
-#include "pull.hpp"
-#include "push.hpp"
-
-// If the RDTSC is available we use it to prevent excessive
-// polling for commands. The nice thing here is that it will work on any
-// system with x86 architecture and gcc or MSVC compiler.
-#if (defined __GNUC__ && (defined __i386__ || defined __x86_64__)) ||\
- (defined _MSC_VER && (defined _M_IX86 || defined _M_X64))
-#define ZMQ_DELAY_COMMANDS
-#endif
-
-zmq::app_thread_t::app_thread_t (ctx_t *ctx_,
- uint32_t thread_slot_) :
- object_t (ctx_, thread_slot_),
- last_processing_time (0),
- terminated (false)
-{
-}
-
-zmq::app_thread_t::~app_thread_t ()
-{
- zmq_assert (sockets.empty ());
-}
-
-void zmq::app_thread_t::stop ()
-{
- send_stop ();
-}
-
-zmq::signaler_t *zmq::app_thread_t::get_signaler ()
-{
- return &signaler;
-}
-
-bool zmq::app_thread_t::process_commands (bool block_, bool throttle_)
-{
- bool received;
- command_t cmd;
- if (block_) {
- received = signaler.recv (&cmd, true);
- zmq_assert (received);
- }
- else {
-
-#if defined ZMQ_DELAY_COMMANDS
- // Optimised version of command processing - it doesn't have to check
- // for incoming commands each time. It does so only if certain time
- // elapsed since last command processing. Command delay varies
- // depending on CPU speed: It's ~1ms on 3GHz CPU, ~2ms on 1.5GHz CPU
- // etc. The optimisation makes sense only on platforms where getting
- // a timestamp is a very cheap operation (tens of nanoseconds).
- if (throttle_) {
-
- // Get timestamp counter.
-#if defined __GNUC__
- uint32_t low;
- uint32_t high;
- __asm__ volatile ("rdtsc" : "=a" (low), "=d" (high));
- uint64_t current_time = (uint64_t) high << 32 | low;
-#elif defined _MSC_VER
- uint64_t current_time = __rdtsc ();
-#else
-#error
-#endif
-
- // Check whether certain time have elapsed since last command
- // processing.
- if (current_time - last_processing_time <= max_command_delay)
- return !terminated;
- last_processing_time = current_time;
- }
-#endif
-
- // Check whether there are any commands pending for this thread.
- received = signaler.recv (&cmd, false);
- }
-
- // Process all the commands available at the moment.
- while (received) {
- cmd.destination->process_command (cmd);
- received = signaler.recv (&cmd, false);
- }
-
- return !terminated;
-}
-
-zmq::socket_base_t *zmq::app_thread_t::create_socket (int type_)
-{
- socket_base_t *s = NULL;
- switch (type_) {
- case ZMQ_PAIR:
- s = new (std::nothrow) pair_t (this);
- break;
- case ZMQ_PUB:
- s = new (std::nothrow) pub_t (this);
- break;
- case ZMQ_SUB:
- s = new (std::nothrow) sub_t (this);
- break;
- case ZMQ_REQ:
- s = new (std::nothrow) req_t (this);
- break;
- case ZMQ_REP:
- s = new (std::nothrow) rep_t (this);
- break;
- case ZMQ_XREQ:
- s = new (std::nothrow) xreq_t (this);
- break;
- case ZMQ_XREP:
- s = new (std::nothrow) xrep_t (this);
- break;
- case ZMQ_PULL:
- s = new (std::nothrow) pull_t (this);
- break;
- case ZMQ_PUSH:
- s = new (std::nothrow) push_t (this);
- break;
- default:
- if (sockets.empty ())
- get_ctx ()->no_sockets (this);
- errno = EINVAL;
- return NULL;
- }
- zmq_assert (s);
-
- sockets.push_back (s);
-
- return s;
-}
-
-void zmq::app_thread_t::remove_socket (socket_base_t *socket_)
-{
- sockets.erase (socket_);
- if (sockets.empty ())
- get_ctx ()->no_sockets (this);
-}
-
-void zmq::app_thread_t::process_stop ()
-{
- terminated = true;
-}
-
-bool zmq::app_thread_t::is_terminated ()
-{
- return terminated;
-}
-
diff --git a/src/app_thread.hpp b/src/app_thread.hpp
deleted file mode 100644
index f0deaab..0000000
--- a/src/app_thread.hpp
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- Copyright (c) 2007-2010 iMatix Corporation
-
- This file is part of 0MQ.
-
- 0MQ is free software; you can redistribute it and/or modify it under
- the terms of the Lesser GNU General Public License as published by
- the Free Software Foundation; either version 3 of the License, or
- (at your option) any later version.
-
- 0MQ is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- Lesser GNU General Public License for more details.
-
- You should have received a copy of the Lesser GNU General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#ifndef __ZMQ_APP_THREAD_HPP_INCLUDED__
-#define __ZMQ_APP_THREAD_HPP_INCLUDED__
-
-#include <vector>
-
-#include "stdint.hpp"
-#include "object.hpp"
-#include "yarray.hpp"
-#include "signaler.hpp"
-
-namespace zmq
-{
-
- class app_thread_t : public object_t
- {
- public:
-
- app_thread_t (class ctx_t *ctx_, uint32_t thread_slot_);
-
- ~app_thread_t ();
-
- // Interrupt blocking call if the app thread is stuck in one.
- // This function is is called from a different thread!
- void stop ();
-
- // Returns signaler associated with this application thread.
- signaler_t *get_signaler ();
-
- // Processes commands sent to this thread (if any). If 'block' is
- // set to true, returns only after at least one command was processed.
- // If throttle argument is true, commands are processed at most once
- // in a predefined time period. The function returns false is the
- // associated context was terminated, true otherwise.
- bool process_commands (bool block_, bool throttle_);
-
- // Create a socket of a specified type.
- class socket_base_t *create_socket (int type_);
-
- // Unregister the socket from the app_thread (called by socket itself).
- void remove_socket (class socket_base_t *socket_);
-
- // Returns true is the associated context was already terminated.
- bool is_terminated ();
-
- private:
-
- // Command handlers.
- void process_stop ();
-
- // All the sockets created from this application thread.
- typedef yarray_t <socket_base_t> sockets_t;
- sockets_t sockets;
-
- // App thread's signaler object.
- signaler_t signaler;
-
- // Timestamp of when commands were processed the last time.
- uint64_t last_processing_time;
-
- // If true, 'stop' command was already received.
- bool terminated;
-
- app_thread_t (const app_thread_t&);
- void operator = (const app_thread_t&);
- };
-
-}
-
-#endif
diff --git a/src/yarray.hpp b/src/array.hpp
index 8c79b99..a144049 100644
--- a/src/yarray.hpp
+++ b/src/array.hpp
@@ -17,8 +17,8 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
-#ifndef __ZMQ_YARRAY_INCLUDED__
-#define __ZMQ_YARRAY_INCLUDED__
+#ifndef __ZMQ_ARRAY_INCLUDED__
+#define __ZMQ_ARRAY_INCLUDED__
#include <vector>
#include <algorithm>
@@ -26,21 +26,57 @@
namespace zmq
{
+ // Base class for objects stored in the array. Note that each object can
+ // be stored in at most one array.
+
+ class array_item_t
+ {
+ public:
+
+ inline array_item_t () :
+ array_index (-1)
+ {
+ }
+
+ // The destructor doesn't have to be virtual. It is mad virtual
+ // just to keep ICC and code checking tools from complaining.
+ inline virtual ~array_item_t ()
+ {
+ }
+
+ inline void set_array_index (int index_)
+ {
+ array_index = index_;
+ }
+
+ inline int get_array_index ()
+ {
+ return array_index;
+ }
+
+ private:
+
+ int arr