diff options
| -rw-r--r-- | src/io_object.cpp | 48 | ||||
| -rw-r--r-- | src/io_object.hpp | 21 | ||||
| -rw-r--r-- | src/socket_base.cpp | 6 | ||||
| -rw-r--r-- | src/tcp_listener.cpp | 19 | ||||
| -rw-r--r-- | src/tcp_listener.hpp | 11 | ||||
| -rw-r--r-- | src/zmq_listener.cpp | 36 | ||||
| -rw-r--r-- | src/zmq_listener.hpp | 15 | 
7 files changed, 142 insertions, 14 deletions
| diff --git a/src/io_object.cpp b/src/io_object.cpp index a4badd7..b7c70d4 100644 --- a/src/io_object.cpp +++ b/src/io_object.cpp @@ -19,10 +19,13 @@  #include "io_object.hpp"  #include "io_thread.hpp" +#include "err.hpp"  zmq::io_object_t::io_object_t (io_thread_t *parent_, object_t *owner_) :      object_t (parent_), -    owner (owner_) +    owner (owner_), +    plugged_in (false), +    terminated (false)  {      //  Retrieve the poller from the thread we are running in.      poller = parent_->get_poller (); @@ -32,6 +35,23 @@ zmq::io_object_t::~io_object_t ()  {  } +void zmq::io_object_t::process_plug () +{ +    zmq_assert (!plugged_in); + +    //  If termination of the object was already requested, destroy it and +    //  send the termination acknowledgement. +    if (terminated) { +        send_term_ack (owner); +        delete this; +        return; +    } + +    //  Notify the generic termination mechanism (io_object_t) that the object +    //  is already plugged in. +    plugged_in = true; +} +  zmq::handle_t zmq::io_object_t::add_fd (fd_t fd_, i_poll_events *events_)  {      return poller->add_fd (fd_, events_); @@ -72,6 +92,21 @@ void zmq::io_object_t::cancel_timer (i_poll_events *events_)      poller->cancel_timer (events_);  } +void zmq::io_object_t::in_event () +{ +    zmq_assert (false); +} + +void zmq::io_object_t::out_event () +{ +    zmq_assert (false); +} + +void zmq::io_object_t::timer_event () +{ +    zmq_assert (false); +} +  void zmq::io_object_t::term ()  {      send_term_req (owner, this); @@ -79,6 +114,17 @@ void zmq::io_object_t::term ()  void zmq::io_object_t::process_term ()  { +    zmq_assert (!terminated); + +    //  If termination request has occured even before the object was plugged in +    //  wait till plugging in happens, then acknowledge the termination. +    if (!plugged_in) { +        terminated = true; +        return; +    } + +    //  Otherwise, destroy the object and acknowledge the termination +    //  straight away.      send_term_ack (owner);      delete this;  } diff --git a/src/io_object.hpp b/src/io_object.hpp index ddb4414..2029bfb 100644 --- a/src/io_object.hpp +++ b/src/io_object.hpp @@ -22,11 +22,12 @@  #include "object.hpp"  #include "i_poller.hpp" +#include "i_poll_events.hpp"  namespace zmq  { -    class io_object_t : public object_t +    class io_object_t : public object_t, public i_poll_events      {      public: @@ -45,6 +46,11 @@ namespace zmq          //  of I/O object correctly.          virtual ~io_object_t (); +        //  Handlers for incoming commands. It vital that every I/O object +        //  invokes io_object_t::process_plug at the end of it's own plug +        //  handler. +        void process_plug (); +          //  Methods to access underlying poller object.          handle_t add_fd (fd_t fd_, struct i_poll_events *events_);          void rm_fd (handle_t handle_); @@ -55,12 +61,25 @@ namespace zmq          void add_timer (struct i_poll_events *events_);          void cancel_timer (struct i_poll_events *events_); +        //  i_poll_events interface implementation. +        void in_event (); +        void out_event (); +        void timer_event (); +          //  Socket owning this I/O object. It is responsible for destroying          //  it when it's being closed.          object_t *owner; +        //  Set to true when object is plugged in. It's responsibility +        //  of derived object to set the property after the feat. +        bool plugged_in; +      private: +        //  Set to true when object was terminated before it was plugged in. +        //  In such case destruction is delayed till 'plug' command arrives. +        bool terminated; +          struct i_poller *poller;          //  Handlers for incoming commands. diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 026b317..3141517 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -70,7 +70,11 @@ int zmq::socket_base_t::bind (const char *addr_)  {      //  TODO: The taskset should be taken from socket options.      uint64_t taskset = 0; -    object_t *listener = new zmq_listener_t (choose_io_thread (taskset), this); +    zmq_listener_t *listener = new zmq_listener_t (choose_io_thread (taskset), this); +    int rc = listener->set_address (addr_); +    if (rc != 0) +        return -1; +      send_plug (listener);      send_own (this, listener);      return 0; diff --git a/src/tcp_listener.cpp b/src/tcp_listener.cpp index 6aae88a..937618d 100644 --- a/src/tcp_listener.cpp +++ b/src/tcp_listener.cpp @@ -17,6 +17,8 @@      along with this program.  If not, see <http://www.gnu.org/licenses/>.  */ +#include <string.h> +  #include "tcp_listener.hpp"  #include "platform.hpp"  #include "ip.hpp" @@ -41,6 +43,7 @@  zmq::tcp_listener_t::tcp_listener_t () :      s (retired_fd)  { +    memset (&addr, 0, sizeof (addr));  }  zmq::tcp_listener_t::~tcp_listener_t () @@ -49,14 +52,14 @@ zmq::tcp_listener_t::~tcp_listener_t ()          close ();  } -int zmq::tcp_listener_t::open (const char *addr_) +int zmq::tcp_listener_t::set_address (const char *addr_)  {      //  Convert the interface into sockaddr_in structure. -    sockaddr_in ip_address; -    int rc = resolve_ip_interface (&ip_address, addr_); -    if (rc != 0) -        return -1; -     +    return resolve_ip_interface (&addr, addr_); +} + +int zmq::tcp_listener_t::open () +{      //  Create a listening socket.      s = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);      if (s == -1) @@ -64,7 +67,7 @@ int zmq::tcp_listener_t::open (const char *addr_)      //  Allow reusing of the address.      int flag = 1; -    rc = setsockopt (s, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof (int)); +    int rc = setsockopt (s, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof (int));      errno_assert (rc == 0);      //  Set the non-blocking flag. @@ -75,7 +78,7 @@ int zmq::tcp_listener_t::open (const char *addr_)      errno_assert (rc != -1);      //  Bind the socket to the network interface and port. -    rc = bind (s, (struct sockaddr*) &ip_address, sizeof (ip_address)); +    rc = bind (s, (struct sockaddr*) &addr, sizeof (addr));      if (rc != 0) {          close ();          return -1; diff --git a/src/tcp_listener.hpp b/src/tcp_listener.hpp index 43a4aa8..2a8bc23 100644 --- a/src/tcp_listener.hpp +++ b/src/tcp_listener.hpp @@ -21,6 +21,7 @@  #define __ZMQ_TCP_LISTENER_HPP_INCLUDED__  #include "fd.hpp" +#include "ip.hpp"  namespace zmq  { @@ -34,10 +35,13 @@ namespace zmq          tcp_listener_t ();          ~tcp_listener_t (); -        //  Open TCP listining socket. Address is in +        //  Set up the address to listen on. Address is in          //  <interface-name>:<port-number> format. Interface name may be '*'          //  to bind to all the interfaces. -        int open (const char *addr_); +        int set_address (const char *addr_); + +        //  Open TCP listining socket.  +        int open ();          //  Close the listening socket.          int close (); @@ -53,6 +57,9 @@ namespace zmq      private: +        //  IP address/port to listen on. +        sockaddr_in addr; +          //  Underlying socket.          fd_t s; diff --git a/src/zmq_listener.cpp b/src/zmq_listener.cpp index 9787f7e..9c9bbe9 100644 --- a/src/zmq_listener.cpp +++ b/src/zmq_listener.cpp @@ -29,12 +29,46 @@ zmq::zmq_listener_t::zmq_listener_t (io_thread_t *parent_, object_t *owner_) :  zmq::zmq_listener_t::~zmq_listener_t ()  { +    if (plugged_in) +        rm_fd (handle); +} + +int zmq::zmq_listener_t::set_address (const char *addr_) +{ +     return tcp_listener.set_address (addr_);  }  void zmq::zmq_listener_t::process_plug ()  { -    //  TODO: Testing code follows... +    //  Open the listening socket. +    int rc = tcp_listener.open (); +    zmq_assert (rc == 0); + +    //  Start polling for incoming connections. +    handle = add_fd (tcp_listener.get_fd (), this); +    set_pollin (handle); + +    io_object_t::process_plug (); +} + +void zmq::zmq_listener_t::in_event () +{ +    fd_t fd = tcp_listener.accept (); + +    //  If connection was reset by the peer in the meantime, just ignore it. +    //  TODO: Handle specific errors like ENFILE/EMFILE etc. +    if (fd == retired_fd) +        return; + +    //  TODO +    zmq_assert (false); + +/*      object_t *engine = new zmq_engine_t (choose_io_thread (0), owner);      send_plug (engine);      send_own (owner, engine); +*/  } + + + diff --git a/src/zmq_listener.hpp b/src/zmq_listener.hpp index ea7cb92..74d42e1 100644 --- a/src/zmq_listener.hpp +++ b/src/zmq_listener.hpp @@ -20,7 +20,10 @@  #ifndef __ZMQ_ZMQ_LISTENER_HPP_INCLUDED__  #define __ZMQ_ZMQ_LISTENER_HPP_INCLUDED__ +#include <string> +  #include "io_object.hpp" +#include "tcp_listener.hpp"  namespace zmq  { @@ -31,6 +34,9 @@ namespace zmq          zmq_listener_t (class io_thread_t *parent_, object_t *owner_); +        //  Set IP address to listen on. +        int set_address (const char *addr_); +      private:          ~zmq_listener_t (); @@ -38,6 +44,15 @@ namespace zmq          //  Handlers for incoming commands.          void process_plug (); +        //  Handle I/O events. +        void in_event (); + +        //  Actual listening socket. +        tcp_listener_t tcp_listener; + +        //  Handle corresponding to the listening socket. +        handle_t handle; +          zmq_listener_t (const zmq_listener_t&);          void operator = (const zmq_listener_t&);      }; | 
