summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/devpoll.cpp6
-rw-r--r--src/epoll.cpp6
-rw-r--r--src/i_poll_events.hpp8
-rw-r--r--src/io_object.cpp4
-rw-r--r--src/io_object.hpp4
-rw-r--r--src/io_thread.cpp4
-rw-r--r--src/io_thread.hpp4
-rw-r--r--src/ipc_connecter.cpp8
-rw-r--r--src/ipc_connecter.hpp4
-rw-r--r--src/ipc_listener.cpp2
-rw-r--r--src/ipc_listener.hpp2
-rw-r--r--src/kqueue.cpp6
-rw-r--r--src/pgm_receiver.cpp6
-rw-r--r--src/pgm_receiver.hpp2
-rw-r--r--src/pgm_sender.cpp10
-rw-r--r--src/pgm_sender.hpp4
-rw-r--r--src/poll.cpp6
-rw-r--r--src/reaper.cpp4
-rw-r--r--src/reaper.hpp4
-rw-r--r--src/select.cpp6
-rw-r--r--src/socket_base.cpp4
-rw-r--r--src/socket_base.hpp4
-rw-r--r--src/stream_engine.cpp10
-rw-r--r--src/stream_engine.hpp4
-rw-r--r--src/tcp_connecter.cpp8
-rw-r--r--src/tcp_connecter.hpp4
-rw-r--r--src/tcp_listener.cpp2
-rw-r--r--src/tcp_listener.hpp2
28 files changed, 70 insertions, 68 deletions
diff --git a/src/devpoll.cpp b/src/devpoll.cpp
index 12b22ce..da56a7f 100644
--- a/src/devpoll.cpp
+++ b/src/devpoll.cpp
@@ -169,15 +169,15 @@ void xs::devpoll_t::loop ()
if (!fd_ptr->valid || !fd_ptr->accepted)
continue;
if (ev_buf [i].revents & (POLLERR | POLLHUP))
- fd_ptr->reactor->in_event ();
+ fd_ptr->reactor->in_event (ev_buf [i].fd);
if (!fd_ptr->valid || !fd_ptr->accepted)
continue;
if (ev_buf [i].revents & POLLOUT)
- fd_ptr->reactor->out_event ();
+ fd_ptr->reactor->out_event (ev_buf [i].fd);
if (!fd_ptr->valid || !fd_ptr->accepted)
continue;
if (ev_buf [i].revents & POLLIN)
- fd_ptr->reactor->in_event ();
+ fd_ptr->reactor->in_event (ev_buf [i].fd);
}
}
}
diff --git a/src/epoll.cpp b/src/epoll.cpp
index 3902282..354ccc4 100644
--- a/src/epoll.cpp
+++ b/src/epoll.cpp
@@ -150,15 +150,15 @@ void xs::epoll_t::loop ()
if (pe->fd == retired_fd)
continue;
if (ev_buf [i].events & (EPOLLERR | EPOLLHUP))
- pe->events->in_event ();
+ pe->events->in_event (pe->fd);
if (pe->fd == retired_fd)
continue;
if (ev_buf [i].events & EPOLLOUT)
- pe->events->out_event ();
+ pe->events->out_event (pe->fd);
if (pe->fd == retired_fd)
continue;
if (ev_buf [i].events & EPOLLIN)
- pe->events->in_event ();
+ pe->events->in_event (pe->fd);
}
// Destroy retired event sources.
diff --git a/src/i_poll_events.hpp b/src/i_poll_events.hpp
index ca1625d..0b9bd86 100644
--- a/src/i_poll_events.hpp
+++ b/src/i_poll_events.hpp
@@ -21,7 +21,9 @@
#ifndef __XS_I_POLL_EVENTS_HPP_INCLUDED__
#define __XS_I_POLL_EVENTS_HPP_INCLUDED__
-
+
+#include "fd.hpp"
+
namespace xs
{
@@ -33,10 +35,10 @@ namespace xs
virtual ~i_poll_events () {}
// Called by I/O thread when file descriptor is ready for reading.
- virtual void in_event () = 0;
+ virtual void in_event (fd_t fd_) = 0;
// Called by I/O thread when file descriptor is ready for writing.
- virtual void out_event () = 0;
+ virtual void out_event (fd_t fd_) = 0;
// Called when timer expires.
virtual void timer_event (int id_) = 0;
diff --git a/src/io_object.cpp b/src/io_object.cpp
index 1b0231d..0424c63 100644
--- a/src/io_object.cpp
+++ b/src/io_object.cpp
@@ -92,12 +92,12 @@ void xs::io_object_t::cancel_timer (int id_)
poller->cancel_timer (this, id_);
}
-void xs::io_object_t::in_event ()
+void xs::io_object_t::in_event (fd_t fd_)
{
xs_assert (false);
}
-void xs::io_object_t::out_event ()
+void xs::io_object_t::out_event (fd_t fd_)
{
xs_assert (false);
}
diff --git a/src/io_object.hpp b/src/io_object.hpp
index b3496b6..ef65f75 100644
--- a/src/io_object.hpp
+++ b/src/io_object.hpp
@@ -64,8 +64,8 @@ namespace xs
void cancel_timer (int id_);
// i_poll_events interface implementation.
- void in_event ();
- void out_event ();
+ void in_event (fd_t fd_);
+ void out_event (fd_t fd_);
void timer_event (int id_);
private:
diff --git a/src/io_thread.cpp b/src/io_thread.cpp
index e5abbe0..75bca4f 100644
--- a/src/io_thread.cpp
+++ b/src/io_thread.cpp
@@ -62,7 +62,7 @@ int xs::io_thread_t::get_load ()
return poller->get_load ();
}
-void xs::io_thread_t::in_event ()
+void xs::io_thread_t::in_event (fd_t fd_)
{
// TODO: Do we want to limit number of commands I/O thread can
// process in a single go?
@@ -83,7 +83,7 @@ void xs::io_thread_t::in_event ()
}
}
-void xs::io_thread_t::out_event ()
+void xs::io_thread_t::out_event (fd_t fd_)
{
// We are never polling for POLLOUT here. This function is never called.
xs_assert (false);
diff --git a/src/io_thread.hpp b/src/io_thread.hpp
index 4884c5b..4a53c15 100644
--- a/src/io_thread.hpp
+++ b/src/io_thread.hpp
@@ -58,8 +58,8 @@ namespace xs
mailbox_t *get_mailbox ();
// i_poll_events implementation.
- void in_event ();
- void out_event ();
+ void in_event (fd_t fd_);
+ void out_event (fd_t fd_);
void timer_event (int id_);
// Used by io_objects to retrieve the assciated poller object.
diff --git a/src/ipc_connecter.cpp b/src/ipc_connecter.cpp
index 8e3ea9c..243d795 100644
--- a/src/ipc_connecter.cpp
+++ b/src/ipc_connecter.cpp
@@ -73,15 +73,15 @@ void xs::ipc_connecter_t::process_plug ()
start_connecting ();
}
-void xs::ipc_connecter_t::in_event ()
+void xs::ipc_connecter_t::in_event (fd_t fd_)
{
// We are not polling for incomming data, so we are actually called
// because of error here. However, we can get error on out event as well
// on some platforms, so we'll simply handle both events in the same way.
- out_event ();
+ out_event (fd_);
}
-void xs::ipc_connecter_t::out_event ()
+void xs::ipc_connecter_t::out_event (fd_t fd_)
{
fd_t fd = connect ();
rm_fd (handle);
@@ -122,7 +122,7 @@ void xs::ipc_connecter_t::start_connecting ()
if (rc == 0) {
handle = add_fd (s);
handle_valid = true;
- out_event ();
+ out_event (s);
return;
}
diff --git a/src/ipc_connecter.hpp b/src/ipc_connecter.hpp
index b2ee20d..81edda2 100644
--- a/src/ipc_connecter.hpp
+++ b/src/ipc_connecter.hpp
@@ -57,8 +57,8 @@ namespace xs
void process_plug ();
// Handlers for I/O events.
- void in_event ();
- void out_event ();
+ void in_event (fd_t fd_);
+ void out_event (fd_t fd_);
void timer_event (int id_);
// Internal function to start the actual connection establishment.
diff --git a/src/ipc_listener.cpp b/src/ipc_listener.cpp
index accd09c..db49528 100644
--- a/src/ipc_listener.cpp
+++ b/src/ipc_listener.cpp
@@ -68,7 +68,7 @@ void xs::ipc_listener_t::process_term (int linger_)
own_t::process_term (linger_);
}
-void xs::ipc_listener_t::in_event ()
+void xs::ipc_listener_t::in_event (fd_t fd_)
{
fd_t fd = accept ();
diff --git a/src/ipc_listener.hpp b/src/ipc_listener.hpp
index d2e19df..b599bff 100644
--- a/src/ipc_listener.hpp
+++ b/src/ipc_listener.hpp
@@ -56,7 +56,7 @@ namespace xs
void process_term (int linger_);
// Handlers for I/O events.
- void in_event ();
+ void in_event (fd_t fd_);
// Close the listening socket.
int close ();
diff --git a/src/kqueue.cpp b/src/kqueue.cpp
index 046cbc7..62b443f 100644
--- a/src/kqueue.cpp
+++ b/src/kqueue.cpp
@@ -173,15 +173,15 @@ void xs::kqueue_t::loop ()
if (pe->fd == retired_fd)
continue;
if (ev_buf [i].flags & EV_EOF)
- pe->reactor->in_event ();
+ pe->reactor->in_event (pe->fd);
if (pe->fd == retired_fd)
continue;
if (ev_buf [i].filter == EVFILT_WRITE)
- pe->reactor->out_event ();
+ pe->reactor->out_event (pe->fd);
if (pe->fd == retired_fd)
continue;
if (ev_buf [i].filter == EVFILT_READ)
- pe->reactor->in_event ();
+ pe->reactor->in_event (pe->fd);
}
// Destroy retired event sources.
diff --git a/src/pgm_receiver.cpp b/src/pgm_receiver.cpp
index ddf46b9..19866c7 100644
--- a/src/pgm_receiver.cpp
+++ b/src/pgm_receiver.cpp
@@ -145,10 +145,10 @@ void xs::pgm_receiver_t::activate_in ()
set_pollin (pipe_handle);
set_pollin (socket_handle);
- in_event ();
+ in_event (retired_fd);
}
-void xs::pgm_receiver_t::in_event ()
+void xs::pgm_receiver_t::in_event (fd_t fd_)
{
// Read data from the underlying pgm_socket.
unsigned char *data = NULL;
@@ -269,7 +269,7 @@ void xs::pgm_receiver_t::timer_event (int token)
// Timer cancels on return by poller_base.
has_rx_timer = false;
- in_event ();
+ in_event (retired_fd);
}
void xs::pgm_receiver_t::drop_subscriptions ()
diff --git a/src/pgm_receiver.hpp b/src/pgm_receiver.hpp
index 939f540..cd767f8 100644
--- a/src/pgm_receiver.hpp
+++ b/src/pgm_receiver.hpp
@@ -65,7 +65,7 @@ namespace xs
void activate_out ();
// i_poll_events interface implementation.
- void in_event ();
+ void in_event (fd_t fd_);
void timer_event (int token);
private:
diff --git a/src/pgm_sender.cpp b/src/pgm_sender.cpp
index 1bfd944..54997cd 100644
--- a/src/pgm_sender.cpp
+++ b/src/pgm_sender.cpp
@@ -132,7 +132,7 @@ void xs::pgm_sender_t::terminate ()
void xs::pgm_sender_t::activate_out ()
{
set_pollout (handle);
- out_event ();
+ out_event (retired_fd);
}
void xs::pgm_sender_t::activate_in ()
@@ -148,7 +148,7 @@ xs::pgm_sender_t::~pgm_sender_t ()
}
}
-void xs::pgm_sender_t::in_event ()
+void xs::pgm_sender_t::in_event (fd_t fd_)
{
if (has_rx_timer) {
cancel_timer (rx_timer_id);
@@ -164,7 +164,7 @@ void xs::pgm_sender_t::in_event ()
}
}
-void xs::pgm_sender_t::out_event ()
+void xs::pgm_sender_t::out_event (fd_t fd_)
{
// POLLOUT event from send socket. If write buffer is empty,
// try to read new data from the encoder.
@@ -217,10 +217,10 @@ void xs::pgm_sender_t::timer_event (int token)
// Timer cancels on return by poller_base.
if (token == rx_timer_id) {
has_rx_timer = false;
- in_event ();
+ in_event (retired_fd);
} else if (token == tx_timer_id) {
has_tx_timer = false;
- out_event ();
+ out_event (retired_fd);
} else
xs_assert (false);
}
diff --git a/src/pgm_sender.hpp b/src/pgm_sender.hpp
index 277fb53..23c241b 100644
--- a/src/pgm_sender.hpp
+++ b/src/pgm_sender.hpp
@@ -63,8 +63,8 @@ namespace xs
void activate_out ();
// i_poll_events interface implementation.
- void in_event ();
- void out_event ();
+ void in_event (fd_t fd_);
+ void out_event (fd_t fd_);
void timer_event (int token);
private:
diff --git a/src/poll.cpp b/src/poll.cpp
index 86fdd5b..48a204e 100644
--- a/src/poll.cpp
+++ b/src/poll.cpp
@@ -141,15 +141,15 @@ void xs::poll_t::loop ()
if (pollset [i].fd == retired_fd)
continue;
if (pollset [i].revents & (POLLERR | POLLHUP))
- fd_table [pollset [i].fd].events->in_event ();
+ fd_table [pollset [i].fd].events->in_event (pollset [i].fd);
if (pollset [i].fd == retired_fd)
continue;
if (pollset [i].revents & POLLOUT)
- fd_table [pollset [i].fd].events->out_event ();
+ fd_table [pollset [i].fd].events->out_event (pollset [i].fd);
if (pollset [i].fd == retired_fd)
continue;
if (pollset [i].revents & POLLIN)
- fd_table [pollset [i].fd].events->in_event ();
+ fd_table [pollset [i].fd].events->in_event (pollset [i].fd);
}
// Clean up the pollset and update the fd_table accordingly.
diff --git a/src/reaper.cpp b/src/reaper.cpp
index 1eb2b7c..7e38b73 100644
--- a/src/reaper.cpp
+++ b/src/reaper.cpp
@@ -55,7 +55,7 @@ void xs::reaper_t::stop ()
send_stop ();
}
-void xs::reaper_t::in_event ()
+void xs::reaper_t::in_event (fd_t fd_)
{
while (true) {
@@ -73,7 +73,7 @@ void xs::reaper_t::in_event ()
}
}
-void xs::reaper_t::out_event ()
+void xs::reaper_t::out_event (fd_t fd_)
{
xs_assert (false);
}
diff --git a/src/reaper.hpp b/src/reaper.hpp
index b3fc818..0cb31a2 100644
--- a/src/reaper.hpp
+++ b/src/reaper.hpp
@@ -45,8 +45,8 @@ namespace xs
void stop ();
// i_poll_events implementation.
- void in_event ();
- void out_event ();
+ void in_event (fd_t fd_);
+ void out_event (fd_t fd_);
void timer_event (int id_);
private:
diff --git a/src/select.cpp b/src/select.cpp
index b2ade09..634e358 100644
--- a/src/select.cpp
+++ b/src/select.cpp
@@ -182,15 +182,15 @@ void xs::select_t::loop ()
if (fds [i].fd == retired_fd)
continue;
if (FD_ISSET (fds [i].fd, &exceptfds))
- fds [i].events->in_event ();
+ fds [i].events->in_event (fds [i].fd);
if (fds [i].fd == retired_fd)
continue;
if (FD_ISSET (fds [i].fd, &writefds))
- fds [i].events->out_event ();
+ fds [i].events->out_event (fds [i].fd);
if (fds [i].fd == retired_fd)
continue;
if (FD_ISSET (fds [i].fd, &readfds))
- fds [i].events->in_event ();
+ fds [i].events->in_event (fds [i].fd);
}
// Destroy retired event sources.
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index 79472d1..ea6d3ed 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -792,7 +792,7 @@ void xs::socket_base_t::xhiccuped (pipe_t *pipe_)
xs_assert (false);
}
-void xs::socket_base_t::in_event ()
+void xs::socket_base_t::in_event (fd_t fd_)
{
// This function is invoked only once the socket is running in the context
// of the reaper thread. Process any commands from other threads/sockets
@@ -802,7 +802,7 @@ void xs::socket_base_t::in_event ()
check_destroy ();
}
-void xs::socket_base_t::out_event ()
+void xs::socket_base_t::out_event (fd_t fd_)
{
xs_assert (false);
}
diff --git a/src/socket_base.hpp b/src/socket_base.hpp
index bc405f6..8b2b216 100644
--- a/src/socket_base.hpp
+++ b/src/socket_base.hpp
@@ -86,8 +86,8 @@ namespace xs
// i_poll_events implementation. This interface is used when socket
// is handled by the poller in the reaper thread.
- void in_event ();
- void out_event ();
+ void in_event (fd_t fd_);
+ void out_event (fd_t fd_);
void timer_event (int id_);
// i_pipe_events interface implementation.
diff --git a/src/stream_engine.cpp b/src/stream_engine.cpp
index 0ae96d2..fb49152 100644
--- a/src/stream_engine.cpp
+++ b/src/stream_engine.cpp
@@ -124,7 +124,7 @@ void xs::stream_engine_t::plug (io_thread_t *io_thread_,
set_pollout (handle);
// Flush all the data that may have been already received downstream.
- in_event ();
+ in_event (s);
}
void xs::stream_engine_t::unplug ()
@@ -151,7 +151,7 @@ void xs::stream_engine_t::terminate ()
delete this;
}
-void xs::stream_engine_t::in_event ()
+void xs::stream_engine_t::in_event (fd_t fd_)
{
bool disconnection = false;
@@ -206,7 +206,7 @@ void xs::stream_engine_t::in_event ()
error ();
}
-void xs::stream_engine_t::out_event ()
+void xs::stream_engine_t::out_event (fd_t fd_)
{
bool more_data = true;
@@ -260,7 +260,7 @@ void xs::stream_engine_t::activate_out ()
// was sent by the user the socket is probably available for writing.
// Thus we try to write the data to socket avoiding polling for POLLOUT.
// Consequently, the latency should be better in request/reply scenarios.
- out_event ();
+ out_event (s);
}
void xs::stream_engine_t::activate_in ()
@@ -268,7 +268,7 @@ void xs::stream_engine_t::activate_in ()
set_pollin (handle);
// Speculative read.
- in_event ();
+ in_event (s);
}
void xs::stream_engine_t::error ()
diff --git a/src/stream_engine.hpp b/src/stream_engine.hpp
index 472c07a..f246552 100644
--- a/src/stream_engine.hpp
+++ b/src/stream_engine.hpp
@@ -56,8 +56,8 @@ namespace xs
void activate_out ();
// i_poll_events interface implementation.
- void in_event ();
- void out_event ();
+ void in_event (fd_t fd_);
+ void out_event (fd_t fd_);
private:
diff --git a/src/tcp_connecter.cpp b/src/tcp_connecter.cpp
index 36ddfa7..f5a9180 100644
--- a/src/tcp_connecter.cpp
+++ b/src/tcp_connecter.cpp
@@ -82,15 +82,15 @@ void xs::tcp_connecter_t::process_plug ()
start_connecting ();
}
-void xs::tcp_connecter_t::in_event ()
+void xs::tcp_connecter_t::in_event (fd_t fd_)
{
// We are not polling for incomming data, so we are actually called
// because of error here. However, we can get error on out event as well
// on some platforms, so we'll simply handle both events in the same way.
- out_event ();
+ out_event (s);
}
-void xs::tcp_connecter_t::out_event ()
+void xs::tcp_connecter_t::out_event (fd_t fd_)
{
fd_t fd = connect ();
rm_fd (handle);
@@ -133,7 +133,7 @@ void xs::tcp_connecter_t::start_connecting ()
if (rc == 0) {
handle = add_fd (s);
handle_valid = true;
- out_event ();
+ out_event (s);
return;
}
diff --git a/src/tcp_connecter.hpp b/src/tcp_connecter.hpp
index a174473..5791422 100644
--- a/src/tcp_connecter.hpp
+++ b/src/tcp_connecter.hpp
@@ -54,8 +54,8 @@ namespace xs
void process_plug ();
// Handlers for I/O events.
- void in_event ();
- void out_event ();
+ void in_event (fd_t fd_);
+ void out_event (fd_t fd_);
void timer_event (int id_);
// Internal function to start the actual connection establishment.
diff --git a/src/tcp_listener.cpp b/src/tcp_listener.cpp
index 2a2819c..31a9149 100644
--- a/src/tcp_listener.cpp
+++ b/src/tcp_listener.cpp
@@ -77,7 +77,7 @@ void xs::tcp_listener_t::process_term (int linger_)
own_t::process_term (linger_);
}
-void xs::tcp_listener_t::in_event ()
+void xs::tcp_listener_t::in_event (fd_t fd_)
{
fd_t fd = accept ();
diff --git a/src/tcp_listener.hpp b/src/tcp_listener.hpp
index 6c760e2..153f771 100644
--- a/src/tcp_listener.hpp
+++ b/src/tcp_listener.hpp
@@ -52,7 +52,7 @@ namespace xs
void process_term (int linger_);
// Handlers for I/O events.
- void in_event ();
+ void in_event (fd_t fd_);
// Close the listening socket.
void close ();