summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@250bpm.com>2010-08-14 08:37:38 +0200
committerMartin Sustrik <sustrik@250bpm.com>2010-08-25 15:39:20 +0200
commit43e34d028115c43577713c0c3e1f0c33b0aac94a (patch)
tree05f1831e132a40a7f4a58d65b948786bee1571f1
parent45f83d78a56f4b3a812c87fec03a75558445b2ab (diff)
engine leak fixed; pgm compilation fixed
-rw-r--r--src/connect_session.cpp20
-rw-r--r--src/i_engine.hpp4
-rw-r--r--src/pgm_receiver.cpp12
-rw-r--r--src/pgm_receiver.hpp7
-rw-r--r--src/pgm_sender.cpp12
-rw-r--r--src/pgm_sender.hpp7
-rw-r--r--src/session.cpp3
-rw-r--r--src/socket_base.cpp2
-rw-r--r--src/zmq_engine.cpp6
-rw-r--r--src/zmq_engine.hpp1
10 files changed, 48 insertions, 26 deletions
diff --git a/src/connect_session.cpp b/src/connect_session.cpp
index 896cc48..afa80b8 100644
--- a/src/connect_session.cpp
+++ b/src/connect_session.cpp
@@ -19,6 +19,8 @@
#include "connect_session.hpp"
#include "zmq_connecter.hpp"
+#include "pgm_sender.hpp"
+#include "pgm_receiver.hpp"
zmq::connect_session_t::connect_session_t (class io_thread_t *io_thread_,
class socket_base_t *socket_, const options_t &options_,
@@ -56,10 +58,10 @@ void zmq::connect_session_t::start_connecting ()
#if defined ZMQ_HAVE_OPENPGM
// Both PGM and EPGM transports are using the same infrastructure.
- if (addr_type == "pgm" || addr_type == "epgm") {
+ if (protocol == "pgm" || protocol == "epgm") {
// For EPGM transport with UDP encapsulation of PGM is used.
- bool udp_encapsulation = (addr_type == "epgm");
+ bool udp_encapsulation = (protocol == "epgm");
// At this point we'll create message pipes to the session straight
// away. There's no point in delaying it as no concept of 'connect'
@@ -71,11 +73,8 @@ void zmq::connect_session_t::start_connecting ()
choose_io_thread (options.affinity), options);
zmq_assert (pgm_sender);
- int rc = pgm_sender->init (udp_encapsulation, addr_args.c_str ());
- if (rc != 0) {
- delete pgm_sender;
- return -1;
- }
+ int rc = pgm_sender->init (udp_encapsulation, address.c_str ());
+ zmq_assert (rc == 0);
send_attach (this, pgm_sender, blob_t ());
}
@@ -86,11 +85,8 @@ void zmq::connect_session_t::start_connecting ()
choose_io_thread (options.affinity), options);
zmq_assert (pgm_receiver);
- int rc = pgm_receiver->init (udp_encapsulation, addr_args.c_str ());
- if (rc != 0) {
- delete pgm_receiver;
- return -1;
- }
+ int rc = pgm_receiver->init (udp_encapsulation, address.c_str ());
+ zmq_assert (rc == 0);
send_attach (this, pgm_receiver, blob_t ());
}
diff --git a/src/i_engine.hpp b/src/i_engine.hpp
index 0ba94f5..e104a9c 100644
--- a/src/i_engine.hpp
+++ b/src/i_engine.hpp
@@ -34,6 +34,10 @@ namespace zmq
// Unplug the engine from the session.
virtual void unplug () = 0;
+ // Terminate and deallocate the engine. Note that 'detached'
+ // events in not fired on termination.
+ virtual void terminate () = 0;
+
// This method is called by the session to signalise that more
// messages can be written to the pipe.
virtual void activate_in () = 0;
diff --git a/src/pgm_receiver.cpp b/src/pgm_receiver.cpp
index 048c529..ff61b96 100644
--- a/src/pgm_receiver.cpp
+++ b/src/pgm_receiver.cpp
@@ -55,7 +55,7 @@ int zmq::pgm_receiver_t::init (bool udp_encapsulation_, const char *network_)
return pgm_socket.init (udp_encapsulation_, network_);
}
-void zmq::pgm_receiver_t::plug (i_inout *inout_)
+void zmq::pgm_receiver_t::plug (io_thread_t *io_thread_, i_inout *inout_)
{
// Retrieve PGM fds and start polling.
int socket_fd;
@@ -88,12 +88,18 @@ void zmq::pgm_receiver_t::unplug ()
inout = NULL;
}
-void zmq::pgm_receiver_t::revive ()
+void zmq::pgm_receiver_t::terminate ()
+{
+ unplug ();
+ delete this;
+}
+
+void zmq::pgm_receiver_t::activate_out ()
{
zmq_assert (false);
}
-void zmq::pgm_receiver_t::resume_input ()
+void zmq::pgm_receiver_t::activate_in ()
{
// It is possible that the most recently used decoder
// processed the whole buffer but failed to write
diff --git a/src/pgm_receiver.hpp b/src/pgm_receiver.hpp
index 1b367bf..7215324 100644
--- a/src/pgm_receiver.hpp
+++ b/src/pgm_receiver.hpp
@@ -51,10 +51,11 @@ namespace zmq
int init (bool udp_encapsulation_, const char *network_);
// i_engine interface implementation.
- void plug (struct i_inout *inout_);
+ void plug (class io_thread_t *io_thread_, struct i_inout *inout_);
void unplug ();
- void revive ();
- void resume_input ();
+ void terminate ();
+ void activate_in ();
+ void activate_out ();
// i_poll_events interface implementation.
void in_event ();
diff --git a/src/pgm_sender.cpp b/src/pgm_sender.cpp
index 9aeb7a9..5c9020d 100644
--- a/src/pgm_sender.cpp
+++ b/src/pgm_sender.cpp
@@ -58,7 +58,7 @@ int zmq::pgm_sender_t::init (bool udp_encapsulation_, const char *network_)
return rc;
}
-void zmq::pgm_sender_t::plug (i_inout *inout_)
+void zmq::pgm_sender_t::plug (io_thread_t *io_thread_, i_inout *inout_)
{
// Alocate 2 fds for PGM socket.
int downlink_socket_fd = 0;
@@ -96,13 +96,19 @@ void zmq::pgm_sender_t::unplug ()
encoder.set_inout (NULL);
}
-void zmq::pgm_sender_t::revive ()
+void zmq::pgm_sender_t::terminate ()
+{
+ unplug ();
+ delete this;
+}
+
+void zmq::pgm_sender_t::activate_out ()
{
set_pollout (handle);
out_event ();
}
-void zmq::pgm_sender_t::resume_input ()
+void zmq::pgm_sender_t::activate_in ()
{
zmq_assert (false);
}
diff --git a/src/pgm_sender.hpp b/src/pgm_sender.hpp
index 23a53bc..a1ac329 100644
--- a/src/pgm_sender.hpp
+++ b/src/pgm_sender.hpp
@@ -49,10 +49,11 @@ namespace zmq
int init (bool udp_encapsulation_, const char *network_);
// i_engine interface implementation.
- void plug (struct i_inout *inout_);
+ void plug (class io_thread_t *io_thread_, struct i_inout *inout_);
void unplug ();
- void revive ();
- void resume_input ();
+ void terminate ();
+ void activate_in ();
+ void activate_out ();
// i_poll_events interface implementation.
void in_event ();
diff --git a/src/session.cpp b/src/session.cpp
index e208ebf..9655e64 100644
--- a/src/session.cpp
+++ b/src/session.cpp
@@ -45,6 +45,9 @@ zmq::session_t::~session_t ()
{
zmq_assert (!in_pipe);
zmq_assert (!out_pipe);
+
+ if (engine)
+ engine->terminate ();
}
void zmq::session_t::terminate ()
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index 060480f..0103618 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -44,8 +44,6 @@
#include "err.hpp"
#include "ctx.hpp"
#include "platform.hpp"
-#include "pgm_sender.hpp"
-#include "pgm_receiver.hpp"
#include "likely.hpp"
#include "pair.hpp"
#include "pub.hpp"
diff --git a/src/zmq_engine.cpp b/src/zmq_engine.cpp
index de26b27..6551bc3 100644
--- a/src/zmq_engine.cpp
+++ b/src/zmq_engine.cpp
@@ -87,6 +87,12 @@ void zmq::zmq_engine_t::unplug ()
inout = NULL;
}
+void zmq::zmq_engine_t::terminate ()
+{
+ unplug ();
+ delete this;
+}
+
void zmq::zmq_engine_t::in_event ()
{
bool disconnection = false;
diff --git a/src/zmq_engine.hpp b/src/zmq_engine.hpp
index 328ec95..1023051 100644
--- a/src/zmq_engine.hpp
+++ b/src/zmq_engine.hpp
@@ -44,6 +44,7 @@ namespace zmq
// i_engine interface implementation.
void plug (class io_thread_t *io_thread_, struct i_inout *inout_);
void unplug ();
+ void terminate ();
void activate_in ();
void activate_out ();