diff options
| -rw-r--r-- | src/pipe.cpp | 38 | 
1 files changed, 25 insertions, 13 deletions
diff --git a/src/pipe.cpp b/src/pipe.cpp index 26f7d85..20c7f69 100644 --- a/src/pipe.cpp +++ b/src/pipe.cpp @@ -165,10 +165,12 @@ void zmq::pipe_t::rollback ()  {      //  Remove incomplete message from the outbound pipe.      msg_t msg; -    while (outpipe->unwrite (&msg)) { -        zmq_assert (msg.flags () & msg_t::more); -        int rc = msg.close (); -        errno_assert (rc == 0); +    if (outpipe) { +		while (outpipe->unwrite (&msg)) { +		    zmq_assert (msg.flags () & msg_t::more); +		    int rc = msg.close (); +		    errno_assert (rc == 0); +		}      }  } @@ -238,6 +240,7 @@ void zmq::pipe_t::process_pipe_term ()      if (state == active) {          if (!delay) {              state = terminating; +            outpipe = NULL;              send_pipe_term_ack (peer);              return;          } @@ -251,6 +254,7 @@ void zmq::pipe_t::process_pipe_term ()      //  term command as well, so we can move straight to terminating state.      if (state == delimited) {          state = terminating; +        outpipe = NULL;          send_pipe_term_ack (peer);          return;      } @@ -260,6 +264,7 @@ void zmq::pipe_t::process_pipe_term ()      //  own ack.      if (state == terminated) {          state = double_terminated; +        outpipe = NULL;          send_pipe_term_ack (peer);          return;      } @@ -280,8 +285,10 @@ void zmq::pipe_t::process_pipe_term_ack ()      //  are invalid.      if (state == terminating) ;      else if (state == double_terminated); -    else if (state == terminated) +    else if (state == terminated) { +        outpipe = NULL;          send_pipe_term_ack (peer); +    }      else          zmq_assert (false); @@ -325,6 +332,7 @@ void zmq::pipe_t::terminate (bool delay_)      //  There are still pending messages available, but the user calls      //  'terminate'. We can act as if all the pending messages were read.      else if (state == pending && !delay) { +            outpipe = NULL;              send_pipe_term_ack (peer);              state = terminating;      } @@ -348,15 +356,18 @@ void zmq::pipe_t::terminate (bool delay_)      //  Stop outbound flow of messages.      out_active = false; -    //  Rollback any unfinished outbound messages. -    rollback (); +    if (outpipe) { -    //  Push delimiter into the outbound pipe. Note that watermarks are not -    //  checked thus the delimiter can be written even though the pipe is full. -    msg_t msg; -    msg.init_delimiter (); -    outpipe->write (msg, false); -    flush (); +		//  Rollback any unfinished outbound messages. +		rollback (); + +		//  Push delimiter into the outbound pipe. Note that watermarks are not +		//  checked thus the delimiter can be written even though the pipe is full. +		msg_t msg; +		msg.init_delimiter (); +		outpipe->write (msg, false); +		flush (); +    }  }  bool zmq::pipe_t::is_delimiter (msg_t &msg_) @@ -400,6 +411,7 @@ void zmq::pipe_t::delimit ()      }      if (state == pending) { +        outpipe = NULL;          send_pipe_term_ack (peer);          state = terminating;          return;  | 
