diff options
Diffstat (limited to 'src/session_base.cpp')
-rw-r--r-- | src/session_base.cpp | 124 |
1 files changed, 62 insertions, 62 deletions
diff --git a/src/session_base.cpp b/src/session_base.cpp index f2ee713..6c23bb5 100644 --- a/src/session_base.cpp +++ b/src/session_base.cpp @@ -4,14 +4,14 @@ Copyright (c) 2011 VMware, Inc. Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file - This file is part of 0MQ. + This file is part of Crossroads project. - 0MQ is free software; you can redistribute it and/or modify it under + Crossroads is free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as published by the Free Software Foundation; either version 3 of the License, or (at your option) any later version. - 0MQ is distributed in the hope that it will be useful, + Crossroads is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details. @@ -43,52 +43,52 @@ #include "pull.hpp" #include "pair.hpp" -zmq::session_base_t *zmq::session_base_t::create (class io_thread_t *io_thread_, +xs::session_base_t *xs::session_base_t::create (class io_thread_t *io_thread_, bool connect_, class socket_base_t *socket_, const options_t &options_, const char *protocol_, const char *address_) { session_base_t *s = NULL; switch (options_.type) { - case ZMQ_REQ: + case XS_REQ: s = new (std::nothrow) req_session_t (io_thread_, connect_, socket_, options_, protocol_, address_); break; - case ZMQ_XREQ: + case XS_XREQ: s = new (std::nothrow) xreq_session_t (io_thread_, connect_, socket_, options_, protocol_, address_); - case ZMQ_REP: + case XS_REP: s = new (std::nothrow) rep_session_t (io_thread_, connect_, socket_, options_, protocol_, address_); break; - case ZMQ_XREP: + case XS_XREP: s = new (std::nothrow) xrep_session_t (io_thread_, connect_, socket_, options_, protocol_, address_); break; - case ZMQ_PUB: + case XS_PUB: s = new (std::nothrow) pub_session_t (io_thread_, connect_, socket_, options_, protocol_, address_); break; - case ZMQ_XPUB: + case XS_XPUB: s = new (std::nothrow) xpub_session_t (io_thread_, connect_, socket_, options_, protocol_, address_); break; - case ZMQ_SUB: + case XS_SUB: s = new (std::nothrow) sub_session_t (io_thread_, connect_, socket_, options_, protocol_, address_); break; - case ZMQ_XSUB: + case XS_XSUB: s = new (std::nothrow) xsub_session_t (io_thread_, connect_, socket_, options_, protocol_, address_); break; - case ZMQ_PUSH: + case XS_PUSH: s = new (std::nothrow) push_session_t (io_thread_, connect_, socket_, options_, protocol_, address_); break; - case ZMQ_PULL: + case XS_PULL: s = new (std::nothrow) pull_session_t (io_thread_, connect_, socket_, options_, protocol_, address_); break; - case ZMQ_PAIR: + case XS_PAIR: s = new (std::nothrow) pair_session_t (io_thread_, connect_, socket_, options_, protocol_, address_); break; @@ -100,7 +100,7 @@ zmq::session_base_t *zmq::session_base_t::create (class io_thread_t *io_thread_, return s; } -zmq::session_base_t::session_base_t (class io_thread_t *io_thread_, +xs::session_base_t::session_base_t (class io_thread_t *io_thread_, bool connect_, class socket_base_t *socket_, const options_t &options_, const char *protocol_, const char *address_) : own_t (io_thread_, options_), @@ -122,9 +122,9 @@ zmq::session_base_t::session_base_t (class io_thread_t *io_thread_, address = address_; } -zmq::session_base_t::~session_base_t () +xs::session_base_t::~session_base_t () { - zmq_assert (!pipe); + xs_assert (!pipe); // If there's still a pending linger timer, remove it. if (has_linger_timer) { @@ -137,20 +137,20 @@ zmq::session_base_t::~session_base_t () engine->terminate (); } -void zmq::session_base_t::attach_pipe (pipe_t *pipe_) +void xs::session_base_t::attach_pipe (pipe_t *pipe_) { - zmq_assert (!is_terminating ()); - zmq_assert (!pipe); - zmq_assert (pipe_); + xs_assert (!is_terminating ()); + xs_assert (!pipe); + xs_assert (pipe_); pipe = pipe_; pipe->set_event_sink (this); } -int zmq::session_base_t::read (msg_t *msg_) +int xs::session_base_t::read (msg_t *msg_) { // First message to send is identity (if required). if (send_identity) { - zmq_assert (!(msg_->flags () & msg_t::more)); + xs_assert (!(msg_->flags () & msg_t::more)); msg_->init_size (options.identity_size); memcpy (msg_->data (), options.identity, options.identity_size); send_identity = false; @@ -167,7 +167,7 @@ int zmq::session_base_t::read (msg_t *msg_) return 0; } -int zmq::session_base_t::write (msg_t *msg_) +int xs::session_base_t::write (msg_t *msg_) { // First message to receive is identity (if required). if (recv_identity) { @@ -185,13 +185,13 @@ int zmq::session_base_t::write (msg_t *msg_) return -1; } -void zmq::session_base_t::flush () +void xs::session_base_t::flush () { if (pipe) pipe->flush (); } -void zmq::session_base_t::clean_pipes () +void xs::session_base_t::clean_pipes () { if (pipe) { @@ -206,7 +206,7 @@ void zmq::session_base_t::clean_pipes () int rc = msg.init (); errno_assert (rc == 0); if (!read (&msg)) { - zmq_assert (!incomplete_in); + xs_assert (!incomplete_in); break; } rc = msg.close (); @@ -215,10 +215,10 @@ void zmq::session_base_t::clean_pipes () } } -void zmq::session_base_t::terminated (pipe_t *pipe_) +void xs::session_base_t::terminated (pipe_t *pipe_) { // Drop the reference to the deallocated pipe. - zmq_assert (pipe == pipe_); + xs_assert (pipe == pipe_); pipe = NULL; // If we are waiting for pending messages to be sent, at this point @@ -228,9 +228,9 @@ void zmq::session_base_t::terminated (pipe_t *pipe_) proceed_with_term (); } -void zmq::session_base_t::read_activated (pipe_t *pipe_) +void xs::session_base_t::read_activated (pipe_t *pipe_) { - zmq_assert (pipe == pipe_); + xs_assert (pipe == pipe_); if (likely (engine != NULL)) engine->activate_out (); @@ -238,33 +238,33 @@ void zmq::session_base_t::read_activated (pipe_t *pipe_) pipe->check_read (); } -void zmq::session_base_t::write_activated (pipe_t *pipe_) +void xs::session_base_t::write_activated (pipe_t *pipe_) { - zmq_assert (pipe == pipe_); + xs_assert (pipe == pipe_); if (engine) engine->activate_in (); } -void zmq::session_base_t::hiccuped (pipe_t *pipe_) +void xs::session_base_t::hiccuped (pipe_t *pipe_) { // Hiccups are always sent from session to socket, not the other // way round. - zmq_assert (false); + xs_assert (false); } -void zmq::session_base_t::process_plug () +void xs::session_base_t::process_plug () { if (connect) start_connecting (false); } -void zmq::session_base_t::process_attach (i_engine *engine_) +void xs::session_base_t::process_attach (i_engine *engine_) { // If some other object (e.g. init) notifies us that the connection failed // without creating an engine we need to start the reconnection process. if (!engine_) { - zmq_assert (!engine); + xs_assert (!engine); detached (); return; } @@ -282,7 +282,7 @@ void zmq::session_base_t::process_attach (i_engine *engine_) pipes [0]->set_event_sink (this); // Remember the local end of the pipe. - zmq_assert (!pipe); + xs_assert (!pipe); pipe = pipes [0]; // Ask socket to plug into the remote end of the pipe. @@ -290,12 +290,12 @@ void zmq::session_base_t::process_attach (i_engine *engine_) } // Plug in the engine. - zmq_assert (!engine); + xs_assert (!engine); engine = engine_; engine->plug (io_thread, this); } -void zmq::session_base_t::detach () +void xs::session_base_t::detach () { // Engine is dead. Let's forget about it. engine = NULL; @@ -311,9 +311,9 @@ void zmq::session_base_t::detach () pipe->check_read (); } -void zmq::session_base_t::process_term (int linger_) +void xs::session_base_t::process_term (int linger_) { - zmq_assert (!pending); + xs_assert (!pending); // If the termination of the pipe happens before the term command is // delivered there's nothing much to do. We can proceed with the @@ -329,7 +329,7 @@ void zmq::session_base_t::process_term (int linger_) // If linger is infinite (negative) we don't even have to set // the timer. if (linger_ > 0) { - zmq_assert (!has_linger_timer); + xs_assert (!has_linger_timer); add_timer (linger_, linger_timer_id); has_linger_timer = true; } @@ -344,7 +344,7 @@ void zmq::session_base_t::process_term (int linger_) pipe->check_read (); } -void zmq::session_base_t::proceed_with_term () +void xs::session_base_t::proceed_with_term () { // The pending phase have just ended. pending = false; @@ -353,19 +353,19 @@ void zmq::session_base_t::proceed_with_term () own_t::process_term (0); } -void zmq::session_base_t::timer_event (int id_) +void xs::session_base_t::timer_event (int id_) { // Linger period expired. We can proceed with termination even though // there are still pending messages to be sent. - zmq_assert (id_ == linger_timer_id); + xs_assert (id_ == linger_timer_id); has_linger_timer = false; // Ask pipe to terminate even though there may be pending messages in it. - zmq_assert (pipe); + xs_assert (pipe); pipe->terminate (false); } -void zmq::session_base_t::detached () +void xs::session_base_t::detached () { // Transient session self-destructs after peer disconnects. if (!connect) { @@ -378,18 +378,18 @@ void zmq::session_base_t::detached () // For subscriber sockets we hiccup the inbound pipe, which will cause // the socket object to resend all the subscriptions. - if (pipe && (options.type == ZMQ_SUB || options.type == ZMQ_XSUB)) + if (pipe && (options.type == XS_SUB || options.type == XS_XSUB)) pipe->hiccup (); } -void zmq::session_base_t::start_connecting (bool wait_) +void xs::session_base_t::start_connecting (bool wait_) { - zmq_assert (connect); + xs_assert (connect); // Choose I/O thread to run connecter in. Given that we are already // running in an I/O thread, there must be at least one available. io_thread_t *io_thread = choose_io_thread (options.affinity); - zmq_assert (io_thread); + xs_assert (io_thread); // Create the connecter object. @@ -401,7 +401,7 @@ void zmq::session_base_t::start_connecting (bool wait_) return; } -#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS +#if !defined XS_HAVE_WINDOWS && !defined XS_HAVE_OPENVMS if (protocol == "ipc") { ipc_connecter_t *connecter = new (std::nothrow) ipc_connecter_t ( io_thread, this, options, address.c_str (), wait_); @@ -411,7 +411,7 @@ void zmq::session_base_t::start_connecting (bool wait_) } #endif -#if defined ZMQ_HAVE_OPENPGM +#if defined XS_HAVE_OPENPGM // Both PGM and EPGM transports are using the same infrastructure. if (protocol == "pgm" || protocol == "epgm") { @@ -422,7 +422,7 @@ void zmq::session_base_t::start_connecting (bool wait_) // At this point we'll create message pipes to the session straight // away. There's no point in delaying it as no concept of 'connect' // exists with PGM anyway. - if (options.type == ZMQ_PUB || options.type == ZMQ_XPUB) { + if (options.type == XS_PUB || options.type == XS_XPUB) { // PGM sender. pgm_sender_t *pgm_sender = new (std::nothrow) pgm_sender_t ( @@ -430,11 +430,11 @@ void zmq::session_base_t::start_connecting (bool wait_) alloc_assert (pgm_sender); int rc = pgm_sender->init (udp_encapsulation, address.c_str ()); - zmq_assert (rc == 0); + xs_assert (rc == 0); send_attach (this, pgm_sender); } - else if (options.type == ZMQ_SUB || options.type == ZMQ_XSUB) { + else if (options.type == XS_SUB || options.type == XS_XSUB) { // PGM receiver. pgm_receiver_t *pgm_receiver = new (std::nothrow) pgm_receiver_t ( @@ -442,17 +442,17 @@ void zmq::session_base_t::start_connecting (bool wait_) alloc_assert (pgm_receiver); int rc = pgm_receiver->init (udp_encapsulation, address.c_str ()); - zmq_assert (rc == 0); + xs_assert (rc == 0); send_attach (this, pgm_receiver); } else - zmq_assert (false); + xs_assert (false); return; } #endif - zmq_assert (false); + xs_assert (false); } |