summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@fastmq.commkdir>2009-09-20 12:03:14 +0200
committerMartin Sustrik <sustrik@fastmq.commkdir>2009-09-20 12:03:14 +0200
commit7668b246fc3cf4a2a3b3ee9b1283ad8a4b12ac4f (patch)
tree37ca9e3928cdd1e3e5c41f6cc694911909e4c08c /src
parent495a2228cd08a29946f9e9ce2e0721e789203e35 (diff)
ZMQ_POLL option forces fd_signaler to be used in app_thread
Diffstat (limited to 'src')
-rw-r--r--src/app_thread.cpp18
-rw-r--r--src/app_thread.hpp5
-rw-r--r--src/fd_signaler.cpp39
-rw-r--r--src/fd_signaler.hpp3
-rw-r--r--src/i_signaler.hpp4
-rw-r--r--src/ypollset.cpp4
-rw-r--r--src/ypollset.hpp2
7 files changed, 48 insertions, 27 deletions
diff --git a/src/app_thread.cpp b/src/app_thread.cpp
index 8c83313..303c6a1 100644
--- a/src/app_thread.cpp
+++ b/src/app_thread.cpp
@@ -31,6 +31,8 @@
#include "app_thread.hpp"
#include "dispatcher.hpp"
+#include "fd_signaler.hpp"
+#include "ypollset.hpp"
#include "err.hpp"
#include "pipe.hpp"
#include "config.hpp"
@@ -52,16 +54,26 @@ zmq::app_thread_t::app_thread_t (dispatcher_t *dispatcher_, int thread_slot_,
associated (false),
last_processing_time (0)
{
+ if (flags_ & ZMQ_POLL) {
+ signaler = new fd_signaler_t;
+ zmq_assert (signaler);
+ }
+ else {
+ signaler = new ypollset_t;
+ zmq_assert (signaler);
+ }
}
zmq::app_thread_t::~app_thread_t ()
{
zmq_assert (sockets.empty ());
+ zmq_assert (signaler);
+ delete signaler;
}
zmq::i_signaler *zmq::app_thread_t::get_signaler ()
{
- return &pollset;
+ return signaler;
}
bool zmq::app_thread_t::is_current ()
@@ -86,7 +98,7 @@ void zmq::app_thread_t::process_commands (bool block_, bool throttle_)
{
uint64_t signals;
if (block_)
- signals = pollset.poll ();
+ signals = signaler->poll ();
else {
#if defined ZMQ_DELAY_COMMANDS
@@ -119,7 +131,7 @@ void zmq::app_thread_t::process_commands (bool block_, bool throttle_)
#endif
// Check whether there are any commands pending for this thread.
- signals = pollset.check ();
+ signals = signaler->check ();
}
if (signals) {
diff --git a/src/app_thread.hpp b/src/app_thread.hpp
index 5fdb92d..4fe67fb 100644
--- a/src/app_thread.hpp
+++ b/src/app_thread.hpp
@@ -24,7 +24,6 @@
#include "stdint.hpp"
#include "object.hpp"
-#include "ypollset.hpp"
#include "thread.hpp"
namespace zmq
@@ -40,7 +39,7 @@ namespace zmq
~app_thread_t ();
// Returns signaler associated with this application thread.
- i_signaler *get_signaler ();
+ struct i_signaler *get_signaler ();
// Nota bene: Following two functions are accessed from different
// threads. The caller (dispatcher) is responsible for synchronisation
@@ -79,7 +78,7 @@ namespace zmq
thread_t::id_t tid;
// App thread's signaler object.
- ypollset_t pollset;
+ struct i_signaler *signaler;
// Timestamp of when commands were processed the last time.
uint64_t last_processing_time;
diff --git a/src/fd_signaler.cpp b/src/fd_signaler.cpp
index 52c3129..8c71356 100644
--- a/src/fd_signaler.cpp
+++ b/src/fd_signaler.cpp
@@ -65,12 +65,18 @@ void zmq::fd_signaler_t::signal (int signal_)
errno_assert (sz == sizeof (uint64_t));
}
+uint64_t zmq::fd_signaler_t::poll ()
+{
+ // TODO: Can we do a blocking read on non-blocking eventfd?
+ // It's not needed as for now, so let it stay unimplemented.
+ zmq_assert (false);
+}
+
uint64_t zmq::fd_signaler_t::check ()
{
uint64_t val;
ssize_t sz = read (fd, &val, sizeof (uint64_t));
- if (sz == -1 && (errno == EAGAIN || errno == EWOULDBLOCK ||
- errno == EINTR))
+ if (sz == -1 && (errno == EAGAIN || errno == EINTR))
return 0;
errno_assert (sz != -1);
return val;
@@ -120,7 +126,7 @@ zmq::fd_signaler_t::fd_signaler_t ()
rc = connect (w, (sockaddr *) &addr, sizeof (addr));
wsa_assert (rc != SOCKET_ERROR);
- // Accept connection from w
+ // Accept connection from w.
r = accept (listener, NULL, NULL);
wsa_assert (r != INVALID_SOCKET);
@@ -139,6 +145,9 @@ zmq::fd_signaler_t::~fd_signaler_t ()
void zmq::fd_signaler_t::signal (int signal_)
{
+ // TODO: Note that send is a blocking operation.
+ // How should we behave if the signal cannot be written to the signaler?
+
zmq_assert (signal_ >= 0 && signal_ < 64);
char c = (char) signal_;
int rc = send (w, &c, 1, 0);
@@ -154,7 +163,7 @@ uint64_t zmq::fd_signaler_t::poll ()
// If there are no signals, wait until at least one signal arrives.
unsigned char sig;
- int nbytes = recv (r, (char*) &sig, 1, MSG_WAITALL);
+ int nbytes = recv (r, (char*) &sig, 1, 0);
win_assert (nbytes != -1);
return uint64_t (1) << sig;
}
@@ -162,7 +171,9 @@ uint64_t zmq::fd_signaler_t::poll ()
uint64_t zmq::fd_signaler_t::check ()
{
unsigned char buffer [32];
- int nbytes = recv (r, (char*) buffer, 32, 0);
+ int nbytes = recv (r, (char*) buffer, 32, MSG_DONTWAIT);
+ if (nbytes == -1 && errno == EAGAIN)
+ return 0;
win_assert (nbytes != -1);
uint64_t signals = 0;
@@ -190,13 +201,6 @@ zmq::fd_signaler_t::fd_signaler_t ()
errno_assert (rc == 0);
w = sv [0];
r = sv [1];
-
- // Set to non-blocking mode.
- int flags = fcntl (r, F_GETFL, 0);
- if (flags == -1)
- flags = 0;
- rc = fcntl (r, F_SETFL, flags | O_NONBLOCK);
- errno_assert (rc != -1);
}
zmq::fd_signaler_t::~fd_signaler_t ()
@@ -207,6 +211,9 @@ zmq::fd_signaler_t::~fd_signaler_t ()
void zmq::fd_signaler_t::signal (int signal_)
{
+ // TODO: Note that send is a blocking operation.
+ // How should we behave if the signal cannot be written to the signaler?
+
zmq_assert (signal_ >= 0 && signal_ < 64);
unsigned char c = (unsigned char) signal_;
ssize_t nbytes = send (w, &c, 1, 0);
@@ -222,7 +229,7 @@ uint64_t zmq::fd_signaler_t::poll ()
// If there are no signals, wait until at least one signal arrives.
unsigned char sig;
- ssize_t nbytes = recv (r, &sig, 1, MSG_WAITALL);
+ ssize_t nbytes = recv (r, &sig, 1, 0);
errno_assert (nbytes != -1);
return uint64_t (1) << sig;
}
@@ -230,8 +237,10 @@ uint64_t zmq::fd_signaler_t::poll ()
uint64_t zmq::fd_signaler_t::check ()
{
unsigned char buffer [32];
- ssize_t nbytes = recv (r, buffer, 32, 0);
- errno_assert (nbytes != -1);
+ ssize_t nbytes = recv (r, buffer, 32, MSG_DONTWAIT);
+ if (nbytes == -1 && errno == EAGAIN)
+ return 0;
+ zmq_assert (nbytes != -1);
uint64_t signals = 0;
for (int pos = 0; pos != nbytes; pos ++) {
diff --git a/src/fd_signaler.hpp b/src/fd_signaler.hpp
index 107645e..e1b56ad 100644
--- a/src/fd_signaler.hpp
+++ b/src/fd_signaler.hpp
@@ -37,10 +37,7 @@ namespace zmq
{
public:
- // Initialise the object.
fd_signaler_t ();
-
- // Destroy the object.
~fd_signaler_t ();
// i_signaler interface implementation.
diff --git a/src/i_signaler.hpp b/src/i_signaler.hpp
index 729b5b7..a09fe7e 100644
--- a/src/i_signaler.hpp
+++ b/src/i_signaler.hpp
@@ -37,11 +37,11 @@ namespace zmq
// Wait for signal. Returns a set of signals in form of a bitmap.
// Signal with index 0 corresponds to value 1, index 1 to value 2,
// index 2 to value 3 etc.
- uint64_t poll ();
+ virtual uint64_t poll () = 0;
// Same as poll, however, if there is no signal available,
// function returns zero immediately instead of waiting for a signal.
- uint64_t check ();
+ virtual uint64_t check () = 0;
};
}
diff --git a/src/ypollset.cpp b/src/ypollset.cpp
index 5a7c69f..a72eac7 100644
--- a/src/ypollset.cpp
+++ b/src/ypollset.cpp
@@ -23,6 +23,10 @@ zmq::ypollset_t::ypollset_t ()
{
}
+zmq::ypollset_t::~ypollset_t ()
+{
+}
+
void zmq::ypollset_t::signal (int signal_)
{
zmq_assert (signal_ >= 0 && signal_ < wait_signal);
diff --git a/src/ypollset.hpp b/src/ypollset.hpp
index 296201a..25eb3e0 100644
--- a/src/ypollset.hpp
+++ b/src/ypollset.hpp
@@ -35,8 +35,8 @@ namespace zmq
{
public:
- // Create the pollset.
ypollset_t ();
+ ~ypollset_t ();
// i_signaler interface implementation.
void signal (int signal_);