summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@fastmq.commkdir>2009-08-12 09:40:16 +0200
committerMartin Sustrik <sustrik@fastmq.commkdir>2009-08-12 09:40:16 +0200
commit059beca59d39d90a8ee0e1b07f840994962ea89e (patch)
tree007a5d86450c543bb9a362a844ee271115b68c54 /src
parentbda766ab401b6c565fe9c2d0bc80c11bbbe84488 (diff)
listener/connecter/init/session added
Diffstat (limited to 'src')
-rw-r--r--src/Makefile.am13
-rw-r--r--src/atomic.hpp310
-rw-r--r--src/i_inout.hpp37
-rw-r--r--src/io_object.cpp47
-rw-r--r--src/io_object.hpp50
-rw-r--r--src/options.cpp29
-rw-r--r--src/options.hpp42
-rw-r--r--src/owned.cpp74
-rw-r--r--src/owned.hpp82
-rw-r--r--src/session.cpp58
-rw-r--r--src/session.hpp57
-rw-r--r--src/socket_base.cpp29
-rw-r--r--src/socket_base.hpp8
-rw-r--r--src/zmq_connecter.cpp36
-rw-r--r--src/zmq_connecter.hpp17
-rw-r--r--src/zmq_decoder.cpp78
-rw-r--r--src/zmq_decoder.hpp57
-rw-r--r--src/zmq_encoder.cpp76
-rw-r--r--src/zmq_encoder.hpp55
-rw-r--r--src/zmq_engine.cpp107
-rw-r--r--src/zmq_engine.hpp33
-rw-r--r--src/zmq_init.cpp110
-rw-r--r--src/zmq_init.hpp82
-rw-r--r--src/zmq_listener.cpp25
-rw-r--r--src/zmq_listener.hpp11
25 files changed, 1069 insertions, 454 deletions
diff --git a/src/Makefile.am b/src/Makefile.am
index 8524067..9d6127c 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -15,6 +15,7 @@ libzmq_la_SOURCES = \
err.hpp \
fd.hpp \
fd_signaler.hpp \
+ i_inout.hpp \
io_object.hpp \
io_thread.hpp \
ip.hpp \
@@ -25,10 +26,13 @@ libzmq_la_SOURCES = \
msg.hpp \
mutex.hpp \
object.hpp \
+ options.hpp \
+ owner.hpp \
pipe.hpp \
platform.hpp \
poll.hpp \
select.hpp \
+ session.hpp \
simple_semaphore.hpp \
socket_base.hpp \
stdint.hpp \
@@ -43,7 +47,10 @@ libzmq_la_SOURCES = \
ypollset.hpp \
yqueue.hpp \
zmq_connecter.hpp \
+ zmq_decoder.hpp \
+ zmq_encoder.hpp \
zmq_engine.hpp \
+ zmq_init.hpp \
zmq_listener.hpp \
app_thread.cpp \
devpoll.cpp \
@@ -56,8 +63,11 @@ libzmq_la_SOURCES = \
ip.cpp \
kqueue.cpp \
object.cpp \
+ options.cpp \
+ owned.cpp \
poll.cpp \
select.cpp \
+ session.cpp \
socket_base.cpp \
tcp_connecter.cpp \
tcp_listener.cpp \
@@ -67,7 +77,10 @@ libzmq_la_SOURCES = \
ypollset.cpp \
zmq.cpp \
zmq_connecter.cpp \
+ zmq_decoder.cpp \
+ zmq_encoder.cpp \
zmq_engine.cpp \
+ zmq_init.cpp \
zmq_listener.cpp
libzmq_la_LDFLAGS = -version-info 0:0:0
diff --git a/src/atomic.hpp b/src/atomic.hpp
deleted file mode 100644
index e581593..0000000
--- a/src/atomic.hpp
+++ /dev/null
@@ -1,310 +0,0 @@
-/*
- Copyright (c) 2007-2009 FastMQ Inc.
-
- This file is part of 0MQ.
-
- 0MQ is free software; you can redistribute it and/or modify it under
- the terms of the Lesser GNU General Public License as published by
- the Free Software Foundation; either version 3 of the License, or
- (at your option) any later version.
-
- 0MQ is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- Lesser GNU General Public License for more details.
-
- You should have received a copy of the Lesser GNU General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#ifndef __ZMQ_ATOMIC_HPP_INCLUDED__
-#define __ZMQ_ATOMIC_HPP_INCLUDED__
-
-#include "stdint.hpp"
-
-#if defined ZMQ_FORCE_MUTEXES
-#define ZMQ_ATOMIC_MUTEX
-#elif (defined __i386__ || defined __x86_64__) && defined __GNUC__
-#define ZMQ_ATOMIC_X86
-#elif defined ZMQ_HAVE_WINDOWS
-#define ZMQ_ATOMIC_WINDOWS
-#elif defined ZMQ_HAVE_SOLARIS
-#define ZMQ_ATOMIC_SOLARIS
-#else
-#define ZMQ_ATOMIC_MUTEX
-#endif
-
-namespace zmq
-{
-
- // Atomic assignement.
- inline void atomic_uint32_set (volatile uint32_t *p_, uint32_t value_)
- {
- *p_ = value_;
- // StoreLoad memory barrier should go here on platforms with
- // memory models that require it.
- }
-
- // Atomic retrieval of an integer.
- inline uint32_t atomic_uint32_get (volatile uint32_t *p_)
- {
- // StoreLoad memory barrier should go here on platforms with
- // memory models that require it.
- return *p_;
- }
-
- // Atomic addition. Returns the old value.
- inline uint32_t atomic_uint32_add (volatile uint32_t *p_, uint32_t delta_)
- {
-#if defined ZMQ_ATOMIC_WINDOWS
- return InterlockedExchangeAdd ((LONG*) &value, increment_);
-#elif defined ZMQ_ATOMIC_SOLARIS
- return atomic_add_32_nv (&value, increment_) - delta_;
-#elif defined ZMQ_ATOMIC_X86
- uint32_t old;
- __asm__ volatile (
- "lock; xadd %0, %1\n\t"
- : "=r" (old), "=m" (*p_)
- : "0" (delta_), "m" (*p_)
- : "cc", "memory");
- return old;
-#else
-#error // TODO:
- sync.lock ();
- uint32_t old = *p_;
- *p_ += delta_;
- sync.unlock ();
-#endif
- }
-
- // Atomic subtraction. Returns the old value.
- inline uint32_t atomic_uint32_sub (volatile uint32_t *p_, uint32_t delta_)
- {
-#if defined ZMQ_ATOMIC_WINDOWS
- LONG delta = - ((LONG) delta_);
- return InterlockedExchangeAdd ((LONG*) &value, delta);
-#elif defined ZMQ_ATOMIC_SOLARIS
- int32_t delta = - ((int32_t) delta_);
- return atomic_add_32_nv (&value, delta) + delta_;
-#elif defined ZMQ_ATOMIC_X86
- uint32_t old = -delta_;
- __asm__ volatile ("lock; xaddl %0,%1"
- : "=r" (old), "=m" (*p_)
- : "0" (old), "m" (*p_)
- : "cc");
- return old;
-#else
-#error // TODO:
- sync.lock ();
- uint32_t old = *p_;
- *p_ -= delta_;
- sync.unlock ();
- return old;
-#endif
- }
-
- // Atomic assignement.
- template <typename T>
- inline void atomic_ptr_set (volatile T **p_, T *value_)
- {
- *p_ = value_;
- // StoreLoad memory barrier should go here on platforms with
- // memory models that require it.
- }
-
- // Perform atomic 'exchange pointers' operation. Old value is returned.
- template <typename T>
- inline void *atomic_ptr_xchg (volatile T **p_, T *value_)
- {
-#if defined ZMQ_ATOMIC_WINDOWS
- return InterlockedExchangePointer (p_, value_);
-#elif defined ZMQ_ATOMIC_SOLARIS
- return atomic_swap_ptr (p_, value_);
-#elif defined ZMQ_ATOMIC_X86
- void *old;
- __asm__ volatile (
- "lock; xchg %0, %2"
- : "=r" (old), "=m" (*p_)
- : "m" (*p_), "0" (value_));
- return old;
-#else
-#error // TODO:
- sync.lock ();
- void *old = *p_;
- *p_ = value_;
- sync.unlock ();
- return old;
-#endif
- }
-
- // Perform atomic 'compare and swap' operation on the pointer.
- // The pointer is compared to 'cmp' argument and if they are
- // equal, its value is set to 'value'. Old value of the pointer
- // is returned.
- template <typename T>
- inline void *atomic_ptr_cas (volatile T **p_, T *cmp_, T *value_)
- {
-#if defined ZMQ_ATOMIC_WINDOWS
- return InterlockedCompareExchangePointer (p_, value_, cmp_);
-#elif defined ZMQ_ATOMIC_SOLARIS
- return atomic_cas_ptr (p_, cmp_, value_);
-#elif defined ZMQ_ATOMIC_X86
- void *old;
- __asm__ volatile (
- "lock; cmpxchg %2, %3"
- : "=a" (old), "=m" (*p_)
- : "r" (value_), "m" (*p_), "0" (cmp_)
- : "cc");
- return old;
-#else
-#error // TODO:
- sync.lock ();
- void *old = *p_;
- if (old == cmp_)
- *p_ = value_;
- sync.unlock ();
- return old;
-#endif
- }
-
-#if defined ZMQ_ATOMIC_X86 && defined __x86_64__
- typedef uint64_t atomic_bitmap_t;
-#else
- typedef uint32_t atomic_bitmap_t;
-#endif
-
- // Atomic assignement.
- inline void atomic_bitmap_set (volatile atomic_bitmap_t *p_,
- atomic_bitmap_t value_)
- {
- *p_ = value_;
- // StoreLoad memory barrier should go here on platforms with
- // memory models that require it.
- }
-
- // Bit-test-set-and-reset. Sets one bit of the value and resets
- // another one. Returns the original value of the reset bit.
- inline bool atomic_bitmap_btsr (volatile atomic_bitmap_t *p_,
- int set_index_, int reset_index_)
- {
-#if defined ZMQ_ATOMIC_WINDOWS
- while (true) {
- atomic_bitmap_t oldval = *p_;
- atomic_bitmap_t newval = (oldval | (atomic_bitmap_t (1) <<
- set_index_)) & ~(integer_t (1) << reset_index_);
- if (InterlockedCompareExchange ((volatile LONG*) p_, newval,
- oldval) == (LONG) oldval)
- return (oldval & (atomic_bitmap_t (1) << reset_index_)) ?
- true : false;
- }
-#elif defined ZMQ_ATOMIC_SOLARIS
- while (true) {
- atomic_bitmap_t oldval = *p_;
- atomic_bitmap_t newval = (oldval | (atomic_bitmap_t (1) <<
- set_index_)) & ~(integer_t (1) << reset_index_);
- if (atomic_cas_32 (p_, oldval, newval) == oldval)
- return (oldval & (atomic_bitmap_t (1) << reset_index_)) ?
- true : false;
- }
-#elif defined ZMQ_ATOMIC_X86
- atomic_bitmap_t oldval, dummy;
- __asm__ volatile (
- "mov %0, %1\n\t"
- "1:"
- "mov %1, %2\n\t"
- "bts %3, %2\n\t"
- "btr %4, %2\n\t"
- "lock cmpxchg %2, %0\n\t"
- "jnz 1b\n\t"
- : "+m" (*p_), "=&a" (oldval), "=&r" (dummy)
- : "r" (atomic_bitmap_t (set_index_)),
- "r" (atomic_bitmap_t (reset_index_))
- : "cc");
- return (bool) (oldval & (atomic_bitmap_t (1) << reset_index_));
-#else
-#error // TODO:
- sync.lock ();
- atomic_bitmap_t oldval = *p_;
- *p_ = (oldval | (atomic_bitmap_t (1) << set_index_)) &
- ~(atomic_bitmap_t (1) << reset_index_);
- sync.unlock ();
- return (oldval & (atomic_bitmap_t (1) << reset_index_)) ? true : false;
-#endif
- }
-
- // Sets value to newval. Returns the original value.
- inline atomic_bitmap_t atomic_bitmap_xchg (volatile atomic_bitmap_t *p_,
- atomic_bitmap_t newval_)
- {
-#if defined ZMQ_ATOMIC_WINDOWS
- return InterlockedExchange ((volatile LONG*) p_, newval_);
-#elif defined ZMQ_ATOMIC_SOLARIS
- return atomic_swap_32 (p_, newval_);
-#elif defined ZMQ_ATOMIC_X86
- atomic_bitmap_t oldval = newval_;
- __asm__ volatile (
- "lock; xchg %0, %1"
- : "=r" (oldval)
- : "m" (*p_), "0" (oldval)
- : "memory");
- return oldval;
-#else
-#error // TODO:
- sync.lock ();
- atomic_bitmap_t oldval = *p_;
- *p_ = newval_;
- sync.unlock ();
-#endif
- }
-
- // izte is "if-zero-then-else" atomic operation - if the value is zero
- // it substitutes it by 'thenval' else it rewrites it by 'elseval'.
- // Original value of the integer is returned from this function.
- inline atomic_bitmap_t atomic_bitmap_izte (volatile atomic_bitmap_t *p_,
- atomic_bitmap_t thenval_, atomic_bitmap_t elseval_)
- {
-#if defined ZMQ_ATOMIC_WINDOWS
- while (true) {
- atomic_bitmap_t oldval = *p_;
- atomic_bitmap_t newval = (oldval ? elseval_ : thenval_);
- if (InterlockedCompareExchange ((volatile LONG*) p_, newval,
- oldval) == (LONG) oldval)
- return oldval;
- }
-#elif defined ZMQ_ATOMIC_SOLARIS
- while (true) {
- atomic_bitmap_t oldval = *p_;
- atomic_bitmap_t newval = (oldval ? elseval_ : thenval_);
- if (atomic_cas_32 (p_, oldval, newval) == oldval)
- return oldval;
- }
-#elif defined ZMQ_ATOMIC_X86
- atomic_bitmap_t oldval;
- atomic_bitmap_t dummy;
- __asm__ volatile (
- "mov %0, %1\n\t"
- "1:"
- "mov %3, %2\n\t"
- "test %1, %1\n\t"
- "jz 2f\n\t"
- "mov %4, %2\n\t"
- "2:"
- "lock cmpxchg %2, %0\n\t"
- "jnz 1b\n\t"
- : "+m" (*p_), "=&a" (oldval), "=&r" (dummy)
- : "r" (thenval_), "r" (elseval_)
- : "cc");
- return oldval;
-#else
-#error // TODO:
- sync.lock ();
- atomic_bitmap_t oldval = *p_;
- *p_ = oldval ? elseval_ : thenval_;
- sync.unlock ();
- return oldval;
-#endif
- }
-
-}
-
-#endif
diff --git a/src/i_inout.hpp b/src/i_inout.hpp
new file mode 100644
index 0000000..be2e007
--- /dev/null
+++ b/src/i_inout.hpp
@@ -0,0 +1,37 @@
+/*
+ Copyright (c) 2007-2009 FastMQ Inc.
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the Lesser GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ Lesser GNU General Public License for more details.
+
+ You should have received a copy of the Lesser GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __ZMQ_I_INOUT_HPP_INCLUDED__
+#define __ZMQ_I_INOUT_HPP_INCLUDED__
+
+#include "../include/zmq.h"
+
+namespace zmq
+{
+
+ struct i_inout
+ {
+ virtual bool read (::zmq_msg *msg_) = 0;
+ virtual bool write (::zmq_msg *msg_) = 0;
+ virtual void flush () = 0;
+ };
+
+}
+
+#endif
diff --git a/src/io_object.cpp b/src/io_object.cpp
index d8cc1c0..f61e5f0 100644
--- a/src/io_object.cpp
+++ b/src/io_object.cpp
@@ -21,35 +21,19 @@
#include "io_thread.hpp"
#include "err.hpp"
-zmq::io_object_t::io_object_t (io_thread_t *parent_, object_t *owner_) :
- object_t (parent_),
- owner (owner_),
- plugged_in (false),
- terminated (false)
+zmq::io_object_t::io_object_t (io_thread_t *io_thread_)
{
// Retrieve the poller from the thread we are running in.
- poller = parent_->get_poller ();
+ poller = io_thread_->get_poller ();
}
zmq::io_object_t::~io_object_t ()
{
}
-void zmq::io_object_t::process_plug ()
+void zmq::io_object_t::set_io_thread (io_thread_t *io_thread_)
{
- zmq_assert (!plugged_in);
-
- // If termination of the object was already requested, destroy it and
- // send the termination acknowledgement.
- if (terminated) {
- send_term_ack (owner);
- delete this;
- return;
- }
-
- // Notify the generic termination mechanism (io_object_t) that the object
- // is already plugged in.
- plugged_in = true;
+ poller = io_thread_->get_poller ();
}
zmq::handle_t zmq::io_object_t::add_fd (fd_t fd_)
@@ -106,26 +90,3 @@ void zmq::io_object_t::timer_event ()
{
zmq_assert (false);
}
-
-void zmq::io_object_t::term ()
-{
- send_term_req (owner, this);
-}
-
-void zmq::io_object_t::process_term ()
-{
- zmq_assert (!terminated);
-
- // If termination request has occured even before the object was plugged in
- // wait till plugging in happens, then acknowledge the termination.
- if (!plugged_in) {
- terminated = true;
- return;
- }
-
- // Otherwise, destroy the object and acknowledge the termination
- // straight away.
- send_term_ack (owner);
- process_unplug ();
- delete this;
-}
diff --git a/src/io_object.hpp b/src/io_object.hpp
index 4f323ad..e5582db 100644
--- a/src/io_object.hpp
+++ b/src/io_object.hpp
@@ -20,41 +20,31 @@
#ifndef __ZMQ_IO_OBJECT_HPP_INCLUDED__
#define __ZMQ_IO_OBJECT_HPP_INCLUDED__
-#include "object.hpp"
+#include <stddef.h>
+
#include "i_poller.hpp"
#include "i_poll_events.hpp"
namespace zmq
{
- class io_object_t : public object_t, public i_poll_events
+ // Simple base class for objects that live in I/O threads.
+ // It makes communication with the poller object easier and
+ // makes defining unneeded event handlers unnecessary.
+
+ class io_object_t : public i_poll_events
{
public:
- // I/O object will live in the thread inherited from the parent.
- // However, it's lifetime is managed by the owner.
- io_object_t (class io_thread_t *parent_, object_t *owner_);
+ io_object_t (class io_thread_t *io_thread_ = NULL);
+ ~io_object_t ();
protected:
- // Ask owner socket to terminate this I/O object. This may not happen
- void term ();
-
- // I/O object destroys itself. No point in allowing others to invoke
- // the destructor. At the same time, it has to be virtual so that
- // generic io_object deallocation mechanism destroys specific type
- // of I/O object correctly.
- virtual ~io_object_t ();
-
- // Handlers for incoming commands. It vital that every I/O object
- // invokes io_object_t::process_plug at the end of it's own plug
- // handler.
- void process_plug ();
-
- // io_object_t defines a new handler used to disconnect the object
- // from the poller object. Implement the handlen in the derived
- // classes to ensure sane cleanup.
- virtual void process_unplug () = 0;
+ // Derived class can init/swap the underlying I/O thread.
+ // Caution: Remove all the file descriptors from the old I/O thread
+ // before swapping to the new one!
+ void set_io_thread (class io_thread_t *io_thread_);
// Methods to access underlying poller object.
handle_t add_fd (fd_t fd_);
@@ -71,24 +61,10 @@ namespace zmq
void out_event ();
void timer_event ();
- // Socket owning this I/O object. It is responsible for destroying
- // it when it's being closed.
- object_t *owner;
-
private:
- // Set to true when object is plugged in.
- bool plugged_in;
-
- // Set to true when object was terminated before it was plugged in.
- // In such case destruction is delayed till 'plug' command arrives.
- bool terminated;
-
struct i_poller *poller;
- // Handlers for incoming commands.
- void process_term ();
-
io_object_t (const io_object_t&);
void operator = (const io_object_t&);
};
diff --git a/src/options.cpp b/src/options.cpp
new file mode 100644
index 0000000..cd07c44
--- /dev/null
+++ b/src/options.cpp
@@ -0,0 +1,29 @@
+/*
+ Copyright (c) 2007-2009 FastMQ Inc.
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the Lesser GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ Lesser GNU General Public License for more details.
+
+ You should have received a copy of the Lesser GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "options.hpp"
+
+zmq::options_t::options_t () :
+ hwm (0),
+ lwm (0),
+ swap (0),
+ mask (0),
+ affinity (0)
+{
+}
diff --git a/src/options.hpp b/src/options.hpp
new file mode 100644
index 0000000..7d78da2
--- /dev/null
+++ b/src/options.hpp
@@ -0,0 +1,42 @@
+/*
+ Copyright (c) 2007-2009 FastMQ Inc.
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the Lesser GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ Lesser GNU General Public License for more details.
+
+ You should have received a copy of the Lesser GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __ZMQ_OPTIONS_HPP_INCLUDED__
+#define __ZMQ_OPTIONS_HPP_INCLUDED__
+
+#include <string>
+
+namespace zmq
+{
+
+ struct options_t
+ {
+ options_t ();
+
+ int64_t hwm;
+ int64_t lwm;
+ int64_t swap;
+ uint64_t mask;
+ uint64_t affinity;
+ std::string identity;
+ };
+
+}
+
+#endif
diff --git a/src/owned.cpp b/src/owned.cpp
new file mode 100644
index 0000000..22e257f
--- /dev/null
+++ b/src/owned.cpp
@@ -0,0 +1,74 @@
+/*
+ Copyright (c) 2007-2009 FastMQ Inc.
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the Lesser GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ Lesser GNU General Public License for more details.
+
+ You should have received a copy of the Lesser GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "owned.hpp"
+#include "err.hpp"
+
+zmq::owned_t::owned_t (object_t *parent_, object_t *owner_) :
+ object_t (parent_),
+ owner (owner_),
+ plugged_in (false),
+ terminated (false)
+{
+}
+
+zmq::owned_t::~owned_t ()
+{
+}
+
+void zmq::owned_t::process_plug ()
+{
+ zmq_assert (!plugged_in);
+
+ // If termination of the object was already requested, destroy it and
+ // send the termination acknowledgement.
+ if (terminated) {
+ send_term_ack (owner);
+ delete this;
+ return;
+ }
+
+ // Notify the generic termination mechanism (io_object_t) that the object
+ // is already plugged in.
+ plugged_in = true;
+}
+
+void zmq::owned_t::term ()
+{
+ send_term_req (owner, this);
+}
+
+void zmq::owned_t::process_term ()
+{
+ zmq_assert (!terminated);
+
+ // If termination request has occured even before the object was plugged in
+ // wait till plugging in happens, then acknowledge the termination.
+ if (!plugged_in) {
+ terminated = true;
+ return;
+ }
+
+ // Otherwise, destroy the object and acknowledge the termination
+ // straight away.
+ send_term_ack (owner);
+ process_unplug ();
+ delete this;
+}
+
diff --git a/src/owned.hpp b/src/owned.hpp
new file mode 100644
index 0000000..164622e
--- /dev/null
+++ b/src/owned.hpp
@@ -0,0 +1,82 @@
+/*
+ Copyright (c) 2007-2009 FastMQ Inc.
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the Lesser GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ Lesser GNU General Public License for more details.
+
+ You should have received a copy of the Lesser GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __ZMQ_OWNED_HPP_INCLUDED__
+#define __ZMQ_OWNED_HPP_INCLUDED__
+
+#include "object.hpp"
+
+namespace zmq
+{
+
+ // Base class for objects owned by individual sockets. Handles
+ // initialisation and destruction of such objects.
+
+ class owned_t : public object_t
+ {
+ public:
+
+ // The object will live in parent's thread, however, its lifetime
+ // will be managed by its owner socket.
+ owned_t (object_t *parent_, object_t *owner_);
+
+ protected:
+
+ // Ask owner socket to terminate this object.
+ void term ();
+
+ // Derived object destroys owned_t. No point in allowing others to
+ // invoke the destructor. At the same time, it has to be virtual so
+ // that generic owned_t deallocation mechanism destroys specific type
+ // of the owned object correctly.
+ virtual ~owned_t ();
+
+ // Handlers for incoming commands. It's vital that every I/O object
+ // invokes io_object_t::process_plug at the end of it's own plug
+ // handler.
+ void process_plug ();
+
+ // io_object_t defines a new handler used to disconnect the object
+ // from the poller object. Implement the handlen in the derived
+ // classes to ensure sane cleanup.
+ virtual void process_unplug () = 0;
+
+ // Socket owning this object. It is responsible for destroying
+ // it when it's being closed.
+ object_t *owner;
+
+ private:
+
+ // Handlers for incoming commands.
+ void process_term ();
+
+ // Set to true when object is plugged in.
+ bool plugged_in;
+
+ // Set to true when object was terminated before it was plugged in.
+ // In such case destruction is delayed till 'plug' command arrives.
+ bool terminated;
+
+ owned_t (const owned_t&);
+ void operator = (const owned_t&);
+ };
+
+}
+
+#endif
diff --git a/src/session.cpp b/src/session.cpp
new file mode 100644
index 0000000..fa29dd3
--- /dev/null
+++ b/src/session.cpp
@@ -0,0 +1,58 @@
+/*
+ Copyright (c) 2007-2009 FastMQ Inc.
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the Lesser GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ Lesser GNU General Public License for more details.
+
+ You should have received a copy of the Lesser GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "session.hpp"
+#include "zmq_engine.hpp"
+#include "err.hpp"
+
+zmq::session_t::session_t (object_t *parent_, object_t *owner_,
+ zmq_engine_t *engine_) :
+ owned_t (parent_, owner_),
+ engine (engine_)
+{
+}
+
+zmq::session_t::~session_t ()
+{
+}
+
+bool zmq::session_t::read (::zmq_msg *msg_)
+{
+ return false;
+}
+
+bool zmq::session_t::write (::zmq_msg *msg_)
+{
+ return false;
+}
+
+void zmq::session_t::flush ()
+{
+}
+
+void zmq::session_t::process_plug ()
+{
+ engine->plug (this);
+ owned_t::process_plug ();
+}
+
+void zmq::session_t::process_unplug ()
+{
+ engine->unplug ();
+}
diff --git a/src/session.hpp b/src/session.hpp
new file mode 100644
index 0000000..4228fd9
--- /dev/null
+++ b/src/session.hpp
@@ -0,0 +1,57 @@
+/*
+ Copyright (c) 2007-2009 FastMQ Inc.
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the Lesser GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ Lesser GNU General Public License for more details.
+
+ You should have received a copy of the Lesser GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __ZMQ_SESSION_HPP_INCLUDED__
+#define __ZMQ_SESSION_HPP_INCLUDED__
+
+#include "i_inout.hpp"
+#include "owned.hpp"
+
+namespace zmq
+{
+
+ class session_t : public owned_t, public i_inout
+ {
+ public:
+
+ session_t (object_t *parent_, object_t *owner_,
+ class zmq_engine_t *engine_);
+
+ private:
+
+ ~session_t ();
+
+ // i_inout interface implementation.
+ bool read (::zmq_msg *msg_);
+ bool write (::zmq_msg *msg_);
+ void flush ();
+
+ // H