diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/pub.cpp | 34 | ||||
| -rw-r--r-- | src/pub.hpp | 7 | 
2 files changed, 33 insertions, 8 deletions
diff --git a/src/pub.cpp b/src/pub.cpp index b6802fd..2971e5c 100644 --- a/src/pub.cpp +++ b/src/pub.cpp @@ -25,7 +25,8 @@  #include "pipe.hpp"  zmq::pub_t::pub_t (class app_thread_t *parent_) : -    socket_base_t (parent_) +    socket_base_t (parent_), +    stalled_pipe (NULL)  {      options.requires_in = false;      options.requires_out = true; @@ -53,6 +54,8 @@ void zmq::pub_t::xdetach_inpipe (class reader_t *pipe_)  void zmq::pub_t::xdetach_outpipe (class writer_t *pipe_)  {      out_pipes.erase (pipe_); +    if (pipe_ == stalled_pipe) +        stalled_pipe = NULL;  }  void zmq::pub_t::xkill (class reader_t *pipe_) @@ -67,7 +70,8 @@ void zmq::pub_t::xrevive (class reader_t *pipe_)  void zmq::pub_t::xrevive (class writer_t *pipe_)  { -    zmq_not_implemented (); +    zmq_assert (stalled_pipe = pipe_); +    stalled_pipe = NULL;  }  int zmq::pub_t::xsetsockopt (int option_, const void *optval_, @@ -91,11 +95,10 @@ int zmq::pub_t::xsend (zmq_msg_t *msg_, int flags_)      }      //  First check whether all pipes are available for writing. -    for (out_pipes_t::size_type i = 0; i != pipes_count; i++) -        if (!out_pipes [i]->check_write ()) { -            errno = EAGAIN; -            return -1; -        } +    if (!check_write ()) { +        errno = EAGAIN; +        return -1; +    }      msg_content_t *content = (msg_content_t*) msg_->content; @@ -171,7 +174,22 @@ bool zmq::pub_t::xhas_in ()  bool zmq::pub_t::xhas_out ()  { -    //  TODO: Reimplement when queue limits are added. +    return check_write (); +} + +bool zmq::pub_t::check_write () +{ +    if (stalled_pipe != NULL) +        return false; + +    out_pipes_t::size_type pipes_num = out_pipes.size (); +    for (out_pipes_t::size_type i = 0; i < pipes_num; i++) { +        if (!out_pipes [i]->check_write ()) { +            stalled_pipe = out_pipes [i]; +            return false; +        } +    } +      return true;  } diff --git a/src/pub.hpp b/src/pub.hpp index a85301f..3a4fe09 100644 --- a/src/pub.hpp +++ b/src/pub.hpp @@ -54,6 +54,13 @@ namespace zmq          typedef yarray_t <class writer_t> out_pipes_t;          out_pipes_t out_pipes; +        //  Pointer to the pipe we are waiting for to became writable +        //  again; NULL if tha last send operation was successful. +        class writer_t *stalled_pipe; + +        //  Check whether we can write a message to all pipes. +        bool check_write (); +          pub_t (const pub_t&);          void operator = (const pub_t&);      };  | 
