diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/connect_session.cpp | 20 | ||||
| -rw-r--r-- | src/i_engine.hpp | 4 | ||||
| -rw-r--r-- | src/pgm_receiver.cpp | 12 | ||||
| -rw-r--r-- | src/pgm_receiver.hpp | 7 | ||||
| -rw-r--r-- | src/pgm_sender.cpp | 12 | ||||
| -rw-r--r-- | src/pgm_sender.hpp | 7 | ||||
| -rw-r--r-- | src/session.cpp | 3 | ||||
| -rw-r--r-- | src/socket_base.cpp | 2 | ||||
| -rw-r--r-- | src/zmq_engine.cpp | 6 | ||||
| -rw-r--r-- | src/zmq_engine.hpp | 1 | 
10 files changed, 48 insertions, 26 deletions
diff --git a/src/connect_session.cpp b/src/connect_session.cpp index 896cc48..afa80b8 100644 --- a/src/connect_session.cpp +++ b/src/connect_session.cpp @@ -19,6 +19,8 @@  #include "connect_session.hpp"  #include "zmq_connecter.hpp" +#include "pgm_sender.hpp" +#include "pgm_receiver.hpp"  zmq::connect_session_t::connect_session_t (class io_thread_t *io_thread_,        class socket_base_t *socket_, const options_t &options_, @@ -56,10 +58,10 @@ void zmq::connect_session_t::start_connecting ()  #if defined ZMQ_HAVE_OPENPGM      //  Both PGM and EPGM transports are using the same infrastructure. -    if (addr_type == "pgm" || addr_type == "epgm") { +    if (protocol == "pgm" || protocol == "epgm") {          //  For EPGM transport with UDP encapsulation of PGM is used. -        bool udp_encapsulation = (addr_type == "epgm"); +        bool udp_encapsulation = (protocol == "epgm");          //  At this point we'll create message pipes to the session straight          //  away. There's no point in delaying it as no concept of 'connect' @@ -71,11 +73,8 @@ void zmq::connect_session_t::start_connecting ()                  choose_io_thread (options.affinity), options);              zmq_assert (pgm_sender); -            int rc = pgm_sender->init (udp_encapsulation, addr_args.c_str ()); -            if (rc != 0) { -                delete pgm_sender; -                return -1; -            } +            int rc = pgm_sender->init (udp_encapsulation, address.c_str ()); +            zmq_assert (rc == 0);              send_attach (this, pgm_sender, blob_t ());          } @@ -86,11 +85,8 @@ void zmq::connect_session_t::start_connecting ()                  choose_io_thread (options.affinity), options);              zmq_assert (pgm_receiver); -            int rc = pgm_receiver->init (udp_encapsulation, addr_args.c_str ()); -            if (rc != 0) { -                delete pgm_receiver; -                return -1; -            } +            int rc = pgm_receiver->init (udp_encapsulation, address.c_str ()); +            zmq_assert (rc == 0);              send_attach (this, pgm_receiver, blob_t ());          } diff --git a/src/i_engine.hpp b/src/i_engine.hpp index 0ba94f5..e104a9c 100644 --- a/src/i_engine.hpp +++ b/src/i_engine.hpp @@ -34,6 +34,10 @@ namespace zmq          //  Unplug the engine from the session.          virtual void unplug () = 0; +        //  Terminate and deallocate the engine. Note that 'detached' +        //  events in not fired on termination. +        virtual void terminate () = 0; +          //  This method is called by the session to signalise that more          //  messages can be written to the pipe.          virtual void activate_in () = 0; diff --git a/src/pgm_receiver.cpp b/src/pgm_receiver.cpp index 048c529..ff61b96 100644 --- a/src/pgm_receiver.cpp +++ b/src/pgm_receiver.cpp @@ -55,7 +55,7 @@ int zmq::pgm_receiver_t::init (bool udp_encapsulation_, const char *network_)      return pgm_socket.init (udp_encapsulation_, network_);  } -void zmq::pgm_receiver_t::plug (i_inout *inout_) +void zmq::pgm_receiver_t::plug (io_thread_t *io_thread_, i_inout *inout_)  {      //  Retrieve PGM fds and start polling.      int socket_fd; @@ -88,12 +88,18 @@ void zmq::pgm_receiver_t::unplug ()      inout = NULL;  } -void zmq::pgm_receiver_t::revive () +void zmq::pgm_receiver_t::terminate () +{ +    unplug (); +    delete this; +} + +void zmq::pgm_receiver_t::activate_out ()  {      zmq_assert (false);  } -void zmq::pgm_receiver_t::resume_input () +void zmq::pgm_receiver_t::activate_in ()  {      //  It is possible that the most recently used decoder      //  processed the whole buffer but failed to write diff --git a/src/pgm_receiver.hpp b/src/pgm_receiver.hpp index 1b367bf..7215324 100644 --- a/src/pgm_receiver.hpp +++ b/src/pgm_receiver.hpp @@ -51,10 +51,11 @@ namespace zmq          int init (bool udp_encapsulation_, const char *network_);          //  i_engine interface implementation. -        void plug (struct i_inout *inout_); +        void plug (class io_thread_t *io_thread_, struct i_inout *inout_);          void unplug (); -        void revive (); -        void resume_input (); +        void terminate (); +        void activate_in (); +        void activate_out ();          //  i_poll_events interface implementation.          void in_event (); diff --git a/src/pgm_sender.cpp b/src/pgm_sender.cpp index 9aeb7a9..5c9020d 100644 --- a/src/pgm_sender.cpp +++ b/src/pgm_sender.cpp @@ -58,7 +58,7 @@ int zmq::pgm_sender_t::init (bool udp_encapsulation_, const char *network_)      return rc;  } -void zmq::pgm_sender_t::plug (i_inout *inout_) +void zmq::pgm_sender_t::plug (io_thread_t *io_thread_, i_inout *inout_)  {      //  Alocate 2 fds for PGM socket.      int downlink_socket_fd = 0; @@ -96,13 +96,19 @@ void zmq::pgm_sender_t::unplug ()      encoder.set_inout (NULL);  } -void zmq::pgm_sender_t::revive () +void zmq::pgm_sender_t::terminate () +{ +    unplug (); +    delete this; +} + +void zmq::pgm_sender_t::activate_out ()  {      set_pollout (handle);      out_event ();  } -void zmq::pgm_sender_t::resume_input () +void zmq::pgm_sender_t::activate_in ()  {      zmq_assert (false);  } diff --git a/src/pgm_sender.hpp b/src/pgm_sender.hpp index 23a53bc..a1ac329 100644 --- a/src/pgm_sender.hpp +++ b/src/pgm_sender.hpp @@ -49,10 +49,11 @@ namespace zmq          int init (bool udp_encapsulation_, const char *network_);          //  i_engine interface implementation. -        void plug (struct i_inout *inout_); +        void plug (class io_thread_t *io_thread_, struct i_inout *inout_);          void unplug (); -        void revive (); -        void resume_input (); +        void terminate (); +        void activate_in (); +        void activate_out ();          //  i_poll_events interface implementation.          void in_event (); diff --git a/src/session.cpp b/src/session.cpp index e208ebf..9655e64 100644 --- a/src/session.cpp +++ b/src/session.cpp @@ -45,6 +45,9 @@ zmq::session_t::~session_t ()  {      zmq_assert (!in_pipe);      zmq_assert (!out_pipe); + +    if (engine) +        engine->terminate ();  }  void zmq::session_t::terminate () diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 060480f..0103618 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -44,8 +44,6 @@  #include "err.hpp"  #include "ctx.hpp"  #include "platform.hpp" -#include "pgm_sender.hpp" -#include "pgm_receiver.hpp"  #include "likely.hpp"  #include "pair.hpp"  #include "pub.hpp" diff --git a/src/zmq_engine.cpp b/src/zmq_engine.cpp index de26b27..6551bc3 100644 --- a/src/zmq_engine.cpp +++ b/src/zmq_engine.cpp @@ -87,6 +87,12 @@ void zmq::zmq_engine_t::unplug ()      inout = NULL;  } +void zmq::zmq_engine_t::terminate () +{ +    unplug (); +    delete this; +} +  void zmq::zmq_engine_t::in_event ()  {      bool disconnection = false; diff --git a/src/zmq_engine.hpp b/src/zmq_engine.hpp index 328ec95..1023051 100644 --- a/src/zmq_engine.hpp +++ b/src/zmq_engine.hpp @@ -44,6 +44,7 @@ namespace zmq          //  i_engine interface implementation.          void plug (class io_thread_t *io_thread_, struct i_inout *inout_);          void unplug (); +        void terminate ();          void activate_in ();          void activate_out ();  | 
