From 0e167c86051614f9d9d9fab255aae8272b1b056b Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Thu, 16 Feb 2012 10:03:45 +0900 Subject: File descriptor passed to in_event and out_event Signed-off-by: Martin Sustrik --- src/devpoll.cpp | 6 +++--- src/epoll.cpp | 6 +++--- src/i_poll_events.hpp | 8 +++++--- src/io_object.cpp | 4 ++-- src/io_object.hpp | 4 ++-- src/io_thread.cpp | 4 ++-- src/io_thread.hpp | 4 ++-- src/ipc_connecter.cpp | 8 ++++---- src/ipc_connecter.hpp | 4 ++-- src/ipc_listener.cpp | 2 +- src/ipc_listener.hpp | 2 +- src/kqueue.cpp | 6 +++--- src/pgm_receiver.cpp | 6 +++--- src/pgm_receiver.hpp | 2 +- src/pgm_sender.cpp | 10 +++++----- src/pgm_sender.hpp | 4 ++-- src/poll.cpp | 6 +++--- src/reaper.cpp | 4 ++-- src/reaper.hpp | 4 ++-- src/select.cpp | 6 +++--- src/socket_base.cpp | 4 ++-- src/socket_base.hpp | 4 ++-- src/stream_engine.cpp | 10 +++++----- src/stream_engine.hpp | 4 ++-- src/tcp_connecter.cpp | 8 ++++---- src/tcp_connecter.hpp | 4 ++-- src/tcp_listener.cpp | 2 +- src/tcp_listener.hpp | 2 +- 28 files changed, 70 insertions(+), 68 deletions(-) diff --git a/src/devpoll.cpp b/src/devpoll.cpp index 12b22ce..da56a7f 100644 --- a/src/devpoll.cpp +++ b/src/devpoll.cpp @@ -169,15 +169,15 @@ void xs::devpoll_t::loop () if (!fd_ptr->valid || !fd_ptr->accepted) continue; if (ev_buf [i].revents & (POLLERR | POLLHUP)) - fd_ptr->reactor->in_event (); + fd_ptr->reactor->in_event (ev_buf [i].fd); if (!fd_ptr->valid || !fd_ptr->accepted) continue; if (ev_buf [i].revents & POLLOUT) - fd_ptr->reactor->out_event (); + fd_ptr->reactor->out_event (ev_buf [i].fd); if (!fd_ptr->valid || !fd_ptr->accepted) continue; if (ev_buf [i].revents & POLLIN) - fd_ptr->reactor->in_event (); + fd_ptr->reactor->in_event (ev_buf [i].fd); } } } diff --git a/src/epoll.cpp b/src/epoll.cpp index 3902282..354ccc4 100644 --- a/src/epoll.cpp +++ b/src/epoll.cpp @@ -150,15 +150,15 @@ void xs::epoll_t::loop () if (pe->fd == retired_fd) continue; if (ev_buf [i].events & (EPOLLERR | EPOLLHUP)) - pe->events->in_event (); + pe->events->in_event (pe->fd); if (pe->fd == retired_fd) continue; if (ev_buf [i].events & EPOLLOUT) - pe->events->out_event (); + pe->events->out_event (pe->fd); if (pe->fd == retired_fd) continue; if (ev_buf [i].events & EPOLLIN) - pe->events->in_event (); + pe->events->in_event (pe->fd); } // Destroy retired event sources. diff --git a/src/i_poll_events.hpp b/src/i_poll_events.hpp index ca1625d..0b9bd86 100644 --- a/src/i_poll_events.hpp +++ b/src/i_poll_events.hpp @@ -21,7 +21,9 @@ #ifndef __XS_I_POLL_EVENTS_HPP_INCLUDED__ #define __XS_I_POLL_EVENTS_HPP_INCLUDED__ - + +#include "fd.hpp" + namespace xs { @@ -33,10 +35,10 @@ namespace xs virtual ~i_poll_events () {} // Called by I/O thread when file descriptor is ready for reading. - virtual void in_event () = 0; + virtual void in_event (fd_t fd_) = 0; // Called by I/O thread when file descriptor is ready for writing. - virtual void out_event () = 0; + virtual void out_event (fd_t fd_) = 0; // Called when timer expires. virtual void timer_event (int id_) = 0; diff --git a/src/io_object.cpp b/src/io_object.cpp index 1b0231d..0424c63 100644 --- a/src/io_object.cpp +++ b/src/io_object.cpp @@ -92,12 +92,12 @@ void xs::io_object_t::cancel_timer (int id_) poller->cancel_timer (this, id_); } -void xs::io_object_t::in_event () +void xs::io_object_t::in_event (fd_t fd_) { xs_assert (false); } -void xs::io_object_t::out_event () +void xs::io_object_t::out_event (fd_t fd_) { xs_assert (false); } diff --git a/src/io_object.hpp b/src/io_object.hpp index b3496b6..ef65f75 100644 --- a/src/io_object.hpp +++ b/src/io_object.hpp @@ -64,8 +64,8 @@ namespace xs void cancel_timer (int id_); // i_poll_events interface implementation. - void in_event (); - void out_event (); + void in_event (fd_t fd_); + void out_event (fd_t fd_); void timer_event (int id_); private: diff --git a/src/io_thread.cpp b/src/io_thread.cpp index e5abbe0..75bca4f 100644 --- a/src/io_thread.cpp +++ b/src/io_thread.cpp @@ -62,7 +62,7 @@ int xs::io_thread_t::get_load () return poller->get_load (); } -void xs::io_thread_t::in_event () +void xs::io_thread_t::in_event (fd_t fd_) { // TODO: Do we want to limit number of commands I/O thread can // process in a single go? @@ -83,7 +83,7 @@ void xs::io_thread_t::in_event () } } -void xs::io_thread_t::out_event () +void xs::io_thread_t::out_event (fd_t fd_) { // We are never polling for POLLOUT here. This function is never called. xs_assert (false); diff --git a/src/io_thread.hpp b/src/io_thread.hpp index 4884c5b..4a53c15 100644 --- a/src/io_thread.hpp +++ b/src/io_thread.hpp @@ -58,8 +58,8 @@ namespace xs mailbox_t *get_mailbox (); // i_poll_events implementation. - void in_event (); - void out_event (); + void in_event (fd_t fd_); + void out_event (fd_t fd_); void timer_event (int id_); // Used by io_objects to retrieve the assciated poller object. diff --git a/src/ipc_connecter.cpp b/src/ipc_connecter.cpp index 8e3ea9c..243d795 100644 --- a/src/ipc_connecter.cpp +++ b/src/ipc_connecter.cpp @@ -73,15 +73,15 @@ void xs::ipc_connecter_t::process_plug () start_connecting (); } -void xs::ipc_connecter_t::in_event () +void xs::ipc_connecter_t::in_event (fd_t fd_) { // We are not polling for incomming data, so we are actually called // because of error here. However, we can get error on out event as well // on some platforms, so we'll simply handle both events in the same way. - out_event (); + out_event (fd_); } -void xs::ipc_connecter_t::out_event () +void xs::ipc_connecter_t::out_event (fd_t fd_) { fd_t fd = connect (); rm_fd (handle); @@ -122,7 +122,7 @@ void xs::ipc_connecter_t::start_connecting () if (rc == 0) { handle = add_fd (s); handle_valid = true; - out_event (); + out_event (s); return; } diff --git a/src/ipc_connecter.hpp b/src/ipc_connecter.hpp index b2ee20d..81edda2 100644 --- a/src/ipc_connecter.hpp +++ b/src/ipc_connecter.hpp @@ -57,8 +57,8 @@ namespace xs void process_plug (); // Handlers for I/O events. - void in_event (); - void out_event (); + void in_event (fd_t fd_); + void out_event (fd_t fd_); void timer_event (int id_); // Internal function to start the actual connection establishment. diff --git a/src/ipc_listener.cpp b/src/ipc_listener.cpp index accd09c..db49528 100644 --- a/src/ipc_listener.cpp +++ b/src/ipc_listener.cpp @@ -68,7 +68,7 @@ void xs::ipc_listener_t::process_term (int linger_) own_t::process_term (linger_); } -void xs::ipc_listener_t::in_event () +void xs::ipc_listener_t::in_event (fd_t fd_) { fd_t fd = accept (); diff --git a/src/ipc_listener.hpp b/src/ipc_listener.hpp index d2e19df..b599bff 100644 --- a/src/ipc_listener.hpp +++ b/src/ipc_listener.hpp @@ -56,7 +56,7 @@ namespace xs void process_term (int linger_); // Handlers for I/O events. - void in_event (); + void in_event (fd_t fd_); // Close the listening socket. int close (); diff --git a/src/kqueue.cpp b/src/kqueue.cpp index 046cbc7..62b443f 100644 --- a/src/kqueue.cpp +++ b/src/kqueue.cpp @@ -173,15 +173,15 @@ void xs::kqueue_t::loop () if (pe->fd == retired_fd) continue; if (ev_buf [i].flags & EV_EOF) - pe->reactor->in_event (); + pe->reactor->in_event (pe->fd); if (pe->fd == retired_fd) continue; if (ev_buf [i].filter == EVFILT_WRITE) - pe->reactor->out_event (); + pe->reactor->out_event (pe->fd); if (pe->fd == retired_fd) continue; if (ev_buf [i].filter == EVFILT_READ) - pe->reactor->in_event (); + pe->reactor->in_event (pe->fd); } // Destroy retired event sources. diff --git a/src/pgm_receiver.cpp b/src/pgm_receiver.cpp index ddf46b9..19866c7 100644 --- a/src/pgm_receiver.cpp +++ b/src/pgm_receiver.cpp @@ -145,10 +145,10 @@ void xs::pgm_receiver_t::activate_in () set_pollin (pipe_handle); set_pollin (socket_handle); - in_event (); + in_event (retired_fd); } -void xs::pgm_receiver_t::in_event () +void xs::pgm_receiver_t::in_event (fd_t fd_) { // Read data from the underlying pgm_socket. unsigned char *data = NULL; @@ -269,7 +269,7 @@ void xs::pgm_receiver_t::timer_event (int token) // Timer cancels on return by poller_base. has_rx_timer = false; - in_event (); + in_event (retired_fd); } void xs::pgm_receiver_t::drop_subscriptions () diff --git a/src/pgm_receiver.hpp b/src/pgm_receiver.hpp index 939f540..cd767f8 100644 --- a/src/pgm_receiver.hpp +++ b/src/pgm_receiver.hpp @@ -65,7 +65,7 @@ namespace xs void activate_out (); // i_poll_events interface implementation. - void in_event (); + void in_event (fd_t fd_); void timer_event (int token); private: diff --git a/src/pgm_sender.cpp b/src/pgm_sender.cpp index 1bfd944..54997cd 100644 --- a/src/pgm_sender.cpp +++ b/src/pgm_sender.cpp @@ -132,7 +132,7 @@ void xs::pgm_sender_t::terminate () void xs::pgm_sender_t::activate_out () { set_pollout (handle); - out_event (); + out_event (retired_fd); } void xs::pgm_sender_t::activate_in () @@ -148,7 +148,7 @@ xs::pgm_sender_t::~pgm_sender_t () } } -void xs::pgm_sender_t::in_event () +void xs::pgm_sender_t::in_event (fd_t fd_) { if (has_rx_timer) { cancel_timer (rx_timer_id); @@ -164,7 +164,7 @@ void xs::pgm_sender_t::in_event () } } -void xs::pgm_sender_t::out_event () +void xs::pgm_sender_t::out_event (fd_t fd_) { // POLLOUT event from send socket. If write buffer is empty, // try to read new data from the encoder. @@ -217,10 +217,10 @@ void xs::pgm_sender_t::timer_event (int token) // Timer cancels on return by poller_base. if (token == rx_timer_id) { has_rx_timer = false; - in_event (); + in_event (retired_fd); } else if (token == tx_timer_id) { has_tx_timer = false; - out_event (); + out_event (retired_fd); } else xs_assert (false); } diff --git a/src/pgm_sender.hpp b/src/pgm_sender.hpp index 277fb53..23c241b 100644 --- a/src/pgm_sender.hpp +++ b/src/pgm_sender.hpp @@ -63,8 +63,8 @@ namespace xs void activate_out (); // i_poll_events interface implementation. - void in_event (); - void out_event (); + void in_event (fd_t fd_); + void out_event (fd_t fd_); void timer_event (int token); private: diff --git a/src/poll.cpp b/src/poll.cpp index 86fdd5b..48a204e 100644 --- a/src/poll.cpp +++ b/src/poll.cpp @@ -141,15 +141,15 @@ void xs::poll_t::loop () if (pollset [i].fd == retired_fd) continue; if (pollset [i].revents & (POLLERR | POLLHUP)) - fd_table [pollset [i].fd].events->in_event (); + fd_table [pollset [i].fd].events->in_event (pollset [i].fd); if (pollset [i].fd == retired_fd) continue; if (pollset [i].revents & POLLOUT) - fd_table [pollset [i].fd].events->out_event (); + fd_table [pollset [i].fd].events->out_event (pollset [i].fd); if (pollset [i].fd == retired_fd) continue; if (pollset [i].revents & POLLIN) - fd_table [pollset [i].fd].events->in_event (); + fd_table [pollset [i].fd].events->in_event (pollset [i].fd); } // Clean up the pollset and update the fd_table accordingly. diff --git a/src/reaper.cpp b/src/reaper.cpp index 1eb2b7c..7e38b73 100644 --- a/src/reaper.cpp +++ b/src/reaper.cpp @@ -55,7 +55,7 @@ void xs::reaper_t::stop () send_stop (); } -void xs::reaper_t::in_event () +void xs::reaper_t::in_event (fd_t fd_) { while (true) { @@ -73,7 +73,7 @@ void xs::reaper_t::in_event () } } -void xs::reaper_t::out_event () +void xs::reaper_t::out_event (fd_t fd_) { xs_assert (false); } diff --git a/src/reaper.hpp b/src/reaper.hpp index b3fc818..0cb31a2 100644 --- a/src/reaper.hpp +++ b/src/reaper.hpp @@ -45,8 +45,8 @@ namespace xs void stop (); // i_poll_events implementation. - void in_event (); - void out_event (); + void in_event (fd_t fd_); + void out_event (fd_t fd_); void timer_event (int id_); private: diff --git a/src/select.cpp b/src/select.cpp index b2ade09..634e358 100644 --- a/src/select.cpp +++ b/src/select.cpp @@ -182,15 +182,15 @@ void xs::select_t::loop () if (fds [i].fd == retired_fd) continue; if (FD_ISSET (fds [i].fd, &exceptfds)) - fds [i].events->in_event (); + fds [i].events->in_event (fds [i].fd); if (fds [i].fd == retired_fd) continue; if (FD_ISSET (fds [i].fd, &writefds)) - fds [i].events->out_event (); + fds [i].events->out_event (fds [i].fd); if (fds [i].fd == retired_fd) continue; if (FD_ISSET (fds [i].fd, &readfds)) - fds [i].events->in_event (); + fds [i].events->in_event (fds [i].fd); } // Destroy retired event sources. diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 79472d1..ea6d3ed 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -792,7 +792,7 @@ void xs::socket_base_t::xhiccuped (pipe_t *pipe_) xs_assert (false); } -void xs::socket_base_t::in_event () +void xs::socket_base_t::in_event (fd_t fd_) { // This function is invoked only once the socket is running in the context // of the reaper thread. Process any commands from other threads/sockets @@ -802,7 +802,7 @@ void xs::socket_base_t::in_event () check_destroy (); } -void xs::socket_base_t::out_event () +void xs::socket_base_t::out_event (fd_t fd_) { xs_assert (false); } diff --git a/src/socket_base.hpp b/src/socket_base.hpp index bc405f6..8b2b216 100644 --- a/src/socket_base.hpp +++ b/src/socket_base.hpp @@ -86,8 +86,8 @@ namespace xs // i_poll_events implementation. This interface is used when socket // is handled by the poller in the reaper thread. - void in_event (); - void out_event (); + void in_event (fd_t fd_); + void out_event (fd_t fd_); void timer_event (int id_); // i_pipe_events interface implementation. diff --git a/src/stream_engine.cpp b/src/stream_engine.cpp index 0ae96d2..fb49152 100644 --- a/src/stream_engine.cpp +++ b/src/stream_engine.cpp @@ -124,7 +124,7 @@ void xs::stream_engine_t::plug (io_thread_t *io_thread_, set_pollout (handle); // Flush all the data that may have been already received downstream. - in_event (); + in_event (s); } void xs::stream_engine_t::unplug () @@ -151,7 +151,7 @@ void xs::stream_engine_t::terminate () delete this; } -void xs::stream_engine_t::in_event () +void xs::stream_engine_t::in_event (fd_t fd_) { bool disconnection = false; @@ -206,7 +206,7 @@ void xs::stream_engine_t::in_event () error (); } -void xs::stream_engine_t::out_event () +void xs::stream_engine_t::out_event (fd_t fd_) { bool more_data = true; @@ -260,7 +260,7 @@ void xs::stream_engine_t::activate_out () // was sent by the user the socket is probably available for writing. // Thus we try to write the data to socket avoiding polling for POLLOUT. // Consequently, the latency should be better in request/reply scenarios. - out_event (); + out_event (s); } void xs::stream_engine_t::activate_in () @@ -268,7 +268,7 @@ void xs::stream_engine_t::activate_in () set_pollin (handle); // Speculative read. - in_event (); + in_event (s); } void xs::stream_engine_t::error () diff --git a/src/stream_engine.hpp b/src/stream_engine.hpp index 472c07a..f246552 100644 --- a/src/stream_engine.hpp +++ b/src/stream_engine.hpp @@ -56,8 +56,8 @@ namespace xs void activate_out (); // i_poll_events interface implementation. - void in_event (); - void out_event (); + void in_event (fd_t fd_); + void out_event (fd_t fd_); private: diff --git a/src/tcp_connecter.cpp b/src/tcp_connecter.cpp index 36ddfa7..f5a9180 100644 --- a/src/tcp_connecter.cpp +++ b/src/tcp_connecter.cpp @@ -82,15 +82,15 @@ void xs::tcp_connecter_t::process_plug () start_connecting (); } -void xs::tcp_connecter_t::in_event () +void xs::tcp_connecter_t::in_event (fd_t fd_) { // We are not polling for incomming data, so we are actually called // because of error here. However, we can get error on out event as well // on some platforms, so we'll simply handle both events in the same way. - out_event (); + out_event (s); } -void xs::tcp_connecter_t::out_event () +void xs::tcp_connecter_t::out_event (fd_t fd_) { fd_t fd = connect (); rm_fd (handle); @@ -133,7 +133,7 @@ void xs::tcp_connecter_t::start_connecting () if (rc == 0) { handle = add_fd (s); handle_valid = true; - out_event (); + out_event (s); return; } diff --git a/src/tcp_connecter.hpp b/src/tcp_connecter.hpp index a174473..5791422 100644 --- a/src/tcp_connecter.hpp +++ b/src/tcp_connecter.hpp @@ -54,8 +54,8 @@ namespace xs void process_plug (); // Handlers for I/O events. - void in_event (); - void out_event (); + void in_event (fd_t fd_); + void out_event (fd_t fd_); void timer_event (int id_); // Internal function to start the actual connection establishment. diff --git a/src/tcp_listener.cpp b/src/tcp_listener.cpp index 2a2819c..31a9149 100644 --- a/src/tcp_listener.cpp +++ b/src/tcp_listener.cpp @@ -77,7 +77,7 @@ void xs::tcp_listener_t::process_term (int linger_) own_t::process_term (linger_); } -void xs::tcp_listener_t::in_event () +void xs::tcp_listener_t::in_event (fd_t fd_) { fd_t fd = accept (); diff --git a/src/tcp_listener.hpp b/src/tcp_listener.hpp index 6c760e2..153f771 100644 --- a/src/tcp_listener.hpp +++ b/src/tcp_listener.hpp @@ -52,7 +52,7 @@ namespace xs void process_term (int linger_); // Handlers for I/O events. - void in_event (); + void in_event (fd_t fd_); // Close the listening socket. void close (); -- cgit v1.2.3