From bde396f1561fb5e57e6e413a40d904586e186d42 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Sun, 9 Aug 2009 11:21:47 +0200 Subject: fix to 3-thread synchronisation algorithm --- src/io_object.cpp | 48 +++++++++++++++++++++++++++++++++++++++++++++++- src/io_object.hpp | 21 ++++++++++++++++++++- src/socket_base.cpp | 6 +++++- src/tcp_listener.cpp | 19 +++++++++++-------- src/tcp_listener.hpp | 11 +++++++++-- src/zmq_listener.cpp | 36 +++++++++++++++++++++++++++++++++++- src/zmq_listener.hpp | 15 +++++++++++++++ 7 files changed, 142 insertions(+), 14 deletions(-) (limited to 'src') diff --git a/src/io_object.cpp b/src/io_object.cpp index a4badd7..b7c70d4 100644 --- a/src/io_object.cpp +++ b/src/io_object.cpp @@ -19,10 +19,13 @@ #include "io_object.hpp" #include "io_thread.hpp" +#include "err.hpp" zmq::io_object_t::io_object_t (io_thread_t *parent_, object_t *owner_) : object_t (parent_), - owner (owner_) + owner (owner_), + plugged_in (false), + terminated (false) { // Retrieve the poller from the thread we are running in. poller = parent_->get_poller (); @@ -32,6 +35,23 @@ zmq::io_object_t::~io_object_t () { } +void zmq::io_object_t::process_plug () +{ + zmq_assert (!plugged_in); + + // If termination of the object was already requested, destroy it and + // send the termination acknowledgement. + if (terminated) { + send_term_ack (owner); + delete this; + return; + } + + // Notify the generic termination mechanism (io_object_t) that the object + // is already plugged in. + plugged_in = true; +} + zmq::handle_t zmq::io_object_t::add_fd (fd_t fd_, i_poll_events *events_) { return poller->add_fd (fd_, events_); @@ -72,6 +92,21 @@ void zmq::io_object_t::cancel_timer (i_poll_events *events_) poller->cancel_timer (events_); } +void zmq::io_object_t::in_event () +{ + zmq_assert (false); +} + +void zmq::io_object_t::out_event () +{ + zmq_assert (false); +} + +void zmq::io_object_t::timer_event () +{ + zmq_assert (false); +} + void zmq::io_object_t::term () { send_term_req (owner, this); @@ -79,6 +114,17 @@ void zmq::io_object_t::term () void zmq::io_object_t::process_term () { + zmq_assert (!terminated); + + // If termination request has occured even before the object was plugged in + // wait till plugging in happens, then acknowledge the termination. + if (!plugged_in) { + terminated = true; + return; + } + + // Otherwise, destroy the object and acknowledge the termination + // straight away. send_term_ack (owner); delete this; } diff --git a/src/io_object.hpp b/src/io_object.hpp index ddb4414..2029bfb 100644 --- a/src/io_object.hpp +++ b/src/io_object.hpp @@ -22,11 +22,12 @@ #include "object.hpp" #include "i_poller.hpp" +#include "i_poll_events.hpp" namespace zmq { - class io_object_t : public object_t + class io_object_t : public object_t, public i_poll_events { public: @@ -45,6 +46,11 @@ namespace zmq // of I/O object correctly. virtual ~io_object_t (); + // Handlers for incoming commands. It vital that every I/O object + // invokes io_object_t::process_plug at the end of it's own plug + // handler. + void process_plug (); + // Methods to access underlying poller object. handle_t add_fd (fd_t fd_, struct i_poll_events *events_); void rm_fd (handle_t handle_); @@ -55,12 +61,25 @@ namespace zmq void add_timer (struct i_poll_events *events_); void cancel_timer (struct i_poll_events *events_); + // i_poll_events interface implementation. + void in_event (); + void out_event (); + void timer_event (); + // Socket owning this I/O object. It is responsible for destroying // it when it's being closed. object_t *owner; + // Set to true when object is plugged in. It's responsibility + // of derived object to set the property after the feat. + bool plugged_in; + private: + // Set to true when object was terminated before it was plugged in. + // In such case destruction is delayed till 'plug' command arrives. + bool terminated; + struct i_poller *poller; // Handlers for incoming commands. diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 026b317..3141517 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -70,7 +70,11 @@ int zmq::socket_base_t::bind (const char *addr_) { // TODO: The taskset should be taken from socket options. uint64_t taskset = 0; - object_t *listener = new zmq_listener_t (choose_io_thread (taskset), this); + zmq_listener_t *listener = new zmq_listener_t (choose_io_thread (taskset), this); + int rc = listener->set_address (addr_); + if (rc != 0) + return -1; + send_plug (listener); send_own (this, listener); return 0; diff --git a/src/tcp_listener.cpp b/src/tcp_listener.cpp index 6aae88a..937618d 100644 --- a/src/tcp_listener.cpp +++ b/src/tcp_listener.cpp @@ -17,6 +17,8 @@ along with this program. If not, see . */ +#include + #include "tcp_listener.hpp" #include "platform.hpp" #include "ip.hpp" @@ -41,6 +43,7 @@ zmq::tcp_listener_t::tcp_listener_t () : s (retired_fd) { + memset (&addr, 0, sizeof (addr)); } zmq::tcp_listener_t::~tcp_listener_t () @@ -49,14 +52,14 @@ zmq::tcp_listener_t::~tcp_listener_t () close (); } -int zmq::tcp_listener_t::open (const char *addr_) +int zmq::tcp_listener_t::set_address (const char *addr_) { // Convert the interface into sockaddr_in structure. - sockaddr_in ip_address; - int rc = resolve_ip_interface (&ip_address, addr_); - if (rc != 0) - return -1; - + return resolve_ip_interface (&addr, addr_); +} + +int zmq::tcp_listener_t::open () +{ // Create a listening socket. s = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP); if (s == -1) @@ -64,7 +67,7 @@ int zmq::tcp_listener_t::open (const char *addr_) // Allow reusing of the address. int flag = 1; - rc = setsockopt (s, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof (int)); + int rc = setsockopt (s, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof (int)); errno_assert (rc == 0); // Set the non-blocking flag. @@ -75,7 +78,7 @@ int zmq::tcp_listener_t::open (const char *addr_) errno_assert (rc != -1); // Bind the socket to the network interface and port. - rc = bind (s, (struct sockaddr*) &ip_address, sizeof (ip_address)); + rc = bind (s, (struct sockaddr*) &addr, sizeof (addr)); if (rc != 0) { close (); return -1; diff --git a/src/tcp_listener.hpp b/src/tcp_listener.hpp index 43a4aa8..2a8bc23 100644 --- a/src/tcp_listener.hpp +++ b/src/tcp_listener.hpp @@ -21,6 +21,7 @@ #define __ZMQ_TCP_LISTENER_HPP_INCLUDED__ #include "fd.hpp" +#include "ip.hpp" namespace zmq { @@ -34,10 +35,13 @@ namespace zmq tcp_listener_t (); ~tcp_listener_t (); - // Open TCP listining socket. Address is in + // Set up the address to listen on. Address is in // : format. Interface name may be '*' // to bind to all the interfaces. - int open (const char *addr_); + int set_address (const char *addr_); + + // Open TCP listining socket. + int open (); // Close the listening socket. int close (); @@ -53,6 +57,9 @@ namespace zmq private: + // IP address/port to listen on. + sockaddr_in addr; + // Underlying socket. fd_t s; diff --git a/src/zmq_listener.cpp b/src/zmq_listener.cpp index 9787f7e..9c9bbe9 100644 --- a/src/zmq_listener.cpp +++ b/src/zmq_listener.cpp @@ -29,12 +29,46 @@ zmq::zmq_listener_t::zmq_listener_t (io_thread_t *parent_, object_t *owner_) : zmq::zmq_listener_t::~zmq_listener_t () { + if (plugged_in) + rm_fd (handle); +} + +int zmq::zmq_listener_t::set_address (const char *addr_) +{ + return tcp_listener.set_address (addr_); } void zmq::zmq_listener_t::process_plug () { - // TODO: Testing code follows... + // Open the listening socket. + int rc = tcp_listener.open (); + zmq_assert (rc == 0); + + // Start polling for incoming connections. + handle = add_fd (tcp_listener.get_fd (), this); + set_pollin (handle); + + io_object_t::process_plug (); +} + +void zmq::zmq_listener_t::in_event () +{ + fd_t fd = tcp_listener.accept (); + + // If connection was reset by the peer in the meantime, just ignore it. + // TODO: Handle specific errors like ENFILE/EMFILE etc. + if (fd == retired_fd) + return; + + // TODO + zmq_assert (false); + +/* object_t *engine = new zmq_engine_t (choose_io_thread (0), owner); send_plug (engine); send_own (owner, engine); +*/ } + + + diff --git a/src/zmq_listener.hpp b/src/zmq_listener.hpp index ea7cb92..74d42e1 100644 --- a/src/zmq_listener.hpp +++ b/src/zmq_listener.hpp @@ -20,7 +20,10 @@ #ifndef __ZMQ_ZMQ_LISTENER_HPP_INCLUDED__ #define __ZMQ_ZMQ_LISTENER_HPP_INCLUDED__ +#include + #include "io_object.hpp" +#include "tcp_listener.hpp" namespace zmq { @@ -31,6 +34,9 @@ namespace zmq zmq_listener_t (class io_thread_t *parent_, object_t *owner_); + // Set IP address to listen on. + int set_address (const char *addr_); + private: ~zmq_listener_t (); @@ -38,6 +44,15 @@ namespace zmq // Handlers for incoming commands. void process_plug (); + // Handle I/O events. + void in_event (); + + // Actual listening socket. + tcp_listener_t tcp_listener; + + // Handle corresponding to the listening socket. + handle_t handle; + zmq_listener_t (const zmq_listener_t&); void operator = (const zmq_listener_t&); }; -- cgit v1.2.3