/*
Copyright (c) 2009-2012 250bpm s.r.o.
Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of Crossroads project.
Crossroads is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
Crossroads is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see .
*/
#include "platform.hpp"
#if defined XS_HAVE_WINDOWS
#include "windows.hpp"
#else
#include
#include
#include
#include
#include
#include
#include
#endif
#include
#include
#include "stream_engine.hpp"
#include "poller_base.hpp"
#include "session_base.hpp"
#include "config.hpp"
#include "err.hpp"
#include "ip.hpp"
xs::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_) :
s (fd_),
inpos (NULL),
insize (0),
decoder (in_batch_size, options_.maxmsgsize),
outpos (NULL),
outsize (0),
encoder (out_batch_size),
session (NULL),
leftover_session (NULL),
options (options_),
plugged (false)
{
// Get the socket into non-blocking mode.
unblock_socket (s);
// Set the socket buffer limits for the underlying socket.
if (options.sndbuf) {
int rc = setsockopt (s, SOL_SOCKET, SO_SNDBUF,
(char*) &options.sndbuf, sizeof (int));
#ifdef XS_HAVE_WINDOWS
wsa_assert (rc != SOCKET_ERROR);
#else
errno_assert (rc == 0);
#endif
}
if (options.rcvbuf) {
int rc = setsockopt (s, SOL_SOCKET, SO_RCVBUF,
(char*) &options.rcvbuf, sizeof (int));
#ifdef XS_HAVE_WINDOWS
wsa_assert (rc != SOCKET_ERROR);
#else
errno_assert (rc == 0);
#endif
}
#if defined XS_HAVE_OSX || defined XS_HAVE_FREEBSD
// Make sure that SIGPIPE signal is not generated when writing to a
// connection that was already closed by the peer.
int set = 1;
int rc = setsockopt (s, SOL_SOCKET, SO_NOSIGPIPE, &set, sizeof (int));
errno_assert (rc == 0);
#endif
}
xs::stream_engine_t::~stream_engine_t ()
{
xs_assert (!plugged);
if (s != retired_fd) {
#ifdef XS_HAVE_WINDOWS
int rc = closesocket (s);
wsa_assert (rc != SOCKET_ERROR);
#else
int rc = close (s);
errno_assert (rc == 0);
#endif
s = retired_fd;
}
}
void xs::stream_engine_t::plug (poller_base_t *io_thread_,
session_base_t *session_)
{
xs_assert (!plugged);
plugged = true;
leftover_session = NULL;
// Connect to session object.
xs_assert (!session);
xs_assert (session_);
encoder.set_session (session_);
decoder.set_session (session_);
session = session_;
// Connect to I/O threads poller object.
io_object_t::plug (io_thread_);
handle = add_fd (s);
set_pollin (handle);
set_pollout (handle);
// Flush all the data that may have been already received downstream.
in_event (s);
}
void xs::stream_engine_t::unplug ()
{
xs_assert (plugged);
plugged = false;
// Cancel all fd subscriptions.
rm_fd (handle);
// Disconnect from I/O threads poller object.
io_object_t::unplug ();
// Disconnect from session object.
encoder.set_session (NULL);
decoder.set_session (NULL);
leftover_session = session;
session = NULL;
}
void xs::stream_engine_t::terminate ()
{
unplug ();
delete this;
}
void xs::stream_engine_t::in_event (fd_t fd_)
{
bool disconnection = false;
// If there's no data to process in the buffer...
if (!insize) {
// Retrieve the buffer and read as much data as possible.
// Note that buffer can be arbitrarily large. However, we assume
// the underlying TCP layer has fixed buffer size and thus the
// number of bytes read will be always limited.
decoder.get_buffer (&inpos, &insize);
insize = read (inpos, insize);
// Check whether the peer has closed the connection.
if (insize == (size_t) -1) {
insize = 0;
disconnection = true;
}
}
// Push the data to the decoder.
size_t processed = decoder.process_buffer (inpos, insize);
if (unlikely (processed == (size_t) -1)) {
disconnection = true;
}
else {
// Stop polling for input if we got stuck.
if (processed < insize) {
// This may happen if queue limits are in effect.
if (plugged)
reset_pollin (handle);
}
// Adjust the buffer.
inpos += processed;
insize -= processed;
}
// Flush all messages the decoder may have produced.
// If IO handler has unplugged engine, flush transient IO handler.
if (unlikely (!plugged)) {
xs_assert (leftover_session);
leftover_session->flush ();
} else {
session->flush ();
}
if (session && disconnection)
error ();
}
void xs::stream_engine_t::out_event (fd_t fd_)
{
bool more_data = true;
// If write buffer is empty, try to read new data from the encoder.
if (!outsize) {
outpos = NULL;
more_data = encoder.get_data (&outpos, &outsize);
// If IO handler has unplugged engine, flush transient IO handler.
if (unlikely (!plugged)) {
xs_assert (leftover_session);
leftover_session->flush ();
return;
}
// If there is no data to send, stop polling for output.
if (outsize == 0) {
reset_pollout (handle);
return;
}
}
// If there are any data to write in write buffer, write as much as
// possible to the socket. Note that amount of data to write can be
// arbitratily large. However, we assume that underlying TCP layer has
// limited transmission buffer and thus the actual number of bytes
// written should be reasonably modest.
int nbytes = write (outpos, outsize);
// Handle problems with the connection.
if (nbytes == -1) {
error ();
return;
}
outpos += nbytes;
outsize -= nbytes;
// If the encoder reports that there are no more data to get from it
// we can stop polling for POLLOUT immediately.
if (!more_data)
reset_pollout (handle);
}
void xs::stream_engine_t::activate_out ()
{
set_pollout (handle);
// Speculative write: The assumption is that at the moment new message
// was sent by the user the socket is probably available for writing.
// Thus we try to write the data to socket avoiding polling for POLLOUT.
// Consequently, the latency should be better in request/reply scenarios.
out_event (s);
}
void xs::stream_engine_t::activate_in ()
{
set_pollin (handle);
// Speculative read.
in_event (s);
}
void xs::stream_engine_t::error ()
{
xs_assert (session);
session->detach ();
unplug ();
delete this;
}
int xs::stream_engine_t::write (const void *data_, size_t size_)
{
#ifdef XS_HAVE_WINDOWS
int nbytes = send (s, (char*) data_, (int) size_, 0);
// If not a single byte can be written to the socket in non-blocking mode
// we'll get an error (this may happen during the speculative write).
if (nbytes == SOCKET_ERROR && WSAGetLastError () == WSAEWOULDBLOCK)
return 0;
// Signalise peer failure.
if (nbytes == -1 && (
WSAGetLastError () == WSAENETDOWN ||
WSAGetLastError () == WSAENETRESET ||
WSAGetLastError () == WSAEHOSTUNREACH ||
WSAGetLastError () == WSAECONNABORTED ||
WSAGetLastError () == WSAETIMEDOUT ||
WSAGetLastError () == WSAECONNRESET))
return -1;
wsa_assert (nbytes != SOCKET_ERROR);
return (size_t) nbytes;
#else
ssize_t nbytes = send (s, data_, size_, 0);
// Several errors are OK. When speculative write is being done we may not
// be able to write a single byte from the socket. Also, SIGSTOP issued
// by a debugging tool can result in EINTR error.
if (nbytes == -1 && (errno == EAGAIN || errno == EWOULDBLOCK ||
errno == EINTR))
return 0;
// Signalise peer failure.
if (nbytes == -1 && (errno == ECONNRESET || errno == EPIPE))
return -1;
errno_assert (nbytes != -1);
return (size_t) nbytes;
#endif
}
int xs::stream_engine_t::read (void *data_, size_t size_)
{
#ifdef XS_HAVE_WINDOWS
int nbytes = recv (s, (char*) data_, (int) size_, 0);
// If not a single byte can be read from the socket in non-blocking mode
// we'll get an error (this may happen during the speculative read).
if (nbytes == SOCKET_ERROR && WSAGetLastError () == WSAEWOULDBLOCK)
return 0;
// Connection failure.
if (nbytes == -1 && (
WSAGetLastError () == WSAENETDOWN ||
WSAGetLastError () == WSAENETRESET ||
WSAGetLastError () == WSAECONNABORTED ||
WSAGetLastError () == WSAETIMEDOUT ||
WSAGetLastError () == WSAECONNRESET ||
WSAGetLastError () == WSAECONNREFUSED ||
WSAGetLastError () == WSAENOTCONN))
return -1;
wsa_assert (nbytes != SOCKET_ERROR);
// Orderly shutdown by the other peer.
if (nbytes == 0)
return -1;
return (size_t) nbytes;
#else
ssize_t nbytes = recv (s, data_, size_, 0);
// Several errors are OK. When speculative read is being done we may not
// be able to read a single byte from the socket. Also, SIGSTOP issued
// by a debugging tool can result in EINTR error.
if (nbytes == -1 && (errno == EAGAIN || errno == EWOULDBLOCK ||
errno == EINTR))
return 0;
// Signalise peer failure.
if (nbytes == -1 && (errno == ECONNRESET || errno == ECONNREFUSED ||
errno == ETIMEDOUT || errno == EHOSTUNREACH || errno == ENOTCONN))
return -1;
errno_assert (nbytes != -1);
// Orderly shutdown by the peer.
if (nbytes == 0)
return -1;
return (size_t) nbytes;
#endif
}