From 059beca59d39d90a8ee0e1b07f840994962ea89e Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Wed, 12 Aug 2009 09:40:16 +0200 Subject: listener/connecter/init/session added --- src/Makefile.am | 13 +++ src/atomic.hpp | 310 -------------------------------------------------- src/i_inout.hpp | 37 ++++++ src/io_object.cpp | 47 +------- src/io_object.hpp | 50 +++----- src/options.cpp | 29 +++++ src/options.hpp | 42 +++++++ src/owned.cpp | 74 ++++++++++++ src/owned.hpp | 82 +++++++++++++ src/session.cpp | 58 ++++++++++ src/session.hpp | 57 ++++++++++ src/socket_base.cpp | 29 ++--- src/socket_base.hpp | 8 +- src/zmq_connecter.cpp | 36 +++--- src/zmq_connecter.hpp | 17 ++- src/zmq_decoder.cpp | 78 +++++++++++++ src/zmq_decoder.hpp | 57 ++++++++++ src/zmq_encoder.cpp | 76 +++++++++++++ src/zmq_encoder.hpp | 55 +++++++++ src/zmq_engine.cpp | 107 ++++++++++++++++- src/zmq_engine.hpp | 33 +++++- src/zmq_init.cpp | 110 ++++++++++++++++++ src/zmq_init.hpp | 82 +++++++++++++ src/zmq_listener.cpp | 25 ++-- src/zmq_listener.hpp | 11 +- 25 files changed, 1069 insertions(+), 454 deletions(-) delete mode 100644 src/atomic.hpp create mode 100644 src/i_inout.hpp create mode 100644 src/options.cpp create mode 100644 src/options.hpp create mode 100644 src/owned.cpp create mode 100644 src/owned.hpp create mode 100644 src/session.cpp create mode 100644 src/session.hpp create mode 100644 src/zmq_decoder.cpp create mode 100644 src/zmq_decoder.hpp create mode 100644 src/zmq_encoder.cpp create mode 100644 src/zmq_encoder.hpp create mode 100644 src/zmq_init.cpp create mode 100644 src/zmq_init.hpp (limited to 'src') diff --git a/src/Makefile.am b/src/Makefile.am index 8524067..9d6127c 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -15,6 +15,7 @@ libzmq_la_SOURCES = \ err.hpp \ fd.hpp \ fd_signaler.hpp \ + i_inout.hpp \ io_object.hpp \ io_thread.hpp \ ip.hpp \ @@ -25,10 +26,13 @@ libzmq_la_SOURCES = \ msg.hpp \ mutex.hpp \ object.hpp \ + options.hpp \ + owner.hpp \ pipe.hpp \ platform.hpp \ poll.hpp \ select.hpp \ + session.hpp \ simple_semaphore.hpp \ socket_base.hpp \ stdint.hpp \ @@ -43,7 +47,10 @@ libzmq_la_SOURCES = \ ypollset.hpp \ yqueue.hpp \ zmq_connecter.hpp \ + zmq_decoder.hpp \ + zmq_encoder.hpp \ zmq_engine.hpp \ + zmq_init.hpp \ zmq_listener.hpp \ app_thread.cpp \ devpoll.cpp \ @@ -56,8 +63,11 @@ libzmq_la_SOURCES = \ ip.cpp \ kqueue.cpp \ object.cpp \ + options.cpp \ + owned.cpp \ poll.cpp \ select.cpp \ + session.cpp \ socket_base.cpp \ tcp_connecter.cpp \ tcp_listener.cpp \ @@ -67,7 +77,10 @@ libzmq_la_SOURCES = \ ypollset.cpp \ zmq.cpp \ zmq_connecter.cpp \ + zmq_decoder.cpp \ + zmq_encoder.cpp \ zmq_engine.cpp \ + zmq_init.cpp \ zmq_listener.cpp libzmq_la_LDFLAGS = -version-info 0:0:0 diff --git a/src/atomic.hpp b/src/atomic.hpp deleted file mode 100644 index e581593..0000000 --- a/src/atomic.hpp +++ /dev/null @@ -1,310 +0,0 @@ -/* - Copyright (c) 2007-2009 FastMQ Inc. - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the Lesser GNU General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - Lesser GNU General Public License for more details. - - You should have received a copy of the Lesser GNU General Public License - along with this program. If not, see . -*/ - -#ifndef __ZMQ_ATOMIC_HPP_INCLUDED__ -#define __ZMQ_ATOMIC_HPP_INCLUDED__ - -#include "stdint.hpp" - -#if defined ZMQ_FORCE_MUTEXES -#define ZMQ_ATOMIC_MUTEX -#elif (defined __i386__ || defined __x86_64__) && defined __GNUC__ -#define ZMQ_ATOMIC_X86 -#elif defined ZMQ_HAVE_WINDOWS -#define ZMQ_ATOMIC_WINDOWS -#elif defined ZMQ_HAVE_SOLARIS -#define ZMQ_ATOMIC_SOLARIS -#else -#define ZMQ_ATOMIC_MUTEX -#endif - -namespace zmq -{ - - // Atomic assignement. - inline void atomic_uint32_set (volatile uint32_t *p_, uint32_t value_) - { - *p_ = value_; - // StoreLoad memory barrier should go here on platforms with - // memory models that require it. - } - - // Atomic retrieval of an integer. - inline uint32_t atomic_uint32_get (volatile uint32_t *p_) - { - // StoreLoad memory barrier should go here on platforms with - // memory models that require it. - return *p_; - } - - // Atomic addition. Returns the old value. - inline uint32_t atomic_uint32_add (volatile uint32_t *p_, uint32_t delta_) - { -#if defined ZMQ_ATOMIC_WINDOWS - return InterlockedExchangeAdd ((LONG*) &value, increment_); -#elif defined ZMQ_ATOMIC_SOLARIS - return atomic_add_32_nv (&value, increment_) - delta_; -#elif defined ZMQ_ATOMIC_X86 - uint32_t old; - __asm__ volatile ( - "lock; xadd %0, %1\n\t" - : "=r" (old), "=m" (*p_) - : "0" (delta_), "m" (*p_) - : "cc", "memory"); - return old; -#else -#error // TODO: - sync.lock (); - uint32_t old = *p_; - *p_ += delta_; - sync.unlock (); -#endif - } - - // Atomic subtraction. Returns the old value. - inline uint32_t atomic_uint32_sub (volatile uint32_t *p_, uint32_t delta_) - { -#if defined ZMQ_ATOMIC_WINDOWS - LONG delta = - ((LONG) delta_); - return InterlockedExchangeAdd ((LONG*) &value, delta); -#elif defined ZMQ_ATOMIC_SOLARIS - int32_t delta = - ((int32_t) delta_); - return atomic_add_32_nv (&value, delta) + delta_; -#elif defined ZMQ_ATOMIC_X86 - uint32_t old = -delta_; - __asm__ volatile ("lock; xaddl %0,%1" - : "=r" (old), "=m" (*p_) - : "0" (old), "m" (*p_) - : "cc"); - return old; -#else -#error // TODO: - sync.lock (); - uint32_t old = *p_; - *p_ -= delta_; - sync.unlock (); - return old; -#endif - } - - // Atomic assignement. - template - inline void atomic_ptr_set (volatile T **p_, T *value_) - { - *p_ = value_; - // StoreLoad memory barrier should go here on platforms with - // memory models that require it. - } - - // Perform atomic 'exchange pointers' operation. Old value is returned. - template - inline void *atomic_ptr_xchg (volatile T **p_, T *value_) - { -#if defined ZMQ_ATOMIC_WINDOWS - return InterlockedExchangePointer (p_, value_); -#elif defined ZMQ_ATOMIC_SOLARIS - return atomic_swap_ptr (p_, value_); -#elif defined ZMQ_ATOMIC_X86 - void *old; - __asm__ volatile ( - "lock; xchg %0, %2" - : "=r" (old), "=m" (*p_) - : "m" (*p_), "0" (value_)); - return old; -#else -#error // TODO: - sync.lock (); - void *old = *p_; - *p_ = value_; - sync.unlock (); - return old; -#endif - } - - // Perform atomic 'compare and swap' operation on the pointer. - // The pointer is compared to 'cmp' argument and if they are - // equal, its value is set to 'value'. Old value of the pointer - // is returned. - template - inline void *atomic_ptr_cas (volatile T **p_, T *cmp_, T *value_) - { -#if defined ZMQ_ATOMIC_WINDOWS - return InterlockedCompareExchangePointer (p_, value_, cmp_); -#elif defined ZMQ_ATOMIC_SOLARIS - return atomic_cas_ptr (p_, cmp_, value_); -#elif defined ZMQ_ATOMIC_X86 - void *old; - __asm__ volatile ( - "lock; cmpxchg %2, %3" - : "=a" (old), "=m" (*p_) - : "r" (value_), "m" (*p_), "0" (cmp_) - : "cc"); - return old; -#else -#error // TODO: - sync.lock (); - void *old = *p_; - if (old == cmp_) - *p_ = value_; - sync.unlock (); - return old; -#endif - } - -#if defined ZMQ_ATOMIC_X86 && defined __x86_64__ - typedef uint64_t atomic_bitmap_t; -#else - typedef uint32_t atomic_bitmap_t; -#endif - - // Atomic assignement. - inline void atomic_bitmap_set (volatile atomic_bitmap_t *p_, - atomic_bitmap_t value_) - { - *p_ = value_; - // StoreLoad memory barrier should go here on platforms with - // memory models that require it. - } - - // Bit-test-set-and-reset. Sets one bit of the value and resets - // another one. Returns the original value of the reset bit. - inline bool atomic_bitmap_btsr (volatile atomic_bitmap_t *p_, - int set_index_, int reset_index_) - { -#if defined ZMQ_ATOMIC_WINDOWS - while (true) { - atomic_bitmap_t oldval = *p_; - atomic_bitmap_t newval = (oldval | (atomic_bitmap_t (1) << - set_index_)) & ~(integer_t (1) << reset_index_); - if (InterlockedCompareExchange ((volatile LONG*) p_, newval, - oldval) == (LONG) oldval) - return (oldval & (atomic_bitmap_t (1) << reset_index_)) ? - true : false; - } -#elif defined ZMQ_ATOMIC_SOLARIS - while (true) { - atomic_bitmap_t oldval = *p_; - atomic_bitmap_t newval = (oldval | (atomic_bitmap_t (1) << - set_index_)) & ~(integer_t (1) << reset_index_); - if (atomic_cas_32 (p_, oldval, newval) == oldval) - return (oldval & (atomic_bitmap_t (1) << reset_index_)) ? - true : false; - } -#elif defined ZMQ_ATOMIC_X86 - atomic_bitmap_t oldval, dummy; - __asm__ volatile ( - "mov %0, %1\n\t" - "1:" - "mov %1, %2\n\t" - "bts %3, %2\n\t" - "btr %4, %2\n\t" - "lock cmpxchg %2, %0\n\t" - "jnz 1b\n\t" - : "+m" (*p_), "=&a" (oldval), "=&r" (dummy) - : "r" (atomic_bitmap_t (set_index_)), - "r" (atomic_bitmap_t (reset_index_)) - : "cc"); - return (bool) (oldval & (atomic_bitmap_t (1) << reset_index_)); -#else -#error // TODO: - sync.lock (); - atomic_bitmap_t oldval = *p_; - *p_ = (oldval | (atomic_bitmap_t (1) << set_index_)) & - ~(atomic_bitmap_t (1) << reset_index_); - sync.unlock (); - return (oldval & (atomic_bitmap_t (1) << reset_index_)) ? true : false; -#endif - } - - // Sets value to newval. Returns the original value. - inline atomic_bitmap_t atomic_bitmap_xchg (volatile atomic_bitmap_t *p_, - atomic_bitmap_t newval_) - { -#if defined ZMQ_ATOMIC_WINDOWS - return InterlockedExchange ((volatile LONG*) p_, newval_); -#elif defined ZMQ_ATOMIC_SOLARIS - return atomic_swap_32 (p_, newval_); -#elif defined ZMQ_ATOMIC_X86 - atomic_bitmap_t oldval = newval_; - __asm__ volatile ( - "lock; xchg %0, %1" - : "=r" (oldval) - : "m" (*p_), "0" (oldval) - : "memory"); - return oldval; -#else -#error // TODO: - sync.lock (); - atomic_bitmap_t oldval = *p_; - *p_ = newval_; - sync.unlock (); -#endif - } - - // izte is "if-zero-then-else" atomic operation - if the value is zero - // it substitutes it by 'thenval' else it rewrites it by 'elseval'. - // Original value of the integer is returned from this function. - inline atomic_bitmap_t atomic_bitmap_izte (volatile atomic_bitmap_t *p_, - atomic_bitmap_t thenval_, atomic_bitmap_t elseval_) - { -#if defined ZMQ_ATOMIC_WINDOWS - while (true) { - atomic_bitmap_t oldval = *p_; - atomic_bitmap_t newval = (oldval ? elseval_ : thenval_); - if (InterlockedCompareExchange ((volatile LONG*) p_, newval, - oldval) == (LONG) oldval) - return oldval; - } -#elif defined ZMQ_ATOMIC_SOLARIS - while (true) { - atomic_bitmap_t oldval = *p_; - atomic_bitmap_t newval = (oldval ? elseval_ : thenval_); - if (atomic_cas_32 (p_, oldval, newval) == oldval) - return oldval; - } -#elif defined ZMQ_ATOMIC_X86 - atomic_bitmap_t oldval; - atomic_bitmap_t dummy; - __asm__ volatile ( - "mov %0, %1\n\t" - "1:" - "mov %3, %2\n\t" - "test %1, %1\n\t" - "jz 2f\n\t" - "mov %4, %2\n\t" - "2:" - "lock cmpxchg %2, %0\n\t" - "jnz 1b\n\t" - : "+m" (*p_), "=&a" (oldval), "=&r" (dummy) - : "r" (thenval_), "r" (elseval_) - : "cc"); - return oldval; -#else -#error // TODO: - sync.lock (); - atomic_bitmap_t oldval = *p_; - *p_ = oldval ? elseval_ : thenval_; - sync.unlock (); - return oldval; -#endif - } - -} - -#endif diff --git a/src/i_inout.hpp b/src/i_inout.hpp new file mode 100644 index 0000000..be2e007 --- /dev/null +++ b/src/i_inout.hpp @@ -0,0 +1,37 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see . +*/ + +#ifndef __ZMQ_I_INOUT_HPP_INCLUDED__ +#define __ZMQ_I_INOUT_HPP_INCLUDED__ + +#include "../include/zmq.h" + +namespace zmq +{ + + struct i_inout + { + virtual bool read (::zmq_msg *msg_) = 0; + virtual bool write (::zmq_msg *msg_) = 0; + virtual void flush () = 0; + }; + +} + +#endif diff --git a/src/io_object.cpp b/src/io_object.cpp index d8cc1c0..f61e5f0 100644 --- a/src/io_object.cpp +++ b/src/io_object.cpp @@ -21,35 +21,19 @@ #include "io_thread.hpp" #include "err.hpp" -zmq::io_object_t::io_object_t (io_thread_t *parent_, object_t *owner_) : - object_t (parent_), - owner (owner_), - plugged_in (false), - terminated (false) +zmq::io_object_t::io_object_t (io_thread_t *io_thread_) { // Retrieve the poller from the thread we are running in. - poller = parent_->get_poller (); + poller = io_thread_->get_poller (); } zmq::io_object_t::~io_object_t () { } -void zmq::io_object_t::process_plug () +void zmq::io_object_t::set_io_thread (io_thread_t *io_thread_) { - zmq_assert (!plugged_in); - - // If termination of the object was already requested, destroy it and - // send the termination acknowledgement. - if (terminated) { - send_term_ack (owner); - delete this; - return; - } - - // Notify the generic termination mechanism (io_object_t) that the object - // is already plugged in. - plugged_in = true; + poller = io_thread_->get_poller (); } zmq::handle_t zmq::io_object_t::add_fd (fd_t fd_) @@ -106,26 +90,3 @@ void zmq::io_object_t::timer_event () { zmq_assert (false); } - -void zmq::io_object_t::term () -{ - send_term_req (owner, this); -} - -void zmq::io_object_t::process_term () -{ - zmq_assert (!terminated); - - // If termination request has occured even before the object was plugged in - // wait till plugging in happens, then acknowledge the termination. - if (!plugged_in) { - terminated = true; - return; - } - - // Otherwise, destroy the object and acknowledge the termination - // straight away. - send_term_ack (owner); - process_unplug (); - delete this; -} diff --git a/src/io_object.hpp b/src/io_object.hpp index 4f323ad..e5582db 100644 --- a/src/io_object.hpp +++ b/src/io_object.hpp @@ -20,41 +20,31 @@ #ifndef __ZMQ_IO_OBJECT_HPP_INCLUDED__ #define __ZMQ_IO_OBJECT_HPP_INCLUDED__ -#include "object.hpp" +#include + #include "i_poller.hpp" #include "i_poll_events.hpp" namespace zmq { - class io_object_t : public object_t, public i_poll_events + // Simple base class for objects that live in I/O threads. + // It makes communication with the poller object easier and + // makes defining unneeded event handlers unnecessary. + + class io_object_t : public i_poll_events { public: - // I/O object will live in the thread inherited from the parent. - // However, it's lifetime is managed by the owner. - io_object_t (class io_thread_t *parent_, object_t *owner_); + io_object_t (class io_thread_t *io_thread_ = NULL); + ~io_object_t (); protected: - // Ask owner socket to terminate this I/O object. This may not happen - void term (); - - // I/O object destroys itself. No point in allowing others to invoke - // the destructor. At the same time, it has to be virtual so that - // generic io_object deallocation mechanism destroys specific type - // of I/O object correctly. - virtual ~io_object_t (); - - // Handlers for incoming commands. It vital that every I/O object - // invokes io_object_t::process_plug at the end of it's own plug - // handler. - void process_plug (); - - // io_object_t defines a new handler used to disconnect the object - // from the poller object. Implement the handlen in the derived - // classes to ensure sane cleanup. - virtual void process_unplug () = 0; + // Derived class can init/swap the underlying I/O thread. + // Caution: Remove all the file descriptors from the old I/O thread + // before swapping to the new one! + void set_io_thread (class io_thread_t *io_thread_); // Methods to access underlying poller object. handle_t add_fd (fd_t fd_); @@ -71,24 +61,10 @@ namespace zmq void out_event (); void timer_event (); - // Socket owning this I/O object. It is responsible for destroying - // it when it's being closed. - object_t *owner; - private: - // Set to true when object is plugged in. - bool plugged_in; - - // Set to true when object was terminated before it was plugged in. - // In such case destruction is delayed till 'plug' command arrives. - bool terminated; - struct i_poller *poller; - // Handlers for incoming commands. - void process_term (); - io_object_t (const io_object_t&); void operator = (const io_object_t&); }; diff --git a/src/options.cpp b/src/options.cpp new file mode 100644 index 0000000..cd07c44 --- /dev/null +++ b/src/options.cpp @@ -0,0 +1,29 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see . +*/ + +#include "options.hpp" + +zmq::options_t::options_t () : + hwm (0), + lwm (0), + swap (0), + mask (0), + affinity (0) +{ +} diff --git a/src/options.hpp b/src/options.hpp new file mode 100644 index 0000000..7d78da2 --- /dev/null +++ b/src/options.hpp @@ -0,0 +1,42 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see . +*/ + +#ifndef __ZMQ_OPTIONS_HPP_INCLUDED__ +#define __ZMQ_OPTIONS_HPP_INCLUDED__ + +#include + +namespace zmq +{ + + struct options_t + { + options_t (); + + int64_t hwm; + int64_t lwm; + int64_t swap; + uint64_t mask; + uint64_t affinity; + std::string identity; + }; + +} + +#endif diff --git a/src/owned.cpp b/src/owned.cpp new file mode 100644 index 0000000..22e257f --- /dev/null +++ b/src/owned.cpp @@ -0,0 +1,74 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see . +*/ + +#include "owned.hpp" +#include "err.hpp" + +zmq::owned_t::owned_t (object_t *parent_, object_t *owner_) : + object_t (parent_), + owner (owner_), + plugged_in (false), + terminated (false) +{ +} + +zmq::owned_t::~owned_t () +{ +} + +void zmq::owned_t::process_plug () +{ + zmq_assert (!plugged_in); + + // If termination of the object was already requested, destroy it and + // send the termination acknowledgement. + if (terminated) { + send_term_ack (owner); + delete this; + return; + } + + // Notify the generic termination mechanism (io_object_t) that the object + // is already plugged in. + plugged_in = true; +} + +void zmq::owned_t::term () +{ + send_term_req (owner, this); +} + +void zmq::owned_t::process_term () +{ + zmq_assert (!terminated); + + // If termination request has occured even before the object was plugged in + // wait till plugging in happens, then acknowledge the termination. + if (!plugged_in) { + terminated = true; + return; + } + + // Otherwise, destroy the object and acknowledge the termination + // straight away. + send_term_ack (owner); + process_unplug (); + delete this; +} + diff --git a/src/owned.hpp b/src/owned.hpp new file mode 100644 index 0000000..164622e --- /dev/null +++ b/src/owned.hpp @@ -0,0 +1,82 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see . +*/ + +#ifndef __ZMQ_OWNED_HPP_INCLUDED__ +#define __ZMQ_OWNED_HPP_INCLUDED__ + +#include "object.hpp" + +namespace zmq +{ + + // Base class for objects owned by individual sockets. Handles + // initialisation and destruction of such objects. + + class owned_t : public object_t + { + public: + + // The object will live in parent's thread, however, its lifetime + // will be managed by its owner socket. + owned_t (object_t *parent_, object_t *owner_); + + protected: + + // Ask owner socket to terminate this object. + void term (); + + // Derived object destroys owned_t. No point in allowing others to + // invoke the destructor. At the same time, it has to be virtual so + // that generic owned_t deallocation mechanism destroys specific type + // of the owned object correctly. + virtual ~owned_t (); + + // Handlers for incoming commands. It's vital that every I/O object + // invokes io_object_t::process_plug at the end of it's own plug + // handler. + void process_plug (); + + // io_object_t defines a new handler used to disconnect the object + // from the poller object. Implement the handlen in the derived + // classes to ensure sane cleanup. + virtual void process_unplug () = 0; + + // Socket owning this object. It is responsible for destroying + // it when it's being closed. + object_t *owner; + + private: + + // Handlers for incoming commands. + void process_term (); + + // Set to true when object is plugged in. + bool plugged_in; + + // Set to true when object was terminated before it was plugged in. + // In such case destruction is delayed till 'plug' command arrives. + bool terminated; + + owned_t (const owned_t&); + void operator = (const owned_t&); + }; + +} + +#endif diff --git a/src/session.cpp b/src/session.cpp new file mode 100644 index 0000000..fa29dd3 --- /dev/null +++ b/src/session.cpp @@ -0,0 +1,58 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see . +*/ + +#include "session.hpp" +#include "zmq_engine.hpp" +#include "err.hpp" + +zmq::session_t::session_t (object_t *parent_, object_t *owner_, + zmq_engine_t *engine_) : + owned_t (parent_, owner_), + engine (engine_) +{ +} + +zmq::session_t::~session_t () +{ +} + +bool zmq::session_t::read (::zmq_msg *msg_) +{ + return false; +} + +bool zmq::session_t::write (::zmq_msg *msg_) +{ + return false; +} + +void zmq::session_t::flush () +{ +} + +void zmq::session_t::process_plug () +{ + engine->plug (this); + owned_t::process_plug (); +} + +void zmq::session_t::process_unplug () +{ + engine->unplug (); +} diff --git a/src/session.hpp b/src/session.hpp new file mode 100644 index 0000000..4228fd9 --- /dev/null +++ b/src/session.hpp @@ -0,0 +1,57 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see . +*/ + +#ifndef __ZMQ_SESSION_HPP_INCLUDED__ +#define __ZMQ_SESSION_HPP_INCLUDED__ + +#include "i_inout.hpp" +#include "owned.hpp" + +namespace zmq +{ + + class session_t : public owned_t, public i_inout + { + public: + + session_t (object_t *parent_, object_t *owner_, + class zmq_engine_t *engine_); + + private: + + ~session_t (); + + // i_inout interface implementation. + bool read (::zmq_msg *msg_); + bool write (::zmq_msg *msg_); + void flush (); + + // Handlers for incoming commands. + void process_plug (); + void process_unplug (); + + class zmq_engine_t *engine; + + session_t (const session_t&); + void operator = (const session_t&); + }; + +} + +#endif diff --git a/src/socket_base.cpp b/src/socket_base.cpp index c179a93..4fadad2 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -32,12 +32,7 @@ zmq::socket_base_t::socket_base_t (app_thread_t *parent_) : object_t (parent_), pending_term_acks (0), - app_thread (parent_), - hwm (0), - lwm (0), - swap (0), - mask (0), - affinity (0) + app_thread (parent_) { } @@ -77,7 +72,7 @@ int zmq::socket_base_t::setsockopt (int option_, void *optval_, errno = EINVAL; return -1; } - hwm = *((int64_t*) optval_); + options.hwm = *((int64_t*) optval_); return 0; case ZMQ_LWM: @@ -85,7 +80,7 @@ int zmq::socket_base_t::setsockopt (int option_, void *optval_, errno = EINVAL; return -1; } - lwm = *((int64_t*) optval_); + options.lwm = *((int64_t*) optval_); return 0; case ZMQ_SWAP: @@ -93,7 +88,7 @@ int zmq::socket_base_t::setsockopt (int option_, void *optval_, errno = EINVAL; return -1; } - swap = *((int64_t*) optval_); + options.swap = *((int64_t*) optval_); return 0; case ZMQ_MASK: @@ -101,7 +96,7 @@ int zmq::socket_base_t::setsockopt (int option_, void *optval_, errno = EINVAL; return -1; } - mask = (uint64_t) *((int64_t*) optval_); + options.mask = (uint64_t) *((int64_t*) optval_); return 0; case ZMQ_AFFINITY: @@ -109,15 +104,15 @@ int zmq::socket_base_t::setsockopt (int option_, void *optval_, errno = EINVAL; return -1; } - affinity = (uint64_t) *((int64_t*) optval_); + options.affinity = (uint64_t) *((int64_t*) optval_); return 0; - case ZMQ_SESSIONID: + case ZMQ_IDENTITY: if (optvallen_ != sizeof (const char*)) { errno = EINVAL; return -1; } - session_id = (const char*) optval_; + options.identity = (const char*) optval_; return 0; default: @@ -128,8 +123,8 @@ int zmq::socket_base_t::setsockopt (int option_, void *optval_, int zmq::socket_base_t::bind (const char *addr_) { - zmq_listener_t *listener = - new zmq_listener_t (choose_io_thread (affinity), this); + zmq_listener_t *listener = new zmq_listener_t ( + choose_io_thread (options.affinity), this, options); int rc = listener->set_address (addr_); if (rc != 0) return -1; @@ -141,8 +136,8 @@ int zmq::socket_base_t::bind (const char *addr_) int zmq::socket_base_t::connect (const char *addr_) { - zmq_connecter_t *connecter = - new zmq_connecter_t (choose_io_thread (affinity), this); + zmq_connecter_t *connecter = new zmq_connecter_t ( + choose_io_thread (options.affinity), this, options); int rc = connecter->set_address (addr_); if (rc != 0) return -1; diff --git a/src/socket_base.hpp b/src/socket_base.hpp index 2257fbe..4ccac48 100644 --- a/src/socket_base.hpp +++ b/src/socket_base.hpp @@ -24,6 +24,7 @@ #include #include "object.hpp" +#include "options.hpp" #include "stdint.hpp" namespace zmq @@ -66,12 +67,7 @@ namespace zmq class app_thread_t *app_thread; // Socket options. - int64_t hwm; - int64_t lwm; - int64_t swap; - uint64_t mask; - uint64_t affinity; - std::string session_id; + options_t options; socket_base_t (const socket_base_t&); void operator = (const socket_base_t&); diff --git a/src/zmq_connecter.cpp b/src/zmq_connecter.cpp index 513508d..4416a70 100644 --- a/src/zmq_connecter.cpp +++ b/src/zmq_connecter.cpp @@ -18,11 +18,16 @@ */ #include "zmq_connecter.hpp" +#include "zmq_init.hpp" +#include "io_thread.hpp" #include "err.hpp" -zmq::zmq_connecter_t::zmq_connecter_t (io_thread_t *parent_, object_t *owner_) : - io_object_t (parent_, owner_), - waiting (false) +zmq::zmq_connecter_t::zmq_connecter_t (io_thread_t *parent_, object_t *owner_, + const options_t &options_) : + owned_t (parent_, owner_), + io_object_t (parent_), + handle_valid (false), + options (options_) { } @@ -38,12 +43,12 @@ int zmq::zmq_connecter_t::set_address (const char *addr_) void zmq::zmq_connecter_t::process_plug () { start_connecting (); - io_object_t::process_plug (); + owned_t::process_plug (); } void zmq::zmq_connecter_t::process_unplug () { - if (!waiting) + if (handle_valid) rm_fd (handle); } @@ -58,30 +63,31 @@ void zmq::zmq_connecter_t::in_event () void zmq::zmq_connecter_t::out_event () { fd_t fd = tcp_connecter.connect (); + rm_fd (handle); + handle_valid = false; // If there was error during the connecting, close the socket and wait // for a while before trying to reconnect. if (fd == retired_fd) { - rm_fd (handle); tcp_connecter.close (); - waiting = true; add_timer (); return; } - zmq_assert (false); + // Create an init object. + io_thread_t *io_thread = choose_io_thread (options.affinity); + zmq_init_t *init = new zmq_init_t (io_thread, owner, fd, true, options); + zmq_assert (init); + send_plug (init); + send_own (owner, init); -/* - object_t *engine = new zmq_engine_t (choose_io_thread (0), owner); - send_plug (engine); - send_own (owner, engine); -*/ + // Ask owner socket to shut the connecter down. + term (); } void zmq::zmq_connecter_t::timer_event () { // Reconnect period have elapsed. - waiting = false; start_connecting (); } @@ -99,12 +105,12 @@ void zmq::zmq_connecter_t::start_connecting () // Connection establishment may be dealyed. Poll for its completion. else if (rc == -1 && errno == EINPROGRESS) { handle = add_fd (tcp_connecter.get_fd ()); + handle_valid = true; set_pollout (handle); return; } // If none of the above is true, synchronous error occured. // Wait for a while and retry. - waiting = true; add_timer (); } diff --git a/src/zmq_connecter.hpp b/src/zmq_connecter.hpp index d346396..93497cb 100644 --- a/src/zmq_connecter.hpp +++ b/src/zmq_connecter.hpp @@ -20,17 +20,21 @@ #ifndef __ZMQ_ZMQ_CONNECTER_HPP_INCLUDED__ #define __ZMQ_ZMQ_CONNECTER_HPP_INCLUDED__ +#include "owned.hpp" #include "io_object.hpp" #include "tcp_connecter.hpp" +#include "options.hpp" +#include "stdint.hpp" namespace zmq { - class zmq_connecter_t : public io_object_t + class zmq_connecter_t : public owned_t, public io_object_t { public: - zmq_connecter_t (class io_thread_t *parent_, object_t *owner_); + zmq_connecter_t (class io_thread_t *parent_, object_t *owner_, + const options_t &options_); // Set IP address to connect to. int set_address (const char *addr_); @@ -57,9 +61,12 @@ namespace zmq // Handle corresponding to the listening socket. handle_t handle; - // True, if we are waiting for a period of time before trying to - // reconnect. - bool waiting; + // If true file descriptor is registered with the poller and 'handle' + // contains valid value. + bool handle_valid; + + // Associated socket options. + options_t options; zmq_connecter_t (const zmq_connecter_t&); void operator = (const zmq_connecter_t&); diff --git a/src/zmq_decoder.cpp b/src/zmq_decoder.cpp new file mode 100644 index 0000000..b461234 --- /dev/null +++ b/src/zmq_decoder.cpp @@ -0,0 +1,78 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see . +*/ + +#include "zmq_decoder.hpp" +#include "i_inout.hpp" +#include "wire.hpp" + +zmq::zmq_decoder_t::zmq_decoder_t () : + destination (NULL) +{ + zmq_msg_init (&in_progress); + + // At the beginning, read one byte and go to one_byte_size_ready state. + next_step (tmpbuf, 1, &zmq_decoder_t::one_byte_size_ready); +} + +zmq::zmq_decoder_t::~zmq_decoder_t () +{ + zmq_msg_close (&in_progress); +} + +void zmq::zmq_decoder_t::set_inout (i_inout *destination_) +{ + destination = destination_; +} + +bool zmq::zmq_decoder_t::one_byte_size_ready () +{ + // First byte of size is read. If it is 0xff read 8-byte size. + // Otherwise allocate the buffer for message data and read the + // message data into it. + if (*tmpbuf == 0xff) + next_step (tmpbuf, 8, &zmq_decoder_t::eight_byte_size_ready); + else { + zmq_msg_init_size (&in_progress, *tmpbuf); + next_step (zmq_msg_data (&in_progress), *tmpbuf, + &zmq_decoder_t::message_ready); + } + return true; +} + +bool zmq::zmq_decoder_t::eight_byte_size_ready () +{ + // 8-byte size is read. Allocate the buffer for message body and + // read the message data into it. + size_t size = (size_t) get_uint64 (tmpbuf); + zmq_msg_init_size (&in_progress, size); + next_step (zmq_msg_data (&in_progress), size, + &zmq_decoder_t::message_ready); + return true; +} + +bool zmq::zmq_decoder_t::message_ready () +{ + // Message is completely read. Push it further and start reading + // new message. + if (!destination->write (&in_progress)) + return false; + + next_step (tmpbuf, 1, &zmq_decoder_t::one_byte_size_ready); + return true; +} diff --git a/src/zmq_decoder.hpp b/src/zmq_decoder.hpp new file mode 100644 index 0000000..17c28f8 --- /dev/null +++ b/src/zmq_decoder.hpp @@ -0,0 +1,57 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see . +*/ + +#ifndef __ZMQ_ZMQ_DECODER_HPP_INCLUDED__ +#define __ZMQ_ZMQ_DECODER_HPP_INCLUDED__ + +#include "../include/zmq.h" + +#include "decoder.hpp" + +namespace zmq +{ + // Decoder for 0MQ backend protocol. Converts data batches into messages. + + class zmq_decoder_t : public decoder_t + { + public: + + zmq_decoder_t (); + ~zmq_decoder_t (); + + void set_inout (struct i_inout *destination_); + + private: + + bool one_byte_size_ready (); + bool eight_byte_size_ready (); + bool message_ready (); + + struct i_inout *destination; + unsigned char tmpbuf [8]; + ::zmq_msg in_progress; + + zmq_decoder_t (const zmq_decoder_t&); + void operator = (const zmq_decoder_t&); + }; + +} + +#endif + diff --git a/src/zmq_encoder.cpp b/src/zmq_encoder.cpp new file mode 100644 index 0000000..124d77b --- /dev/null +++ b/src/zmq_encoder.cpp @@ -0,0 +1,76 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see . +*/ + +#include "zmq_encoder.hpp" +#include "i_inout.hpp" +#include "wire.hpp" + +zmq::zmq_encoder_t::zmq_encoder_t () : + source (NULL) +{ + zmq_msg_init (&in_progress); + + // Write 0 bytes to the batch and go to message_ready state. + next_step (NULL, 0, &zmq_encoder_t::message_ready, true); +} + +zmq::zmq_encoder_t::~zmq_encoder_t () +{ + zmq_msg_close (&in_progress); +} + +void zmq::zmq_encoder_t::set_inout (i_inout *source_) +{ + source = source_; +} + +bool zmq::zmq_encoder_t::size_ready () +{ + // Write message body into the buffer. + next_step (zmq_msg_data (&in_progress), zmq_msg_size (&in_progress), + &zmq_encoder_t::message_ready, false); + return true; +} + +bool zmq::zmq_encoder_t::message_ready () +{ + // Read new message from the dispatcher. If there is none, return false. + // Note that new state is set only if write is successful. That way + // unsuccessful write will cause retry on the next state machine + // invocation. + if (!source->read (&in_progress)) { + return false; + } + size_t size = zmq_msg_size (&in_progress); + + // For messages less than 255 bytes long, write one byte of message size. + // For longer messages write 0xff escape character followed by 8-byte + // message size. + if (size < 255) { + tmpbuf [0] = (unsigned char) size; + next_step (tmpbuf, 1, &zmq_encoder_t::size_ready, true); + } + else { + tmpbuf [0] = 0xff; + put_uint64 (tmpbuf + 1, size); + next_step (tmpbuf, 9, &zmq_encoder_t::size_ready, true); + } + return true; +} + diff --git a/src/zmq_encoder.hpp b/src/zmq_encoder.hpp new file mode 100644 index 0000000..89af265 --- /dev/null +++ b/src/zmq_encoder.hpp @@ -0,0 +1,55 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see . +*/ + +#ifndef __ZMQ_ZMQ_ENCODER_HPP_INCLUDED__ +#define __ZMQ_ZMQ_ENCODER_HPP_INCLUDED__ + +#include "../include/zmq.h" + +#include "encoder.hpp" + +namespace zmq +{ + // Encoder for 0MQ backend protocol. Converts messages into data batches. + + class zmq_encoder_t : public encoder_t + { + public: + + zmq_encoder_t (); + ~zmq_encoder_t (); + + void set_inout (struct i_inout *source_); + + private: + + bool size_ready (); + bool message_ready (); + + struct i_inout *source; + ::zmq_msg in_progress; + unsigned char tmpbuf [9]; + + zmq_encoder_t (const zmq_encoder_t&); + void operator = (const zmq_encoder_t&); + }; +} + +#endif + diff --git a/src/zmq_engine.cpp b/src/zmq_engine.cpp index 3708c9a..3620d30 100644 --- a/src/zmq_engine.cpp +++ b/src/zmq_engine.cpp @@ -19,17 +19,118 @@ #include "zmq_engine.hpp" #include "io_thread.hpp" +#include "i_inout.hpp" +#include "config.hpp" +#include "err.hpp" -zmq::zmq_engine_t::zmq_engine_t (io_thread_t *parent_, object_t *owner_) : - io_object_t (parent_, owner_) +zmq::zmq_engine_t::zmq_engine_t (io_thread_t *parent_, fd_t fd_) : + io_object_t (parent_), + insize (0), + inpos (0), + outsize (0), + outpos (0), + inout (NULL) { + // Allocate read & write buffer. + inbuf = (unsigned char*) malloc (in_batch_size); + zmq_assert (inbuf); + outbuf = (unsigned char*) malloc (out_batch_size); + zmq_assert (outbuf); + + // Initialise the underlying socket. + int rc = tcp_socket.open (fd_); + zmq_assert (rc == 0); } zmq::zmq_engine_t::~zmq_engine_t () { + free (outbuf); + free (inbuf); +} + +void zmq::zmq_engine_t::plug (i_inout *inout_) +{ + encoder.set_inout (inout_); + decoder.set_inout (inout_); + + handle = add_fd (tcp_socket.get_fd ()); + set_pollin (handle); + set_pollout (handle); + + inout = inout_; +} + +void zmq::zmq_engine_t::unplug () +{ + rm_fd (handle); + inout = NULL; +} + +void zmq::zmq_engine_t::in_event () +{ + // If there's no data to process in the buffer, read new data. + if (inpos == insize) { + + // Read as much data as possible to the read buffer. + insize = tcp_socket.read (inbuf, in_batch_size); +printf ("%d bytes read\n", (int) insize); + inpos = 0; + + // Check whether the peer has closed the connection. + if (insize == -1) { + insize = 0; + error (); + return; + } + } + + // Following code should be executed even if there's not a single byte in + // the buffer. There still can be a decoded messages stored in the decoder. + + // Push the data to the decoder. + int nbytes = decoder.write (inbuf + inpos, insize - inpos); + + // Adjust read position. Stop polling for input if we got stuck. + inpos += nbytes; + if (inpos < insize) + reset_pollin (handle); + + // If at least one byte was processed, flush all messages the decoder + // may have produced. + if (nbytes > 0) + inout->flush (); + } -void zmq::zmq_engine_t::process_plug () +void zmq::zmq_engine_t::out_event () { + // If write buffer is empty, try to read new data from the encoder. + if (outpos == outsize) { + + outsize = encoder.read (outbuf, out_batch_size); + outpos = 0; + + // If there is no data to send, stop polling for output. + if (outsize == 0) + reset_pollout (handle); + } + + // If there are any data to write in write buffer, write as much as + // possible to the socket. + if (outpos < outsize) { + int nbytes = tcp_socket.write (outbuf + outpos, outsize - outpos); + + // Handle problems with the connection. + if (nbytes == -1) { + error (); + return; + } + + outpos += nbytes; + } } +void zmq::zmq_engine_t::error () +{ + zmq_assert (false); +} diff --git a/src/zmq_engine.hpp b/src/zmq_engine.hpp index ad739c7..38a390d 100644 --- a/src/zmq_engine.hpp +++ b/src/zmq_engine.hpp @@ -21,6 +21,9 @@ #define __ZMQ_ZMQ_ENGINE_HPP_INCLUDED__ #include "io_object.hpp" +#include "tcp_socket.hpp" +#include "zmq_encoder.hpp" +#include "zmq_decoder.hpp" namespace zmq { @@ -29,14 +32,36 @@ namespace zmq { public: - zmq_engine_t (class io_thread_t *parent_, object_t *owner_); + zmq_engine_t (class io_thread_t *parent_, fd_t fd_); + ~zmq_engine_t (); + + void plug (struct i_inout *inout_); + void unplug (); + + // i_poll_events interface implementation. + void in_event (); + void out_event (); private: - ~zmq_engine_t (); + // Function to handle network disconnections. + void error (); + + tcp_socket_t tcp_socket; + handle_t handle; + + unsigned char *inbuf; + int insize; + int inpos; + + unsigned char *outbuf; + int outsize; + int outpos; + + i_inout *inout; - // Handlers for incoming commands. - void process_plug (); + zmq_encoder_t encoder; + zmq_decoder_t decoder; zmq_engine_t (const zmq_engine_t&); void operator = (const zmq_engine_t&); diff --git a/src/zmq_init.cpp b/src/zmq_init.cpp new file mode 100644 index 0000000..fea1452 --- /dev/null +++ b/src/zmq_init.cpp @@ -0,0 +1,110 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see . +*/ + +#include "zmq_init.hpp" +#include "io_thread.hpp" +#include "session.hpp" +#include "err.hpp" + +zmq::zmq_init_t::zmq_init_t (io_thread_t *parent_, object_t *owner_, fd_t fd_, + bool connected_, const options_t &options_) : + owned_t (parent_, owner_), + connected (connected_), + options (options_) +{ + // Create associated engine object. + engine = new zmq_engine_t (parent_, fd_); + zmq_assert (engine); +} + +zmq::zmq_init_t::~zmq_init_t () +{ + if (engine) + delete engine; +} + +bool zmq::zmq_init_t::read (::zmq_msg *msg_) +{ + // On the listening side, no initialisation data are sent to the peer. + if (!connected) + return false; + + // Send identity. + int rc = zmq_msg_init_size (msg_, options.identity.size ()); + zmq_assert (rc == 0); + memcpy (zmq_msg_data (msg_), options.identity.c_str (), + options.identity.size ()); + + // Initialisation is done. + create_session (); + + return true; +} + +bool zmq::zmq_init_t::write (::zmq_msg *msg_) +{ + // On the connecting side no initialisation data are expected. + if (connected) + return false; + + // Retreieve the identity. + options.identity = std::string ((const char*) zmq_msg_data (msg_), + zmq_msg_size (msg_)); + + // Initialisation is done. + create_session (); + + return true; +} + +void zmq::zmq_init_t::flush () +{ + // No need to do anything. zmq_init_t does no batching of messages. + // Each message is processed immediately on write. +} + +void zmq::zmq_init_t::process_plug () +{ + engine->plug (this); + owned_t::process_plug (); +} + +void zmq::zmq_init_t::process_unplug () +{ + engine->unplug (); +} + +void zmq::zmq_init_t::create_session () +{ + // Disconnect engine from the init object. + engine->unplug (); + + // Create the session instance. + io_thread_t *io_thread = choose_io_thread (options.affinity); + session_t *session = new session_t (io_thread, owner, engine); + zmq_assert (session); + engine = NULL; + + // Pass session/engine pair to a chosen I/O thread. + send_plug (session); + send_own (owner, session); + + // Destroy the init object. + term (); +} diff --git a/src/zmq_init.hpp b/src/zmq_init.hpp new file mode 100644 index 0000000..2e0910a --- /dev/null +++ b/src/zmq_init.hpp @@ -0,0 +1,82 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see . +*/ + +#ifndef __ZMQ_ZMQ_INIT_HPP_INCLUDED__ +#define __ZMQ_ZMQ_INIT_HPP_INCLUDED__ + +#include + +#include "i_inout.hpp" +#include "owned.hpp" +#include "zmq_engine.hpp" +#include "stdint.hpp" +#include "fd.hpp" +#include "options.hpp" + +namespace zmq +{ + + // The class handles initialisation phase of native 0MQ wire-level + // protocol. Currently it can be used to handle both sides of the + // connection. If it grows to complex, we can separate the two into + // distinct classes. + + class zmq_init_t : public owned_t, public i_inout + { + public: + + // Set 'connected' to true if the connection was created by 'connect' + // function. If it was accepted from a listening socket, set it to + // false. + zmq_init_t (class io_thread_t *parent_, object_t *owner_, fd_t fd_, + bool connected_, const options_t &options); + ~zmq_init_t (); + + private: + + // i_inout interface implementation. + bool read (::zmq_msg *msg_); + bool write (::zmq_msg *msg_); + void flush (); + + // Handlers for incoming commands. + void process_plug (); + void process_unplug (); + + void create_session (); + + // Engine is created by zmq_init_t object. Once the initialisation + // phase is over it is passed to a session object, possibly running + // in a different I/O thread. + zmq_engine_t *engine; + + // If true, we are on the connecting side. If false, we are on the + // listening side. + bool connected; + + // Associated socket options. + options_t options; + + zmq_init_t (const zmq_init_t&); + void operator = (const zmq_init_t&); + }; + +} + +#endif diff --git a/src/zmq_listener.cpp b/src/zmq_listener.cpp index 463a1dd..c990468 100644 --- a/src/zmq_listener.cpp +++ b/src/zmq_listener.cpp @@ -18,12 +18,15 @@ */ #include "zmq_listener.hpp" -#include "zmq_engine.hpp" +#include "zmq_init.hpp" #include "io_thread.hpp" #include "err.hpp" -zmq::zmq_listener_t::zmq_listener_t (io_thread_t *parent_, object_t *owner_) : - io_object_t (parent_, owner_) +zmq::zmq_listener_t::zmq_listener_t (io_thread_t *parent_, object_t *owner_, + const options_t &options_) : + owned_t (parent_, owner_), + io_object_t (parent_), + options (options_) { } @@ -46,7 +49,7 @@ void zmq::zmq_listener_t::process_plug () handle = add_fd (tcp_listener.get_fd ()); set_pollin (handle); - io_object_t::process_plug (); + owned_t::process_plug (); } void zmq::zmq_listener_t::process_unplug () @@ -63,14 +66,12 @@ void zmq::zmq_listener_t::in_event () if (fd == retired_fd) return; - // TODO - zmq_assert (false); - -/* - object_t *engine = new zmq_engine_t (choose_io_thread (0), owner); - send_plug (engine); - send_own (owner, engine); -*/ + // Create an init object. + io_thread_t *io_thread = choose_io_thread (options.affinity); + zmq_init_t *init = new zmq_init_t (io_thread, owner, fd, false, options); + zmq_assert (init); + send_plug (init); + send_own (owner, init); } diff --git a/src/zmq_listener.hpp b/src/zmq_listener.hpp index ef252b3..f85ad5a 100644 --- a/src/zmq_listener.hpp +++ b/src/zmq_listener.hpp @@ -20,17 +20,21 @@ #ifndef __ZMQ_ZMQ_LISTENER_HPP_INCLUDED__ #define __ZMQ_ZMQ_LISTENER_HPP_INCLUDED__ +#include "owned.hpp" #include "io_object.hpp" #include "tcp_listener.hpp" +#include "options.hpp" +#include "stdint.hpp" namespace zmq { - class zmq_listener_t : public io_object_t + class zmq_listener_t : public owned_t, public io_object_t { public: - zmq_listener_t (class io_thread_t *parent_, object_t *owner_); + zmq_listener_t (class io_thread_t *parent_, object_t *owner_, + const options_t &options_); // Set IP address to listen on. int set_address (const char *addr_); @@ -52,6 +56,9 @@ namespace zmq // Handle corresponding to the listening socket. handle_t handle; + // Associated socket options. + options_t options; + zmq_listener_t (const zmq_listener_t&); void operator = (const zmq_listener_t&); }; -- cgit v1.2.3