summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@fastmq.commkdir>2009-08-12 09:40:16 +0200
committerMartin Sustrik <sustrik@fastmq.commkdir>2009-08-12 09:40:16 +0200
commit059beca59d39d90a8ee0e1b07f840994962ea89e (patch)
tree007a5d86450c543bb9a362a844ee271115b68c54 /src
parentbda766ab401b6c565fe9c2d0bc80c11bbbe84488 (diff)
listener/connecter/init/session added
Diffstat (limited to 'src')
-rw-r--r--src/Makefile.am13
-rw-r--r--src/atomic.hpp310
-rw-r--r--src/i_inout.hpp37
-rw-r--r--src/io_object.cpp47
-rw-r--r--src/io_object.hpp50
-rw-r--r--src/options.cpp29
-rw-r--r--src/options.hpp42
-rw-r--r--src/owned.cpp74
-rw-r--r--src/owned.hpp82
-rw-r--r--src/session.cpp58
-rw-r--r--src/session.hpp57
-rw-r--r--src/socket_base.cpp29
-rw-r--r--src/socket_base.hpp8
-rw-r--r--src/zmq_connecter.cpp36
-rw-r--r--src/zmq_connecter.hpp17
-rw-r--r--src/zmq_decoder.cpp78
-rw-r--r--src/zmq_decoder.hpp57
-rw-r--r--src/zmq_encoder.cpp76
-rw-r--r--src/zmq_encoder.hpp55
-rw-r--r--src/zmq_engine.cpp107
-rw-r--r--src/zmq_engine.hpp33
-rw-r--r--src/zmq_init.cpp110
-rw-r--r--src/zmq_init.hpp82
-rw-r--r--src/zmq_listener.cpp25
-rw-r--r--src/zmq_listener.hpp11
25 files changed, 1069 insertions, 454 deletions
diff --git a/src/Makefile.am b/src/Makefile.am
index 8524067..9d6127c 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -15,6 +15,7 @@ libzmq_la_SOURCES = \
err.hpp \
fd.hpp \
fd_signaler.hpp \
+ i_inout.hpp \
io_object.hpp \
io_thread.hpp \
ip.hpp \
@@ -25,10 +26,13 @@ libzmq_la_SOURCES = \
msg.hpp \
mutex.hpp \
object.hpp \
+ options.hpp \
+ owner.hpp \
pipe.hpp \
platform.hpp \
poll.hpp \
select.hpp \
+ session.hpp \
simple_semaphore.hpp \
socket_base.hpp \
stdint.hpp \
@@ -43,7 +47,10 @@ libzmq_la_SOURCES = \
ypollset.hpp \
yqueue.hpp \
zmq_connecter.hpp \
+ zmq_decoder.hpp \
+ zmq_encoder.hpp \
zmq_engine.hpp \
+ zmq_init.hpp \
zmq_listener.hpp \
app_thread.cpp \
devpoll.cpp \
@@ -56,8 +63,11 @@ libzmq_la_SOURCES = \
ip.cpp \
kqueue.cpp \
object.cpp \
+ options.cpp \
+ owned.cpp \
poll.cpp \
select.cpp \
+ session.cpp \
socket_base.cpp \
tcp_connecter.cpp \
tcp_listener.cpp \
@@ -67,7 +77,10 @@ libzmq_la_SOURCES = \
ypollset.cpp \
zmq.cpp \
zmq_connecter.cpp \
+ zmq_decoder.cpp \
+ zmq_encoder.cpp \
zmq_engine.cpp \
+ zmq_init.cpp \
zmq_listener.cpp
libzmq_la_LDFLAGS = -version-info 0:0:0
diff --git a/src/atomic.hpp b/src/atomic.hpp
deleted file mode 100644
index e581593..0000000
--- a/src/atomic.hpp
+++ /dev/null
@@ -1,310 +0,0 @@
-/*
- Copyright (c) 2007-2009 FastMQ Inc.
-
- This file is part of 0MQ.
-
- 0MQ is free software; you can redistribute it and/or modify it under
- the terms of the Lesser GNU General Public License as published by
- the Free Software Foundation; either version 3 of the License, or
- (at your option) any later version.
-
- 0MQ is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- Lesser GNU General Public License for more details.
-
- You should have received a copy of the Lesser GNU General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#ifndef __ZMQ_ATOMIC_HPP_INCLUDED__
-#define __ZMQ_ATOMIC_HPP_INCLUDED__
-
-#include "stdint.hpp"
-
-#if defined ZMQ_FORCE_MUTEXES
-#define ZMQ_ATOMIC_MUTEX
-#elif (defined __i386__ || defined __x86_64__) && defined __GNUC__
-#define ZMQ_ATOMIC_X86
-#elif defined ZMQ_HAVE_WINDOWS
-#define ZMQ_ATOMIC_WINDOWS
-#elif defined ZMQ_HAVE_SOLARIS
-#define ZMQ_ATOMIC_SOLARIS
-#else
-#define ZMQ_ATOMIC_MUTEX
-#endif
-
-namespace zmq
-{
-
- // Atomic assignement.
- inline void atomic_uint32_set (volatile uint32_t *p_, uint32_t value_)
- {
- *p_ = value_;
- // StoreLoad memory barrier should go here on platforms with
- // memory models that require it.
- }
-
- // Atomic retrieval of an integer.
- inline uint32_t atomic_uint32_get (volatile uint32_t *p_)
- {
- // StoreLoad memory barrier should go here on platforms with
- // memory models that require it.
- return *p_;
- }
-
- // Atomic addition. Returns the old value.
- inline uint32_t atomic_uint32_add (volatile uint32_t *p_, uint32_t delta_)
- {
-#if defined ZMQ_ATOMIC_WINDOWS
- return InterlockedExchangeAdd ((LONG*) &value, increment_);
-#elif defined ZMQ_ATOMIC_SOLARIS
- return atomic_add_32_nv (&value, increment_) - delta_;
-#elif defined ZMQ_ATOMIC_X86
- uint32_t old;
- __asm__ volatile (
- "lock; xadd %0, %1\n\t"
- : "=r" (old), "=m" (*p_)
- : "0" (delta_), "m" (*p_)
- : "cc", "memory");
- return old;
-#else
-#error // TODO:
- sync.lock ();
- uint32_t old = *p_;
- *p_ += delta_;
- sync.unlock ();
-#endif
- }
-
- // Atomic subtraction. Returns the old value.
- inline uint32_t atomic_uint32_sub (volatile uint32_t *p_, uint32_t delta_)
- {
-#if defined ZMQ_ATOMIC_WINDOWS
- LONG delta = - ((LONG) delta_);
- return InterlockedExchangeAdd ((LONG*) &value, delta);
-#elif defined ZMQ_ATOMIC_SOLARIS
- int32_t delta = - ((int32_t) delta_);
- return atomic_add_32_nv (&value, delta) + delta_;
-#elif defined ZMQ_ATOMIC_X86
- uint32_t old = -delta_;
- __asm__ volatile ("lock; xaddl %0,%1"
- : "=r" (old), "=m" (*p_)
- : "0" (old), "m" (*p_)
- : "cc");
- return old;
-#else
-#error // TODO:
- sync.lock ();
- uint32_t old = *p_;
- *p_ -= delta_;
- sync.unlock ();
- return old;
-#endif
- }
-
- // Atomic assignement.
- template <typename T>
- inline void atomic_ptr_set (volatile T **p_, T *value_)
- {
- *p_ = value_;
- // StoreLoad memory barrier should go here on platforms with
- // memory models that require it.
- }
-
- // Perform atomic 'exchange pointers' operation. Old value is returned.
- template <typename T>
- inline void *atomic_ptr_xchg (volatile T **p_, T *value_)
- {
-#if defined ZMQ_ATOMIC_WINDOWS
- return InterlockedExchangePointer (p_, value_);
-#elif defined ZMQ_ATOMIC_SOLARIS
- return atomic_swap_ptr (p_, value_);
-#elif defined ZMQ_ATOMIC_X86
- void *old;
- __asm__ volatile (
- "lock; xchg %0, %2"
- : "=r" (old), "=m" (*p_)
- : "m" (*p_), "0" (value_));
- return old;
-#else
-#error // TODO:
- sync.lock ();
- void *old = *p_;
- *p_ = value_;
- sync.unlock ();
- return old;
-#endif
- }
-
- // Perform atomic 'compare and swap' operation on the pointer.
- // The pointer is compared to 'cmp' argument and if they are
- // equal, its value is set to 'value'. Old value of the pointer
- // is returned.
- template <typename T>
- inline void *atomic_ptr_cas (volatile T **p_, T *cmp_, T *value_)
- {
-#if defined ZMQ_ATOMIC_WINDOWS
- return InterlockedCompareExchangePointer (p_, value_, cmp_);
-#elif defined ZMQ_ATOMIC_SOLARIS
- return atomic_cas_ptr (p_, cmp_, value_);
-#elif defined ZMQ_ATOMIC_X86
- void *old;
- __asm__ volatile (
- "lock; cmpxchg %2, %3"
- : "=a" (old), "=m" (*p_)
- : "r" (value_), "m" (*p_), "0" (cmp_)
- : "cc");
- return old;
-#else
-#error // TODO:
- sync.lock ();
- void *old = *p_;
- if (old == cmp_)
- *p_ = value_;
- sync.unlock ();
- return old;
-#endif
- }
-
-#if defined ZMQ_ATOMIC_X86 && defined __x86_64__
- typedef uint64_t atomic_bitmap_t;
-#else
- typedef uint32_t atomic_bitmap_t;
-#endif
-
- // Atomic assignement.
- inline void atomic_bitmap_set (volatile atomic_bitmap_t *p_,
- atomic_bitmap_t value_)
- {
- *p_ = value_;
- // StoreLoad memory barrier should go here on platforms with
- // memory models that require it.
- }
-
- // Bit-test-set-and-reset. Sets one bit of the value and resets
- // another one. Returns the original value of the reset bit.
- inline bool atomic_bitmap_btsr (volatile atomic_bitmap_t *p_,
- int set_index_, int reset_index_)
- {
-#if defined ZMQ_ATOMIC_WINDOWS
- while (true) {
- atomic_bitmap_t oldval = *p_;
- atomic_bitmap_t newval = (oldval | (atomic_bitmap_t (1) <<
- set_index_)) & ~(integer_t (1) << reset_index_);
- if (InterlockedCompareExchange ((volatile LONG*) p_, newval,
- oldval) == (LONG) oldval)
- return (oldval & (atomic_bitmap_t (1) << reset_index_)) ?
- true : false;
- }
-#elif defined ZMQ_ATOMIC_SOLARIS
- while (true) {
- atomic_bitmap_t oldval = *p_;
- atomic_bitmap_t newval = (oldval | (atomic_bitmap_t (1) <<
- set_index_)) & ~(integer_t (1) << reset_index_);
- if (atomic_cas_32 (p_, oldval, newval) == oldval)
- return (oldval & (atomic_bitmap_t (1) << reset_index_)) ?
- true : false;
- }
-#elif defined ZMQ_ATOMIC_X86
- atomic_bitmap_t oldval, dummy;
- __asm__ volatile (
- "mov %0, %1\n\t"
- "1:"
- "mov %1, %2\n\t"
- "bts %3, %2\n\t"
- "btr %4, %2\n\t"
- "lock cmpxchg %2, %0\n\t"
- "jnz 1b\n\t"
- : "+m" (*p_), "=&a" (oldval), "=&r" (dummy)
- : "r" (atomic_bitmap_t (set_index_)),
- "r" (atomic_bitmap_t (reset_index_))
- : "cc");
- return (bool) (oldval & (atomic_bitmap_t (1) << reset_index_));
-#else
-#error // TODO:
- sync.lock ();
- atomic_bitmap_t oldval = *p_;
- *p_ = (oldval | (atomic_bitmap_t (1) << set_index_)) &
- ~(atomic_bitmap_t (1) << reset_index_);
- sync.unlock ();
- return (oldval & (atomic_bitmap_t (1) << reset_index_)) ? true : false;
-#endif
- }
-
- // Sets value to newval. Returns the original value.
- inline atomic_bitmap_t atomic_bitmap_xchg (volatile atomic_bitmap_t *p_,
- atomic_bitmap_t newval_)
- {
-#if defined ZMQ_ATOMIC_WINDOWS
- return InterlockedExchange ((volatile LONG*) p_, newval_);
-#elif defined ZMQ_ATOMIC_SOLARIS
- return atomic_swap_32 (p_, newval_);
-#elif defined ZMQ_ATOMIC_X86
- atomic_bitmap_t oldval = newval_;
- __asm__ volatile (
- "lock; xchg %0, %1"
- : "=r" (oldval)
- : "m" (*p_), "0" (oldval)
- : "memory");
- return oldval;
-#else
-#error // TODO:
- sync.lock ();
- atomic_bitmap_t oldval = *p_;
- *p_ = newval_;
- sync.unlock ();
-#endif
- }
-
- // izte is "if-zero-then-else" atomic operation - if the value is zero
- // it substitutes it by 'thenval' else it rewrites it by 'elseval'.
- // Original value of the integer is returned from this function.
- inline atomic_bitmap_t atomic_bitmap_izte (volatile atomic_bitmap_t *p_,
- atomic_bitmap_t thenval_, atomic_bitmap_t elseval_)
- {
-#if defined ZMQ_ATOMIC_WINDOWS
- while (true) {
- atomic_bitmap_t oldval = *p_;
- atomic_bitmap_t newval = (oldval ? elseval_ : thenval_);
- if (InterlockedCompareExchange ((volatile LONG*) p_, newval,
- oldval) == (LONG) oldval)
- return oldval;
- }
-#elif defined ZMQ_ATOMIC_SOLARIS
- while (true) {
- atomic_bitmap_t oldval = *p_;
- atomic_bitmap_t newval = (oldval ? elseval_ : thenval_);
- if (atomic_cas_32 (p_, oldval, newval) == oldval)
- return oldval;
- }
-#elif defined ZMQ_ATOMIC_X86
- atomic_bitmap_t oldval;
- atomic_bitmap_t dummy;
- __asm__ volatile (
- "mov %0, %1\n\t"
- "1:"
- "mov %3, %2\n\t"
- "test %1, %1\n\t"
- "jz 2f\n\t"
- "mov %4, %2\n\t"
- "2:"
- "lock cmpxchg %2, %0\n\t"
- "jnz 1b\n\t"
- : "+m" (*p_), "=&a" (oldval), "=&r" (dummy)
- : "r" (thenval_), "r" (elseval_)
- : "cc");
- return oldval;
-#else
-#error // TODO:
- sync.lock ();
- atomic_bitmap_t oldval = *p_;
- *p_ = oldval ? elseval_ : thenval_;
- sync.unlock ();
- return oldval;
-#endif
- }
-
-}
-
-#endif
diff --git a/src/i_inout.hpp b/src/i_inout.hpp
new file mode 100644
index 0000000..be2e007
--- /dev/null
+++ b/src/i_inout.hpp
@@ -0,0 +1,37 @@
+/*
+ Copyright (c) 2007-2009 FastMQ Inc.
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the Lesser GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ Lesser GNU General Public License for more details.
+
+ You should have received a copy of the Lesser GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __ZMQ_I_INOUT_HPP_INCLUDED__
+#define __ZMQ_I_INOUT_HPP_INCLUDED__
+
+#include "../include/zmq.h"
+
+namespace zmq
+{
+
+ struct i_inout
+ {
+ virtual bool read (::zmq_msg *msg_) = 0;
+ virtual bool write (::zmq_msg *msg_) = 0;
+ virtual void flush () = 0;
+ };
+
+}
+
+#endif
diff --git a/src/io_object.cpp b/src/io_object.cpp
index d8cc1c0..f61e5f0 100644
--- a/src/io_object.cpp
+++ b/src/io_object.cpp
@@ -21,35 +21,19 @@
#include "io_thread.hpp"
#include "err.hpp"
-zmq::io_object_t::io_object_t (io_thread_t *parent_, object_t *owner_) :
- object_t (parent_),
- owner (owner_),
- plugged_in (false),
- terminated (false)
+zmq::io_object_t::io_object_t (io_thread_t *io_thread_)
{
// Retrieve the poller from the thread we are running in.
- poller = parent_->get_poller ();
+ poller = io_thread_->get_poller ();
}
zmq::io_object_t::~io_object_t ()
{
}
-void zmq::io_object_t::process_plug ()
+void zmq::io_object_t::set_io_thread (io_thread_t *io_thread_)
{
- zmq_assert (!plugged_in);
-
- // If termination of the object was already requested, destroy it and
- // send the termination acknowledgement.
- if (terminated) {
- send_term_ack (owner);
- delete this;
- return;
- }
-
- // Notify the generic termination mechanism (io_object_t) that the object
- // is already plugged in.
- plugged_in = true;
+ poller = io_thread_->get_poller ();
}
zmq::handle_t zmq::io_object_t::add_fd (fd_t fd_)
@@ -106,26 +90,3 @@ void zmq::io_object_t::timer_event ()
{
zmq_assert (false);
}
-
-void zmq::io_object_t::term ()
-{
- send_term_req (owner, this);
-}
-
-void zmq::io_object_t::process_term ()
-{
- zmq_assert (!terminated);
-
- // If termination request has occured even before the object was plugged in
- // wait till plugging in happens, then acknowledge the termination.
- if (!plugged_in) {
- terminated = true;
- return;
- }
-
- // Otherwise, destroy the object and acknowledge the termination
- // straight away.
- send_term_ack (owner);
- process_unplug ();
- delete this;
-}
diff --git a/src/io_object.hpp b/src/io_object.hpp
index 4f323ad..e5582db 100644
--- a/src/io_object.hpp
+++ b/src/io_object.hpp
@@ -20,41 +20,31 @@
#ifndef __ZMQ_IO_OBJECT_HPP_INCLUDED__
#define __ZMQ_IO_OBJECT_HPP_INCLUDED__
-#include "object.hpp"
+#include <stddef.h>
+
#include "i_poller.hpp"
#include "i_poll_events.hpp"
namespace zmq
{
- class io_object_t : public object_t, public i_poll_events
+ // Simple base class for objects that live in I/O threads.
+ // It makes communication with the poller object easier and
+ // makes defining unneeded event handlers unnecessary.
+
+ class io_object_t : public i_poll_events
{
public:
- // I/O object will live in the thread inherited from the parent.
- // However, it's lifetime is managed by the owner.
- io_object_t (class io_thread_t *parent_, object_t *owner_);
+ io_object_t (class io_thread_t *io_thread_ = NULL);
+ ~io_object_t ();
protected:
- // Ask owner socket to terminate this I/O object. This may not happen
- void term ();
-
- // I/O object destroys itself. No point in allowing others to invoke
- // the destructor. At the same time, it has to be virtual so that
- // generic io_object deallocation mechanism destroys specific type
- // of I/O object correctly.
- virtual ~io_object_t ();
-
- // Handlers for incoming commands. It vital that every I/O object
- // invokes io_object_t::process_plug at the end of it's own plug
- // handler.
- void process_plug ();
-
- // io_object_t defines a new handler used to disconnect the object
- // from the poller object. Implement the handlen in the derived
- // classes to ensure sane cleanup.
- virtual void process_unplug () = 0;
+ // Derived class can init/swap the underlying I/O thread.
+ // Caution: Remove all the file descriptors from the old I/O thread
+ // before swapping to the new one!
+ void set_io_thread (class io_thread_t *io_thread_);
// Methods to access underlying poller object.
handle_t add_fd (fd_t fd_);
@@ -71,24 +61,10 @@ namespace zmq
void out_event ();
void timer_event ();
- // Socket owning this I/O object. It is responsible for destroying
- // it when it's being closed.
- object_t *owner;
-
private:
- // Set to true when object is plugged in.
- bool plugged_in;
-
- // Set to true when object was terminated before it was plugged in.
- // In such case destruction is delayed till 'plug' command arrives.
- bool terminated;
-
struct i_poller *poller;
- // Handlers for incoming commands.
- void process_term ();
-
io_object_t (const io_object_t&);
void operator = (const io_object_t&);
};
diff --git a/src/options.cpp b/src/options.cpp
new file mode 100644
index 0000000..cd07c44
--- /dev/null
+++ b/src/options.cpp
@@ -0,0 +1,29 @@
+/*
+ Copyright (c) 2007-2009 FastMQ Inc.
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the Lesser GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ Lesser GNU General Public License for more details.
+
+ You should have received a copy of the Lesser GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "options.hpp"
+
+zmq::options_t::options_t () :
+ hwm (0),
+ lwm (0),
+ swap (0),
+ mask (0),
+ affinity (0)
+{
+}
diff --git a/src/options.hpp b/src/options.hpp
new file mode 100644
index 0000000..7d78da2
--- /dev/null
+++ b/src/options.hpp
@@ -0,0 +1,42 @@
+/*
+ Copyright (c) 2007-2009 FastMQ Inc.
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the Lesser GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ Lesser GNU General Public License for more details.
+
+ You should have received a copy of the Lesser GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __ZMQ_OPTIONS_HPP_INCLUDED__
+#define __ZMQ_OPTIONS_HPP_INCLUDED__
+
+#include <string>
+
+namespace zmq
+{
+
+ struct options_t
+ {
+ options_t ();
+
+ int64_t hwm;
+ int64_t lwm;
+ int64_t swap;
+ uint64_t mask;
+ uint64_t affinity;
+ std::string identity;
+ };
+
+}
+
+#endif
diff --git a/src/owned.cpp b/src/owned.cpp
new file mode 100644
index 0000000..22e257f
--- /dev/null
+++ b/src/owned.cpp
@@ -0,0 +1,74 @@
+/*
+ Copyright (c) 2007-2009 FastMQ Inc.
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the Lesser GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ Lesser GNU General Public License for more details.
+
+ You should have received a copy of the Lesser GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "owned.hpp"
+#include "err.hpp"
+
+zmq::owned_t::owned_t (object_t *parent_, object_t *owner_) :
+ object_t (parent_),
+ owner (owner_),
+ plugged_in (false),
+ terminated (false)
+{
+}
+
+zmq::owned_t::~owned_t ()
+{
+}
+
+void zmq::owned_t::process_plug ()
+{
+ zmq_assert (!plugged_in);
+
+ // If termination of the object was already requested, destroy it and
+ // send the termination acknowledgement.
+ if (terminated) {
+ send_term_ack (owner);
+ delete this;
+ return;
+ }
+
+ // Notify the generic termination mechanism (io_object_t) that the object
+ // is already plugged in.
+ plugged_in = true;
+}
+
+void zmq::owned_t::term ()
+{
+ send_term_req (owner, this);
+}
+
+void zmq::owned_t::process_term ()
+{
+ zmq_assert (!terminated);
+
+ // If termination request has occured even before the object was plugged in
+ // wait till plugging in happens, then acknowledge the termination.
+ if (!plugged_in) {
+ terminated = true;
+ return;
+ }
+
+ // Otherwise, destroy the object and acknowledge the termination
+ // straight away.
+ send_term_ack (owner);
+ process_unplug ();
+ delete this;
+}
+
diff --git a/src/owned.hpp b/src/owned.hpp
new file mode 100644
index 0000000..164622e
--- /dev/null
+++ b/src/owned.hpp
@@ -0,0 +1,82 @@
+/*
+ Copyright (c) 2007-2009 FastMQ Inc.
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the Lesser GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ Lesser GNU General Public License for more details.
+
+ You should have received a copy of the Lesser GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __ZMQ_OWNED_HPP_INCLUDED__
+#define __ZMQ_OWNED_HPP_INCLUDED__
+
+#include "object.hpp"
+
+namespace zmq
+{
+
+ // Base class for objects owned by individual sockets. Handles
+ // initialisation and destruction of such objects.
+
+ class owned_t : public object_t
+ {
+ public:
+
+ // The object will live in parent's thread, however, its lifetime
+ // will be managed by its owner socket.
+ owned_t (object_t *parent_, object_t *owner_);
+
+ protected:
+
+ // Ask owner socket to terminate this object.
+ void term ();
+
+ // Derived object destroys owned_t. No point in allowing others to
+ // invoke the destructor. At the same time, it has to be virtual so
+ // that generic owned_t deallocation mechanism destroys specific type
+ // of the owned object correctly.
+ virtual ~owned_t ();
+
+ // Handlers for incoming commands. It's vital that every I/O object
+ // invokes io_object_t::process_plug at the end of it's own plug
+ // handler.
+ void process_plug ();
+
+ // io_object_t defines a new handler used to disconnect the object
+ // from the poller object. Implement the handlen in the derived
+ // classes to ensure sane cleanup.
+ virtual void process_unplug () = 0;
+
+ // Socket owning this object. It is responsible for destroying
+ // it when it's being closed.
+ object_t *owner;
+
+ private:
+
+ // Handlers for incoming commands.
+ void process_term ();
+
+ // Set to true when object is plugged in.
+ bool plugged_in;
+
+ // Set to true when object was terminated before it was plugged in.
+ // In such case destruction is delayed till 'plug' command arrives.
+ bool terminated;
+
+ owned_t (const owned_t&);
+ void operator = (const owned_t&);
+ };
+
+}
+
+#endif
diff --git a/src/session.cpp b/src/session.cpp
new file mode 100644
index 0000000..fa29dd3
--- /dev/null
+++ b/src/session.cpp
@@ -0,0 +1,58 @@
+/*
+ Copyright (c) 2007-2009 FastMQ Inc.
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the Lesser GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ Lesser GNU General Public License for more details.
+
+ You should have received a copy of the Lesser GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "session.hpp"
+#include "zmq_engine.hpp"
+#include "err.hpp"
+
+zmq::session_t::session_t (object_t *parent_, object_t *owner_,
+ zmq_engine_t *engine_) :
+ owned_t (parent_, owner_),
+ engine (engine_)
+{
+}
+
+zmq::session_t::~session_t ()
+{
+}
+
+bool zmq::session_t::read (::zmq_msg *msg_)
+{
+ return false;
+}
+
+bool zmq::session_t::write (::zmq_msg *msg_)
+{
+ return false;
+}
+
+void zmq::session_t::flush ()
+{
+}
+
+void zmq::session_t::process_plug ()
+{
+ engine->plug (this);
+ owned_t::process_plug ();
+}
+
+void zmq::session_t::process_unplug ()
+{
+ engine->unplug ();
+}
diff --git a/src/session.hpp b/src/session.hpp
new file mode 100644
index 0000000..4228fd9
--- /dev/null
+++ b/src/session.hpp
@@ -0,0 +1,57 @@
+/*
+ Copyright (c) 2007-2009 FastMQ Inc.
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the Lesser GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ Lesser GNU General Public License for more details.
+
+ You should have received a copy of the Lesser GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __ZMQ_SESSION_HPP_INCLUDED__
+#define __ZMQ_SESSION_HPP_INCLUDED__
+
+#include "i_inout.hpp"
+#include "owned.hpp"
+
+namespace zmq
+{
+
+ class session_t : public owned_t, public i_inout
+ {
+ public:
+
+ session_t (object_t *parent_, object_t *owner_,
+ class zmq_engine_t *engine_);
+
+ private:
+
+ ~session_t ();
+
+ // i_inout interface implementation.
+ bool read (::zmq_msg *msg_);
+ bool write (::zmq_msg *msg_);
+ void flush ();
+
+ // Handlers for incoming commands.
+ void process_plug ();
+ void process_unplug ();
+
+ class zmq_engine_t *engine;
+
+ session_t (const session_t&);
+ void operator = (const session_t&);
+ };
+
+}
+
+#endif
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index c179a93..4fadad2 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -32,12 +32,7 @@
zmq::socket_base_t::socket_base_t (app_thread_t *parent_) :
object_t (parent_),
pending_term_acks (0),
- app_thread (parent_),
- hwm (0),
- lwm (0),
- swap (0),
- mask (0),
- affinity (0)
+ app_thread (parent_)
{
}
@@ -77,7 +72,7 @@ int zmq::socket_base_t::setsockopt (int option_, void *optval_,
errno = EINVAL;
return -1;
}
- hwm = *((int64_t*) optval_);
+ options.hwm = *((int64_t*) optval_);
return 0;
case ZMQ_LWM:
@@ -85,7 +80,7 @@ int zmq::socket_base_t::setsockopt (int option_, void *optval_,
errno = EINVAL;
return -1;
}
- lwm = *((int64_t*) optval_);
+ options.lwm = *((int64_t*) optval_);
return 0;
case ZMQ_SWAP:
@@ -93,7 +88,7 @@ int zmq::socket_base_t::setsockopt (int option_, void *optval_,
errno = EINVAL;
return -1;
}
- swap = *((int64_t*) optval_);
+ options.swap = *((int64_t*) optval_);
return 0;
case ZMQ_MASK:
@@ -101,7 +96,7 @@ int zmq::socket_base_t::setsockopt (int option_, void *optval_,
errno = EINVAL;
return -1;
}
- mask = (uint64_t) *((int64_t*) optval_);
+ options.mask = (uint64_t) *((int64_t*) optval_);
return 0;
case ZMQ_AFFINITY:
@@ -109,15 +104,15 @@ int zmq::socket_base_t::setsockopt (int option_, void *optval_,
errno = EINVAL;
return -1;
}
- affinity = (uint64_t) *((int64_t*) optval_);
+ options.affinity = (uint64_t) *((int64_t*) optval_);
return 0;
- case ZMQ_SESSIONID:
+ case ZMQ_IDENTITY:
if (optvallen_ != sizeof (const char*)) {
errno = EINVAL;
return -1;
}
- session_id = (const char*) optval_;
+ options.identity = (const char*) optval_;
return 0;
default:
@@ -128,8 +123,8 @@ int zmq::socket_base_t::setsockopt (int option_, void *optval_,
int zmq::socket_base_t::bind (const char *addr_)
{
- zmq_listener_t *listener =
- new zmq_listener_t (choose_io_thread (affinity), this);
+ zmq_listener_t *listener = new zmq_listener_t (
+ choose_io_thread (options.affinity), this, options);
int rc = listener->set_address (addr_);
if (rc != 0)
return -1;
@@ -141,8 +136,8 @@ int zmq::socket_base_t::bind (const char *addr_)
int zmq::socket_base_t::connect (const char *addr_)
{
- zmq_connecter_t *connecter =
- new zmq_connecter_t (choose_io_thread (affinity), this);
+ zmq_connecter_t *connecter = new zmq_connecter_t (
+ choose_io_thread (options.affinity), this, options);
int rc = connecter->set_address (addr_);
if (rc != 0)
return -1;
diff --git a/src/socket_base.hpp b/src/socket_base.hpp
index 2257fbe..4ccac48 100644
--- a/src/socket_base.hpp
+++ b/src/socket_base.hpp
@@ -24,6 +24,7 @@
#include <string>
#include "object.hpp"
+#include "options.hpp"
#include "stdint.hpp"
namespace zmq
@@ -66,12 +67,7 @@ namespace zmq
class app_thread_t *app_thread;
// Socket options.
- int64_t hwm;
- int64_t lwm;
- int64_t swap;
- uint64_t mask;
- uint64_t affinity;
- std::string session_id;
+ options_t options;
socket_base_t (const socket_base_t&);
void operator = (const socket_base_t&);
diff --git a/src/zmq_connecter.cpp b/src/zmq_connecter.cpp
index 513508d..4416a70 100644
--- a/src/zmq_connecter.cpp
+++ b/src/zmq_connecter.cpp
@@ -18,11 +18,16 @@
*/
#include "zmq_connecter.hpp"
+#include "zmq_init.hpp"
+#include "io_thread.hpp"
#include "err.hpp"
-zmq::zmq_connecter_t::zmq_connecter_t (io_thread_t *parent_, object_t *owner_) :
- io_object_t (parent_, owner_),
- waiting (false)
+zmq::zmq_connecter_t::zmq_connecter_t (io_thread_t *parent_, object_t *owner_,
+ const options_t &options_) :
+ owned_t (parent_, owner_),
+ io_object_t (parent_),
+ handle_valid (false),
+ options (options_)
{
}
@@ -38,12 +43,12 @@ int zmq::zmq_connecter_t::set_address (const char *addr_)
void zmq::zmq_connecter_t::process_plug ()
{
start_connecting ();
- io_object_t::process_plug ();
+ owned_t::process_plug ();
}
void zmq::zmq_connecter_t::process_unplug ()
{
- if (!waiting)
+ if (handle_valid)
rm_fd (handle);
}
@@ -58,30 +63,31 @@ void zmq::zmq_connecter_t::in_event ()
void zmq::zmq_connecter_t::out_event ()
{
fd_t fd = tcp_connecter.connect ();
+ rm_fd (handle);
+ handle_valid = false;
// If there was error during the connecting, close the socket and wait
// for a while before trying to reconnect.
if (fd == retired_fd) {
- rm_fd (handle);
tcp_connecter.close ();
- waiting = true;
add_timer ();
return;
}
- zmq_assert (false);
+ // Create an init object.
+ io_thread_t *io_thread = choose_io_thread (options.affinity);
+ zmq_init_t *init = new zmq_init_t (io_thread, owner, fd, true, options);
+ zmq_assert (init);
+ send_plug (init);
+ send_own (owner, init);
-/*
- object_t *engine = new zmq_engine_t (choose_io_thread (0), owner);
- send_plug (engine);
- send_own (owner, engine);
-*/
+ // Ask owner socket to shut the connecter down.
+ term ();
}
void zmq::zmq_connecter_t::timer_event ()
{
// Reconnect period have elapsed.
- waiting = false;
start_connecting ();
}
@@ -99,12 +105,12 @@ void zmq::zmq_connecter_t::start_connecting ()
// Connection establishment may be dealyed. Poll for its completion.
else if (rc == -1 && errno == EINPROGRESS) {
handle = add_fd (tcp_connecter.get_fd ());
+ handle_valid = true;
set_pollout (handle);
return;
}
// If none of the above is true, synchronous error occured.
// Wait for a while and retry.
- waiting = true;
add_timer ();
}
diff --git a/src/zmq_connecter.hpp b/src/zmq_connecter.hpp
index d346396..93497cb 100644
--- a/src/zmq_connecter.hpp
+++ b/src/zmq_connecter.hpp
@@ -20,17 +20,21 @@
#ifndef __ZMQ_ZMQ_CONNECTER_HPP_INCLUDED__
#define __ZMQ_ZMQ_CONNECTER_HPP_INCLUDED__
+#include "owned.hpp"
#include "io_object.hpp"
#include "tcp_connecter.hpp"
+#include "options.hpp"
+#include "stdint.hpp"
namespace zmq
{
- class zmq_connecter_t : public io_object_t
+ class zmq_connecter_t : public owned_t, public io_object_t
{
public:
- zmq_connecter_t (class io_thread_t *parent_, object_t *owner_);
+ zmq_connecter_t (class io_thread_t *parent_, object_t *owner_,
+ const options_t &options_);
// Set IP address to connect to.
int set_address (const char *addr_);
@@ -57,9 +61,12 @@ namespace zmq
// Handle corresponding to the listening socket.
handle_t handle;
- // True, if we are waiting for a period of time before trying to
- // reconnect.
- bool waiting;
+ // If true file descriptor is registered with the poller and 'handle'
+ // contains valid value.
+ bool handle_valid;
+
+ // Associated socket options.
+ options_t options;
zmq_connecter_t (const zmq_connecter_t&);
void operator = (const zmq_connecter_t&);
diff --git a/src/zmq_decoder.cpp b/src/zmq_decoder.cpp
new file mode 100644
index 0000000..b461234
--- /dev/null
+++ b/src/zmq_decoder.cpp
@@ -0,0 +1,78 @@
+/*
+ Copyright (c) 2007-2009 FastMQ Inc.
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the Lesser GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ Lesser GNU General Public License for more details.
+
+ You should have received a copy of the Lesser GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "zmq_decoder.hpp"
+#include "i_inout.hpp"
+#include "wire.hpp"
+
+zmq::zmq_decoder_t::zmq_decoder_t () :
+ destination (NULL)
+{
+ zmq_msg_init (&in_progress);
+
+ // At the beginning, read one byte and go to one_byte_size_ready state.
+ next_step (tmpbuf, 1, &zmq_decoder_t::one_byte_size_ready);
+}
+
+zmq::zmq_decoder_t::~zmq_decoder_t ()
+{
+ zmq_msg_close (&in_progress);
+}
+
+void zmq::zmq_decoder_t::set_inout (i_inout *destination_)
+{
+ destination = destination_;
+}
+
+bool zmq::zmq_decoder_t::one_byte_size_ready ()
+{
+ // First byte of size is read. If it is 0xff read 8-byte size.
+ // Otherwise allocate the buffer for message data and read the
+ // message data into it.
+ if (*tmpbuf == 0xff)
+ next_step (tmpbuf, 8, &zmq_decoder_t::eight_byte_size_ready);
+ else {
+ zmq_msg_init_size (&in_progress, *tmpbuf);
+ next_step (zmq_msg_data (&in_progress), *tmpbuf,
+ &zmq_decoder_t::message_ready);
+ }
+ return true;
+}
+
+bool zmq::zmq_decoder_t::eight_byte_size_ready ()
+{
+ // 8-byte size is read. Allocate the buffer for message body and
+ // read the message data into it.
+ size_t size = (size_t) get_uint64 (tmpbuf);
+ zmq_msg_init_size (&in_progress, size);
+ next_step (zmq_msg_data (&in_progress), size,
+ &zmq_decoder_t::message_ready);
+ return true;
+}
+
+bool zmq::zmq_decoder_t::message_ready ()
+{
+ // Message is completely read. Push it further and start reading
+ // new message.
+ if (!destination->write (&in_progress))
+ return false;
+
+ next_step (tmpbuf, 1, &zmq_decoder_t::one_byte_size_ready);
+ return true;
+}
diff --git a/src/zmq_decoder.hpp b/src/zmq_decoder.hpp
new file mode 100644
index 0000000..17c28f8
--- /dev/null
+++ b/src/zmq_decoder.hpp
@@ -0,0 +1,57 @@
+/*
+ Copyright (c) 2007-2009 FastMQ Inc.
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the Lesser GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ Lesser GNU General Public License for more details.
+
+ You should have received a copy of the Lesser GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __ZMQ_ZMQ_DECODER_HPP_INCLUDED__
+#define __ZMQ_ZMQ_DECODER_HPP_INCLUDED__
+
+#include "../include/zmq.h"
+
+#include "decoder.hpp"
+
+namespace zmq
+{
+ // Decoder for 0MQ backend protocol. Converts data batches into messages.
+
+ class zmq_decoder_t : public decoder_t <zmq_decoder_t>
+ {
+ public:
+
+ zmq_decoder_t ();
+ ~zmq_decoder_t ();
+
+ void set_inout (struct i_inout *destination_);
+
+ private:
+
+ bool one_byte_size_ready ();
+ bool eight_byte_size_ready ();
+ bool message_ready ();
+
+ struct i_inout *destination;
+ unsigned char tmpbuf [8];
+ ::zmq_msg in_progress;
+
+ zmq_decoder_t (const zmq_decoder_t&);
+ void operator = (const zmq_decoder_t&);
+ };
+
+}
+
+#endif
+
diff --git a/src/zmq_encoder.cpp b/src/zmq_encoder.cpp
new file mode 100644
index 0000000..124d77b
--- /dev/null
+++ b/src/zmq_encoder.cpp
@@ -0,0 +1,76 @@
+/*
+ Copyright (c) 2007-2009 FastMQ Inc.
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the Lesser GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ Lesser GNU General Public License for more details.
+
+ You should have received a copy of the Lesser GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "zmq_encoder.hpp"
+#include "i_inout.hpp"
+#include "wire.hpp"
+
+zmq::zmq_encoder_t::zmq_encoder_t () :
+ source (NULL)
+{
+ zmq_msg_init (&in_progress);
+
+ // Write 0 bytes to the batch and go to message_ready state.
+ next_step (NULL, 0, &zmq_encoder_t::message_ready, true);
+}
+
+zmq::zmq_encoder_t::~zmq_encoder_t ()
+{
+ zmq_msg_close (&in_progress);
+}
+
+void zmq::zmq_encoder_t::set_inout (i_inout *source_)
+{
+ source = source_;
+}
+
+bool zmq::zmq_encoder_t::size_ready ()
+{
+ // Write message body into the buffer.
+ next_step (zmq_msg_data (&in_progress), zmq_msg_size (&in_progress),
+ &zmq_encoder_t::message_ready, false);
+ return true;
+}
+
+bool zmq::zmq_encoder_t::message_ready ()
+{
+ // Read new message from the dispatcher. If there is none, return false.
+ // Note that new state is set only if write is successful. That way
+ // unsuccessful write will cause retry on the next state machine
+ // invocation.
+ if (!source->read (&in_progress)) {
+ return false;
+ }
+ size_t size = zmq_msg_size (&in_progress);
+
+ // For messages less than 255 bytes long, write one byte of message size.
+ // For longer messages write 0xff escape character followed by 8-byte
+ // message size.
+ if (size < 255) {
+ tmpbuf [0] = (unsigned char) size;
+ next_step (tmpbuf, 1, &zmq_encoder_t::size_ready, true);
+ }
+ else {
+ tmpbuf [0] = 0xff;
+ put_uint64 (tmpbuf + 1, size);
+ next_step (tmpbuf, 9, &zmq_encoder_t::size_ready, true);
+ }
+ return true;
+}
+
diff --git a/src/zmq_encoder.hpp b/src/zmq_encoder.hpp
new file mode 100644
index 0000000..89af265
--- /dev/null
+++ b/src/zmq_encoder.hpp
@@ -0,0 +1,55 @@
+/*
+ Copyright (c) 2007-2009 FastMQ Inc.
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the Lesser GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ Lesser GNU General Public License for more details.
+
+ You should have received a copy of the Lesser GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __ZMQ_ZMQ_ENCODER_HPP_INCLUDED__
+#define __ZMQ_ZMQ_ENCODER_HPP_INCLUDED__
+
+#include "../include/zmq.h"
+
+#include "encoder.hpp"
+
+namespace zmq
+{
+ // Encoder for 0MQ backend protocol. Converts messages into data batches.
+
+ class zmq_encoder_t : public encoder_t <zmq_encoder_t>
+ {
+ public:
+
+ zmq_encoder_t ();
+ ~zmq_encoder_t ();
+
+ void set_inout (struct i_inout *source_);
+
+ private:
+
+ bool size_ready ();
+ bool message_ready ();
+
+ struct i_inout *source;
+ ::zmq_msg in_progress;
+ unsigned char tmpbuf [9];
+
+ zmq_encoder_t (const zmq_encoder_t&);
+ void operator = (const zmq_encoder_t&);
+ };
+}
+
+#endif
+
diff --git a/src/zmq_engine.cpp b/src/zmq_engine.cpp
index 3708c9a..3620d30 100644
--- a/src/zmq_engine.cpp
+++ b/src/zmq_engine.cpp
@@ -19,17 +19,118 @@
#include "zmq_engine.hpp"
#include "io_thread.hpp"
+#include "i_inout.hpp"
+#include "config.hpp"
+#include "err.hpp"
-zmq::zmq_engine_t::zmq_engine_t (io_thread_t *parent_, object_t *owner_) :
- io_object_t (parent_, owner_)
+zmq::zmq_engine_t::zmq_engine_t (io_thread_t *parent_, fd_t fd_) :
+ io_object_t (parent_),
+ insize (0),
+ inpos (0),
+ outsize (0),
+ outpos (0),
+ inout (NULL)
{
+ // Allocate read & write buffer.
+ inbuf = (unsigned char*) malloc (in_batch_size);
+ zmq_assert (inbuf);
+ outbuf = (unsigned char*) malloc (out_batch_size);
+ zmq_assert (outbuf);
+
+ // Initialise the underlying socket.
+ int rc = tcp_socket.open (fd_);
+ zmq_assert (rc == 0);
}
zmq::zmq_engine_t::~zmq_engine_t ()
{
+ free (outbuf);
+ free (inbuf);
+}
+
+void zmq::zmq_engine_t::plug (i_inout *inout_)
+{
+ encoder.set_inout (inout_);
+ decoder.set_inout (inout_);
+
+ handle = add_fd (tcp_socket.get_fd ());
+ set_pollin (handle);
+ set_pollout (handle);
+
+ inout = inout_;
+}
+
+void zmq::zmq_engine_t::unplug ()
+{
+ rm_fd (handle);
+ inout = NULL;
+}
+
+void zmq::zmq_engine_t::in_event ()
+{
+ // If there's no data to process in the buffer, read new data.
+ if (inpos == insize) {
+
+ // Read as much data as possible to the read buffer.
+ insize = tcp_socket.read (inbuf, in_batch_size);
+printf ("%d bytes read\n", (int) insize);
+ inpos = 0;
+
+ // Check whether the peer has closed the connection.
+ if (insize == -1) {
+ insize = 0;
+ error ();
+ return;
+ }
+ }
+
+ // Following code should be executed even if there's not a single byte in
+ // the buffer. There still can be a decoded messages stored in the decoder.
+
+ // Push the data to the decoder.
+ int nbytes = decoder.write (inbuf + inpos, insize - inpos);
+
+ // Adjust read position. Stop polling for input if we got stuck.
+ inpos += nbytes;
+ if (inpos < insize)
+ reset_pollin (handle);
+
+ // If at least one byte was processed, flush all messages the decoder
+ // may have produced.
+ if (nbytes > 0)
+ inout->flush ();
+
}
-void zmq::zmq_engine_t::process_plug ()
+void zmq::zmq_engine_t::out_event ()
{
+ // If write buffer is empty, try to read new data from the encoder.
+ if (outpos == outsize) {
+
+ outsize = encoder.read (outbuf, out_batch_size);
+ outpos = 0;
+
+ // If there is no data to send, stop polling for output.
+ if (outsize == 0)
+ reset_pollout (handle);
+ }
+
+ // If there are any data to write in write buffer, write as much as
+ // possible to the socket.
+ if (outpos < outsize) {
+ int nbytes = tcp_socket.write (outbuf + outpos, outsize - outpos);
+
+ // Handle problems with the connection.
+ if (nbytes == -1) {
+ error ();
+ return;
+ }
+
+ outpos += nbytes;
+ }
}
+void zmq::zmq_engine_t::error ()
+{
+ zmq_assert (false);
+}
diff --git a/src/zmq_engine.hpp b/src/zmq_engine.hpp
index ad739c7..38a390d 100644
--- a/src/zmq_engine.hpp
+++ b/src/zmq_engine.hpp
@@ -21,6 +21,9 @@
#define __ZMQ_ZMQ_ENGINE_HPP_INCLUDED__
#include "io_object.hpp"
+#include "tcp_socket.hpp"
+#include "zmq_encoder.hpp"
+#include "zmq_decoder.hpp"
namespace zmq
{
@@ -29,14 +32,36 @@ namespace zmq
{
public:
- zmq_engine_t (class io_thread_t *parent_, object_t *owner_);
+ zmq_engine_t (class io_thread_t *parent_, fd_t fd_);
+ ~zmq_engine_t ();
+
+ void plug (struct i_inout *inout_);
+ void unplug ();
+
+ // i_poll_events interface implementation.
+ void in_event ();
+ void out_event ();
private:
- ~zmq_engine_t ();
+ // Function to handle network disconnections.
+ void error ();
+
+ tcp_socket_t tcp_socket;
+ handle_t handle;
+
+ unsigned char *inbuf;
+ int insize;
+ int inpos;
+
+ unsigned char *outbuf;
+ int outsize;
+ int outpos;
+
+ i_inout *inout;
- // Handlers for incoming commands.
- void process_plug ();
+ zmq_encoder_t encoder;
+ zmq_decoder_t decoder;
zmq_engine_t (const zmq_engine_t&);
void operator = (const zmq_engine_t&);
diff --git a/src/zmq_init.cpp b/src/zmq_init.cpp
new file mode 100644
index 0000000..fea1452
--- /dev/null
+++ b/src/zmq_init.cpp
@@ -0,0 +1,110 @@
+/*
+ Copyright (c) 2007-2009 FastMQ Inc.
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the Lesser GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ Lesser GNU General Public License for more details.
+
+ You should have received a copy of the Lesser GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "zmq_init.hpp"
+#include "io_thread.hpp"
+#include "session.hpp"
+#include "err.hpp"
+
+zmq::zmq_init_t::zmq_init_t (io_thread_t *parent_, object_t *owner_, fd_t fd_,
+ bool connected_, const options_t &options_) :
+ owned_t (parent_, owner_),
+ connected (connected_),
+ options (options_)
+{
+ // Create associated engine object.
+ engine = new zmq_engine_t (parent_, fd_);
+ zmq_assert (engine);
+}
+
+zmq::zmq_init_t::~zmq_init_t ()
+{
+ if (engine)
+ delete engine;
+}
+
+bool zmq::zmq_init_t::read (::zmq_msg *msg_)
+{
+ // On the listening side, no initialisation data are sent to the peer.
+ if (!connected)
+ return false;
+
+ // Send identity.
+ int rc = zmq_msg_init_size (msg_, options.identity.size ());
+ zmq_assert (rc == 0);
+ memcpy (zmq_msg_data (msg_), options.identity.c_str (),
+ options.identity.size ());
+
+ // Initialisation is done.
+ create_session ();
+
+ return true;
+}
+
+bool zmq::zmq_init_t::write (::zmq_msg *msg_)
+{
+ // On the connecting side no initialisation data are expected.
+ if (connected)
+ return false;
+
+ // Retreieve the identity.
+ options.identity = std::string ((const char*) zmq_msg_data (msg_),
+ zmq_msg_size (msg_));
+
+ // Initialisation is done.
+ create_session ();
+
+ return true;
+}
+
+void zmq::zmq_init_t::flush ()
+{
+ // No need to do anything. zmq_init_t does no batching of messages.
+ // Each message is processed immediately on write.
+}
+
+void zmq::zmq_init_t::process_plug ()
+{
+ engine->plug (this);
+ owned_t::process_plug ();
+}
+
+void zmq::zmq_init_t::process_unplug ()
+{
+ engine->unplug ();
+}
+
+void zmq::zmq_init_t::create_session ()
+{
+ // Disconnect engine from the init object.
+ engine->unplug ();
+
+ // Create the session instance.
+ io_thread_t *io_thread = choose_io_thread (options.affinity);
+ session_t *session = new session_t (io_thread, owner, engine);
+ zmq_assert (session);
+ engine = NULL;
+
+ // Pass session/engine pair to a chosen I/O thread.
+ send_plug (session);
+ send_own (owner, session);
+
+ // Destroy the init object.
+ term ();
+}
diff --git a/src/zmq_init.hpp b/src/zmq_init.hpp
new file mode 100644
index 0000000..2e0910a
--- /dev/null
+++ b/src/zmq_init.hpp
@@ -0,0 +1,82 @@
+/*
+ Copyright (c) 2007-2009 FastMQ Inc.
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the Lesser GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ Lesser GNU General Public License for more details.
+
+ You should have received a copy of the Lesser GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __ZMQ_ZMQ_INIT_HPP_INCLUDED__
+#define __ZMQ_ZMQ_INIT_HPP_INCLUDED__
+
+#include <string>
+
+#include "i_inout.hpp"
+#include "owned.hpp"
+#include "zmq_engine.hpp"
+#include "stdint.hpp"
+#include "fd.hpp"
+#include "options.hpp"
+
+namespace zmq
+{
+
+ // The class handles initialisation phase of native 0MQ wire-level
+ // protocol. Currently it can be used to handle both sides of the
+ // connection. If it grows to complex, we can separate the two into
+ // distinct classes.
+
+ class zmq_init_t : public owned_t, public i_inout
+ {
+ public:
+
+ // Set 'connected' to true if the connection was created by 'connect'
+ // function. If it was accepted from a listening socket, set it to
+ // false.
+ zmq_init_t (class io_thread_t *parent_, object_t *owner_, fd_t fd_,
+ bool connected_, const options_t &options);
+ ~zmq_init_t ();
+
+ private:
+
+ // i_inout interface implementation.
+ bool read (::zmq_msg *msg_);
+ bool write (::zmq_msg *msg_);
+ void flush ();
+
+ // Handlers for incoming commands.
+ void process_plug ();
+ void process_unplug ();
+
+ void create_session ();
+
+ // Engine is created by zmq_init_t object. Once the initialisation
+ // phase is over it is passed to a session object, possibly running
+ // in a different I/O thread.
+ zmq_engine_t *engine;
+
+ // If true, we are on the connecting side. If false, we are on the
+ // listening side.
+ bool connected;
+
+ // Associated socket options.
+ options_t options;
+
+ zmq_init_t (const zmq_init_t&);
+ void operator = (const zmq_init_t&);
+ };
+
+}
+
+#endif
diff --git a/src/zmq_listener.cpp b/src/zmq_listener.cpp
index 463a1dd..c990468 100644
--- a/src/zmq_listener.cpp
+++ b/src/zmq_listener.cpp
@@ -18,12 +18,15 @@
*/
#include "zmq_listener.hpp"
-#include "zmq_engine.hpp"
+#include "zmq_init.hpp"
#include "io_thread.hpp"
#include "err.hpp"
-zmq::zmq_listener_t::zmq_listener_t (io_thread_t *parent_, object_t *owner_) :
- io_object_t (parent_, owner_)
+zmq::zmq_listener_t::zmq_listener_t (io_thread_t *parent_, object_t *owner_,
+ const options_t &options_) :
+ owned_t (parent_, owner_),
+ io_object_t (parent_),
+ options (options_)
{
}
@@ -46,7 +49,7 @@ void zmq::zmq_listener_t::process_plug ()
handle = add_fd (tcp_listener.get_fd ());
set_pollin (handle);
- io_object_t::process_plug ();
+ owned_t::process_plug ();
}
void zmq::zmq_listener_t::process_unplug ()
@@ -63,14 +66,12 @@ void zmq::zmq_listener_t::in_event ()
if (fd == retired_fd)
return;
- // TODO
- zmq_assert (false);
-
-/*
- object_t *engine = new zmq_engine_t (choose_io_thread (0), owner);
- send_plug (engine);
- send_own (owner, engine);
-*/
+ // Create an init object.
+ io_thread_t *io_thread = choose_io_thread (options.affinity);
+ zmq_init_t *init = new zmq_init_t (io_thread, owner, fd, false, options);
+ zmq_assert (init);
+ send_plug (init);
+ send_own (owner, init);
}
diff --git a/src/zmq_listener.hpp b/src/zmq_listener.hpp
index ef252b3..f85ad5a 100644
--- a/src/zmq_listener.hpp
+++ b/src/zmq_listener.hpp
@@ -20,17 +20,21 @@
#ifndef __ZMQ_ZMQ_LISTENER_HPP_INCLUDED__
#define __ZMQ_ZMQ_LISTENER_HPP_INCLUDED__
+#include "owned.hpp"
#include "io_object.hpp"
#include "tcp_listener.hpp"
+#include "options.hpp"
+#include "stdint.hpp"
namespace zmq
{
- class zmq_listener_t : public io_object_t
+ class zmq_listener_t : public owned_t, public io_object_t
{
public:
- zmq_listener_t (class io_thread_t *parent_, object_t *owner_);
+ zmq_listener_t (class io_thread_t *parent_, object_t *owner_,
+ const options_t &options_);
// Set IP address to listen on.
int set_address (const char *addr_);
@@ -52,6 +56,9 @@ namespace zmq
// Handle corresponding to the listening socket.
handle_t handle;
+ // Associated socket options.
+ options_t options;
+
zmq_listener_t (const zmq_listener_t&);
void operator = (const zmq_listener_t&);
};