summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@fastmq.commkdir>2009-08-09 11:21:47 +0200
committerMartin Sustrik <sustrik@fastmq.commkdir>2009-08-09 11:21:47 +0200
commitbde396f1561fb5e57e6e413a40d904586e186d42 (patch)
treecbd7537c95bbc8ab8a09a17cec6533a268500264
parent5b5b513330e96e3e08d0c2c60d03044091976420 (diff)
fix to 3-thread synchronisation algorithm
-rw-r--r--src/io_object.cpp48
-rw-r--r--src/io_object.hpp21
-rw-r--r--src/socket_base.cpp6
-rw-r--r--src/tcp_listener.cpp19
-rw-r--r--src/tcp_listener.hpp11
-rw-r--r--src/zmq_listener.cpp36
-rw-r--r--src/zmq_listener.hpp15
7 files changed, 142 insertions, 14 deletions
diff --git a/src/io_object.cpp b/src/io_object.cpp
index a4badd7..b7c70d4 100644
--- a/src/io_object.cpp
+++ b/src/io_object.cpp
@@ -19,10 +19,13 @@
#include "io_object.hpp"
#include "io_thread.hpp"
+#include "err.hpp"
zmq::io_object_t::io_object_t (io_thread_t *parent_, object_t *owner_) :
object_t (parent_),
- owner (owner_)
+ owner (owner_),
+ plugged_in (false),
+ terminated (false)
{
// Retrieve the poller from the thread we are running in.
poller = parent_->get_poller ();
@@ -32,6 +35,23 @@ zmq::io_object_t::~io_object_t ()
{
}
+void zmq::io_object_t::process_plug ()
+{
+ zmq_assert (!plugged_in);
+
+ // If termination of the object was already requested, destroy it and
+ // send the termination acknowledgement.
+ if (terminated) {
+ send_term_ack (owner);
+ delete this;
+ return;
+ }
+
+ // Notify the generic termination mechanism (io_object_t) that the object
+ // is already plugged in.
+ plugged_in = true;
+}
+
zmq::handle_t zmq::io_object_t::add_fd (fd_t fd_, i_poll_events *events_)
{
return poller->add_fd (fd_, events_);
@@ -72,6 +92,21 @@ void zmq::io_object_t::cancel_timer (i_poll_events *events_)
poller->cancel_timer (events_);
}
+void zmq::io_object_t::in_event ()
+{
+ zmq_assert (false);
+}
+
+void zmq::io_object_t::out_event ()
+{
+ zmq_assert (false);
+}
+
+void zmq::io_object_t::timer_event ()
+{
+ zmq_assert (false);
+}
+
void zmq::io_object_t::term ()
{
send_term_req (owner, this);
@@ -79,6 +114,17 @@ void zmq::io_object_t::term ()
void zmq::io_object_t::process_term ()
{
+ zmq_assert (!terminated);
+
+ // If termination request has occured even before the object was plugged in
+ // wait till plugging in happens, then acknowledge the termination.
+ if (!plugged_in) {
+ terminated = true;
+ return;
+ }
+
+ // Otherwise, destroy the object and acknowledge the termination
+ // straight away.
send_term_ack (owner);
delete this;
}
diff --git a/src/io_object.hpp b/src/io_object.hpp
index ddb4414..2029bfb 100644
--- a/src/io_object.hpp
+++ b/src/io_object.hpp
@@ -22,11 +22,12 @@
#include "object.hpp"
#include "i_poller.hpp"
+#include "i_poll_events.hpp"
namespace zmq
{
- class io_object_t : public object_t
+ class io_object_t : public object_t, public i_poll_events
{
public:
@@ -45,6 +46,11 @@ namespace zmq
// of I/O object correctly.
virtual ~io_object_t ();
+ // Handlers for incoming commands. It vital that every I/O object
+ // invokes io_object_t::process_plug at the end of it's own plug
+ // handler.
+ void process_plug ();
+
// Methods to access underlying poller object.
handle_t add_fd (fd_t fd_, struct i_poll_events *events_);
void rm_fd (handle_t handle_);
@@ -55,12 +61,25 @@ namespace zmq
void add_timer (struct i_poll_events *events_);
void cancel_timer (struct i_poll_events *events_);
+ // i_poll_events interface implementation.
+ void in_event ();
+ void out_event ();
+ void timer_event ();
+
// Socket owning this I/O object. It is responsible for destroying
// it when it's being closed.
object_t *owner;
+ // Set to true when object is plugged in. It's responsibility
+ // of derived object to set the property after the feat.
+ bool plugged_in;
+
private:
+ // Set to true when object was terminated before it was plugged in.
+ // In such case destruction is delayed till 'plug' command arrives.
+ bool terminated;
+
struct i_poller *poller;
// Handlers for incoming commands.
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index 026b317..3141517 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -70,7 +70,11 @@ int zmq::socket_base_t::bind (const char *addr_)
{
// TODO: The taskset should be taken from socket options.
uint64_t taskset = 0;
- object_t *listener = new zmq_listener_t (choose_io_thread (taskset), this);
+ zmq_listener_t *listener = new zmq_listener_t (choose_io_thread (taskset), this);
+ int rc = listener->set_address (addr_);
+ if (rc != 0)
+ return -1;
+
send_plug (listener);
send_own (this, listener);
return 0;
diff --git a/src/tcp_listener.cpp b/src/tcp_listener.cpp
index 6aae88a..937618d 100644
--- a/src/tcp_listener.cpp
+++ b/src/tcp_listener.cpp
@@ -17,6 +17,8 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
+#include <string.h>
+
#include "tcp_listener.hpp"
#include "platform.hpp"
#include "ip.hpp"
@@ -41,6 +43,7 @@
zmq::tcp_listener_t::tcp_listener_t () :
s (retired_fd)
{
+ memset (&addr, 0, sizeof (addr));
}
zmq::tcp_listener_t::~tcp_listener_t ()
@@ -49,14 +52,14 @@ zmq::tcp_listener_t::~tcp_listener_t ()
close ();
}
-int zmq::tcp_listener_t::open (const char *addr_)
+int zmq::tcp_listener_t::set_address (const char *addr_)
{
// Convert the interface into sockaddr_in structure.
- sockaddr_in ip_address;
- int rc = resolve_ip_interface (&ip_address, addr_);
- if (rc != 0)
- return -1;
-
+ return resolve_ip_interface (&addr, addr_);
+}
+
+int zmq::tcp_listener_t::open ()
+{
// Create a listening socket.
s = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
if (s == -1)
@@ -64,7 +67,7 @@ int zmq::tcp_listener_t::open (const char *addr_)
// Allow reusing of the address.
int flag = 1;
- rc = setsockopt (s, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof (int));
+ int rc = setsockopt (s, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof (int));
errno_assert (rc == 0);
// Set the non-blocking flag.
@@ -75,7 +78,7 @@ int zmq::tcp_listener_t::open (const char *addr_)
errno_assert (rc != -1);
// Bind the socket to the network interface and port.
- rc = bind (s, (struct sockaddr*) &ip_address, sizeof (ip_address));
+ rc = bind (s, (struct sockaddr*) &addr, sizeof (addr));
if (rc != 0) {
close ();
return -1;
diff --git a/src/tcp_listener.hpp b/src/tcp_listener.hpp
index 43a4aa8..2a8bc23 100644
--- a/src/tcp_listener.hpp
+++ b/src/tcp_listener.hpp
@@ -21,6 +21,7 @@
#define __ZMQ_TCP_LISTENER_HPP_INCLUDED__
#include "fd.hpp"
+#include "ip.hpp"
namespace zmq
{
@@ -34,10 +35,13 @@ namespace zmq
tcp_listener_t ();
~tcp_listener_t ();
- // Open TCP listining socket. Address is in
+ // Set up the address to listen on. Address is in
// <interface-name>:<port-number> format. Interface name may be '*'
// to bind to all the interfaces.
- int open (const char *addr_);
+ int set_address (const char *addr_);
+
+ // Open TCP listining socket.
+ int open ();
// Close the listening socket.
int close ();
@@ -53,6 +57,9 @@ namespace zmq
private:
+ // IP address/port to listen on.
+ sockaddr_in addr;
+
// Underlying socket.
fd_t s;
diff --git a/src/zmq_listener.cpp b/src/zmq_listener.cpp
index 9787f7e..9c9bbe9 100644
--- a/src/zmq_listener.cpp
+++ b/src/zmq_listener.cpp
@@ -29,12 +29,46 @@ zmq::zmq_listener_t::zmq_listener_t (io_thread_t *parent_, object_t *owner_) :
zmq::zmq_listener_t::~zmq_listener_t ()
{
+ if (plugged_in)
+ rm_fd (handle);
+}
+
+int zmq::zmq_listener_t::set_address (const char *addr_)
+{
+ return tcp_listener.set_address (addr_);
}
void zmq::zmq_listener_t::process_plug ()
{
- // TODO: Testing code follows...
+ // Open the listening socket.
+ int rc = tcp_listener.open ();
+ zmq_assert (rc == 0);
+
+ // Start polling for incoming connections.
+ handle = add_fd (tcp_listener.get_fd (), this);
+ set_pollin (handle);
+
+ io_object_t::process_plug ();
+}
+
+void zmq::zmq_listener_t::in_event ()
+{
+ fd_t fd = tcp_listener.accept ();
+
+ // If connection was reset by the peer in the meantime, just ignore it.
+ // TODO: Handle specific errors like ENFILE/EMFILE etc.
+ if (fd == retired_fd)
+ return;
+
+ // TODO
+ zmq_assert (false);
+
+/*
object_t *engine = new zmq_engine_t (choose_io_thread (0), owner);
send_plug (engine);
send_own (owner, engine);
+*/
}
+
+
+
diff --git a/src/zmq_listener.hpp b/src/zmq_listener.hpp
index ea7cb92..74d42e1 100644
--- a/src/zmq_listener.hpp
+++ b/src/zmq_listener.hpp
@@ -20,7 +20,10 @@
#ifndef __ZMQ_ZMQ_LISTENER_HPP_INCLUDED__
#define __ZMQ_ZMQ_LISTENER_HPP_INCLUDED__
+#include <string>
+
#include "io_object.hpp"
+#include "tcp_listener.hpp"
namespace zmq
{
@@ -31,6 +34,9 @@ namespace zmq
zmq_listener_t (class io_thread_t *parent_, object_t *owner_);
+ // Set IP address to listen on.
+ int set_address (const char *addr_);
+
private:
~zmq_listener_t ();
@@ -38,6 +44,15 @@ namespace zmq
// Handlers for incoming commands.
void process_plug ();
+ // Handle I/O events.
+ void in_event ();
+
+ // Actual listening socket.
+ tcp_listener_t tcp_listener;
+
+ // Handle corresponding to the listening socket.
+ handle_t handle;
+
zmq_listener_t (const zmq_listener_t&);
void operator = (const zmq_listener_t&);
};