diff options
Diffstat (limited to 'src/stream_engine.cpp')
-rw-r--r-- | src/stream_engine.cpp | 62 |
1 files changed, 31 insertions, 31 deletions
diff --git a/src/stream_engine.cpp b/src/stream_engine.cpp index ab6329a..0034737 100644 --- a/src/stream_engine.cpp +++ b/src/stream_engine.cpp @@ -1,16 +1,16 @@ /* - Copyright (c) 2009-2011 250bpm s.r.o. + Copyright (c) 2009-2012 250bpm s.r.o. Copyright (c) 2007-2009 iMatix Corporation Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file - This file is part of 0MQ. + This file is part of Crossroads project. - 0MQ is free software; you can redistribute it and/or modify it under + Crossroads is free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as published by the Free Software Foundation; either version 3 of the License, or (at your option) any later version. - 0MQ is distributed in the hope that it will be useful, + Crossroads is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details. @@ -20,7 +20,7 @@ */ #include "platform.hpp" -#if defined ZMQ_HAVE_WINDOWS +#if defined XS_HAVE_WINDOWS #include "windows.hpp" #else #include <unistd.h> @@ -42,7 +42,7 @@ #include "err.hpp" #include "ip.hpp" -zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_) : +xs::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_) : s (fd_), inpos (NULL), insize (0), @@ -62,7 +62,7 @@ zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_) : if (options.sndbuf) { int rc = setsockopt (s, SOL_SOCKET, SO_SNDBUF, (char*) &options.sndbuf, sizeof (int)); -#ifdef ZMQ_HAVE_WINDOWS +#ifdef XS_HAVE_WINDOWS wsa_assert (rc != SOCKET_ERROR); #else errno_assert (rc == 0); @@ -71,14 +71,14 @@ zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_) : if (options.rcvbuf) { int rc = setsockopt (s, SOL_SOCKET, SO_RCVBUF, (char*) &options.rcvbuf, sizeof (int)); -#ifdef ZMQ_HAVE_WINDOWS +#ifdef XS_HAVE_WINDOWS wsa_assert (rc != SOCKET_ERROR); #else errno_assert (rc == 0); #endif } -#if defined ZMQ_HAVE_OSX || defined ZMQ_HAVE_FREEBSD +#if defined XS_HAVE_OSX || defined XS_HAVE_FREEBSD // Make sure that SIGPIPE signal is not generated when writing to a // connection that was already closed by the peer. int set = 1; @@ -87,12 +87,12 @@ zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_) : #endif } -zmq::stream_engine_t::~stream_engine_t () +xs::stream_engine_t::~stream_engine_t () { - zmq_assert (!plugged); + xs_assert (!plugged); if (s != retired_fd) { -#ifdef ZMQ_HAVE_WINDOWS +#ifdef XS_HAVE_WINDOWS int rc = closesocket (s); wsa_assert (rc != SOCKET_ERROR); #else @@ -103,16 +103,16 @@ zmq::stream_engine_t::~stream_engine_t () } } -void zmq::stream_engine_t::plug (io_thread_t *io_thread_, +void xs::stream_engine_t::plug (io_thread_t *io_thread_, session_base_t *session_) { - zmq_assert (!plugged); + xs_assert (!plugged); plugged = true; leftover_session = NULL; // Connect to session object. - zmq_assert (!session); - zmq_assert (session_); + xs_assert (!session); + xs_assert (session_); encoder.set_session (session_); decoder.set_session (session_); session = session_; @@ -127,9 +127,9 @@ void zmq::stream_engine_t::plug (io_thread_t *io_thread_, in_event (); } -void zmq::stream_engine_t::unplug () +void xs::stream_engine_t::unplug () { - zmq_assert (plugged); + xs_assert (plugged); plugged = false; // Cancel all fd subscriptions. @@ -145,13 +145,13 @@ void zmq::stream_engine_t::unplug () session = NULL; } -void zmq::stream_engine_t::terminate () +void xs::stream_engine_t::terminate () { unplug (); delete this; } -void zmq::stream_engine_t::in_event () +void xs::stream_engine_t::in_event () { bool disconnection = false; @@ -196,7 +196,7 @@ void zmq::stream_engine_t::in_event () // Flush all messages the decoder may have produced. // If IO handler has unplugged engine, flush transient IO handler. if (unlikely (!plugged)) { - zmq_assert (leftover_session); + xs_assert (leftover_session); leftover_session->flush (); } else { session->flush (); @@ -206,7 +206,7 @@ void zmq::stream_engine_t::in_event () error (); } -void zmq::stream_engine_t::out_event () +void xs::stream_engine_t::out_event () { // If write buffer is empty, try to read new data from the encoder. if (!outsize) { @@ -216,7 +216,7 @@ void zmq::stream_engine_t::out_event () // If IO handler has unplugged engine, flush transient IO handler. if (unlikely (!plugged)) { - zmq_assert (leftover_session); + xs_assert (leftover_session); leftover_session->flush (); return; } @@ -245,7 +245,7 @@ void zmq::stream_engine_t::out_event () outsize -= nbytes; } -void zmq::stream_engine_t::activate_out () +void xs::stream_engine_t::activate_out () { set_pollout (handle); @@ -256,7 +256,7 @@ void zmq::stream_engine_t::activate_out () out_event (); } -void zmq::stream_engine_t::activate_in () +void xs::stream_engine_t::activate_in () { set_pollin (handle); @@ -264,17 +264,17 @@ void zmq::stream_engine_t::activate_in () in_event (); } -void zmq::stream_engine_t::error () +void xs::stream_engine_t::error () { - zmq_assert (session); + xs_assert (session); session->detach (); unplug (); delete this; } -int zmq::stream_engine_t::write (const void *data_, size_t size_) +int xs::stream_engine_t::write (const void *data_, size_t size_) { -#ifdef ZMQ_HAVE_WINDOWS +#ifdef XS_HAVE_WINDOWS int nbytes = send (s, (char*) data_, (int) size_, 0); @@ -317,9 +317,9 @@ int zmq::stream_engine_t::write (const void *data_, size_t size_) #endif } -int zmq::stream_engine_t::read (void *data_, size_t size_) +int xs::stream_engine_t::read (void *data_, size_t size_) { -#ifdef ZMQ_HAVE_WINDOWS +#ifdef XS_HAVE_WINDOWS int nbytes = recv (s, (char*) data_, (int) size_, 0); |