summaryrefslogtreecommitdiff
path: root/src/stream_engine.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/stream_engine.cpp')
-rw-r--r--src/stream_engine.cpp62
1 files changed, 31 insertions, 31 deletions
diff --git a/src/stream_engine.cpp b/src/stream_engine.cpp
index ab6329a..0034737 100644
--- a/src/stream_engine.cpp
+++ b/src/stream_engine.cpp
@@ -1,16 +1,16 @@
/*
- Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2009-2012 250bpm s.r.o.
Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
- This file is part of 0MQ.
+ This file is part of Crossroads project.
- 0MQ is free software; you can redistribute it and/or modify it under
+ Crossroads is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
- 0MQ is distributed in the hope that it will be useful,
+ Crossroads is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
@@ -20,7 +20,7 @@
*/
#include "platform.hpp"
-#if defined ZMQ_HAVE_WINDOWS
+#if defined XS_HAVE_WINDOWS
#include "windows.hpp"
#else
#include <unistd.h>
@@ -42,7 +42,7 @@
#include "err.hpp"
#include "ip.hpp"
-zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_) :
+xs::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_) :
s (fd_),
inpos (NULL),
insize (0),
@@ -62,7 +62,7 @@ zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_) :
if (options.sndbuf) {
int rc = setsockopt (s, SOL_SOCKET, SO_SNDBUF,
(char*) &options.sndbuf, sizeof (int));
-#ifdef ZMQ_HAVE_WINDOWS
+#ifdef XS_HAVE_WINDOWS
wsa_assert (rc != SOCKET_ERROR);
#else
errno_assert (rc == 0);
@@ -71,14 +71,14 @@ zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_) :
if (options.rcvbuf) {
int rc = setsockopt (s, SOL_SOCKET, SO_RCVBUF,
(char*) &options.rcvbuf, sizeof (int));
-#ifdef ZMQ_HAVE_WINDOWS
+#ifdef XS_HAVE_WINDOWS
wsa_assert (rc != SOCKET_ERROR);
#else
errno_assert (rc == 0);
#endif
}
-#if defined ZMQ_HAVE_OSX || defined ZMQ_HAVE_FREEBSD
+#if defined XS_HAVE_OSX || defined XS_HAVE_FREEBSD
// Make sure that SIGPIPE signal is not generated when writing to a
// connection that was already closed by the peer.
int set = 1;
@@ -87,12 +87,12 @@ zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_) :
#endif
}
-zmq::stream_engine_t::~stream_engine_t ()
+xs::stream_engine_t::~stream_engine_t ()
{
- zmq_assert (!plugged);
+ xs_assert (!plugged);
if (s != retired_fd) {
-#ifdef ZMQ_HAVE_WINDOWS
+#ifdef XS_HAVE_WINDOWS
int rc = closesocket (s);
wsa_assert (rc != SOCKET_ERROR);
#else
@@ -103,16 +103,16 @@ zmq::stream_engine_t::~stream_engine_t ()
}
}
-void zmq::stream_engine_t::plug (io_thread_t *io_thread_,
+void xs::stream_engine_t::plug (io_thread_t *io_thread_,
session_base_t *session_)
{
- zmq_assert (!plugged);
+ xs_assert (!plugged);
plugged = true;
leftover_session = NULL;
// Connect to session object.
- zmq_assert (!session);
- zmq_assert (session_);
+ xs_assert (!session);
+ xs_assert (session_);
encoder.set_session (session_);
decoder.set_session (session_);
session = session_;
@@ -127,9 +127,9 @@ void zmq::stream_engine_t::plug (io_thread_t *io_thread_,
in_event ();
}
-void zmq::stream_engine_t::unplug ()
+void xs::stream_engine_t::unplug ()
{
- zmq_assert (plugged);
+ xs_assert (plugged);
plugged = false;
// Cancel all fd subscriptions.
@@ -145,13 +145,13 @@ void zmq::stream_engine_t::unplug ()
session = NULL;
}
-void zmq::stream_engine_t::terminate ()
+void xs::stream_engine_t::terminate ()
{
unplug ();
delete this;
}
-void zmq::stream_engine_t::in_event ()
+void xs::stream_engine_t::in_event ()
{
bool disconnection = false;
@@ -196,7 +196,7 @@ void zmq::stream_engine_t::in_event ()
// Flush all messages the decoder may have produced.
// If IO handler has unplugged engine, flush transient IO handler.
if (unlikely (!plugged)) {
- zmq_assert (leftover_session);
+ xs_assert (leftover_session);
leftover_session->flush ();
} else {
session->flush ();
@@ -206,7 +206,7 @@ void zmq::stream_engine_t::in_event ()
error ();
}
-void zmq::stream_engine_t::out_event ()
+void xs::stream_engine_t::out_event ()
{
// If write buffer is empty, try to read new data from the encoder.
if (!outsize) {
@@ -216,7 +216,7 @@ void zmq::stream_engine_t::out_event ()
// If IO handler has unplugged engine, flush transient IO handler.
if (unlikely (!plugged)) {
- zmq_assert (leftover_session);
+ xs_assert (leftover_session);
leftover_session->flush ();
return;
}
@@ -245,7 +245,7 @@ void zmq::stream_engine_t::out_event ()
outsize -= nbytes;
}
-void zmq::stream_engine_t::activate_out ()
+void xs::stream_engine_t::activate_out ()
{
set_pollout (handle);
@@ -256,7 +256,7 @@ void zmq::stream_engine_t::activate_out ()
out_event ();
}
-void zmq::stream_engine_t::activate_in ()
+void xs::stream_engine_t::activate_in ()
{
set_pollin (handle);
@@ -264,17 +264,17 @@ void zmq::stream_engine_t::activate_in ()
in_event ();
}
-void zmq::stream_engine_t::error ()
+void xs::stream_engine_t::error ()
{
- zmq_assert (session);
+ xs_assert (session);
session->detach ();
unplug ();
delete this;
}
-int zmq::stream_engine_t::write (const void *data_, size_t size_)
+int xs::stream_engine_t::write (const void *data_, size_t size_)
{
-#ifdef ZMQ_HAVE_WINDOWS
+#ifdef XS_HAVE_WINDOWS
int nbytes = send (s, (char*) data_, (int) size_, 0);
@@ -317,9 +317,9 @@ int zmq::stream_engine_t::write (const void *data_, size_t size_)
#endif
}
-int zmq::stream_engine_t::read (void *data_, size_t size_)
+int xs::stream_engine_t::read (void *data_, size_t size_)
{
-#ifdef ZMQ_HAVE_WINDOWS
+#ifdef XS_HAVE_WINDOWS
int nbytes = recv (s, (char*) data_, (int) size_, 0);