diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/ctx.cpp | 9 | ||||
| -rw-r--r-- | src/ctx.hpp | 6 | ||||
| -rw-r--r-- | src/decoder.cpp | 4 | ||||
| -rw-r--r-- | src/dist.cpp | 72 | ||||
| -rw-r--r-- | src/dist.hpp | 20 | ||||
| -rw-r--r-- | src/err.cpp | 112 | ||||
| -rw-r--r-- | src/err.hpp | 12 | ||||
| -rw-r--r-- | src/rep.cpp | 21 | ||||
| -rw-r--r-- | src/socket_base.cpp | 9 | ||||
| -rw-r--r-- | src/socket_base.hpp | 6 | ||||
| -rw-r--r-- | src/tcp_connecter.cpp | 9 | ||||
| -rw-r--r-- | src/xrep.cpp | 18 | ||||
| -rw-r--r-- | src/xrep.hpp | 1 | ||||
| -rw-r--r-- | src/zmq.cpp | 32 | 
14 files changed, 201 insertions, 130 deletions
| diff --git a/src/ctx.cpp b/src/ctx.cpp index 9cbb9de..2758729 100644 --- a/src/ctx.cpp +++ b/src/ctx.cpp @@ -36,6 +36,7 @@  #endif  zmq::ctx_t::ctx_t (uint32_t io_threads_) : +    tag (0xbadcafe0),      terminating (false)  {      int rc; @@ -78,6 +79,11 @@ zmq::ctx_t::ctx_t (uint32_t io_threads_) :      zmq_assert (rc == 0);  } +bool zmq::ctx_t::check_tag () +{ +    return tag == 0xbadcafe0; +} +  zmq::ctx_t::~ctx_t ()  {      //  Check that there are no remaining sockets. @@ -99,6 +105,9 @@ zmq::ctx_t::~ctx_t ()      //  needed as mailboxes themselves were deallocated with their      //  corresponding io_thread/socket objects.      free (slots); + +    //  Remove the tag, so that the object is considered dead. +    tag = 0xdeadbeef;  }  int zmq::ctx_t::terminate () diff --git a/src/ctx.hpp b/src/ctx.hpp index c6ea4ce..33d5dad 100644 --- a/src/ctx.hpp +++ b/src/ctx.hpp @@ -60,6 +60,9 @@ namespace zmq          //  of I/O thread pool to create.          ctx_t (uint32_t io_threads_); +        //  Returns false if object is not a context. +        bool check_tag (); +          //  This function is called when user invokes zmq_term. If there are          //  no more sockets open it'll cause all the infrastructure to be shut          //  down. If there are open sockets still, the deallocation happens @@ -98,6 +101,9 @@ namespace zmq          ~ctx_t (); +        //  Used to check whether the object is a context. +        uint32_t tag; +          //  Sockets belonging to this context. We need the list so that          //  we can notify the sockets when zmq_term() is called. The sockets          //  will return ETERM then. diff --git a/src/decoder.cpp b/src/decoder.cpp index c9a7dc9..84ecd92 100644 --- a/src/decoder.cpp +++ b/src/decoder.cpp @@ -109,11 +109,11 @@ bool zmq::decoder_t::eight_byte_size_ready ()  bool zmq::decoder_t::flags_ready ()  {      //  Store the flags from the wire into the message structure. -    in_progress.flags = tmpbuf [0]; +    in_progress.flags = tmpbuf [0] | ~ZMQ_MSG_MASK;      next_step (zmq_msg_data (&in_progress), zmq_msg_size (&in_progress),          &decoder_t::message_ready); -     +      return true;  } diff --git a/src/dist.cpp b/src/dist.cpp index e447bc1..d74e69f 100644 --- a/src/dist.cpp +++ b/src/dist.cpp @@ -25,9 +25,11 @@  #include "err.hpp"  #include "own.hpp"  #include "msg_content.hpp" +#include "likely.hpp"  zmq::dist_t::dist_t (own_t *sink_) :      active (0), +    eligible (0),      more (false),      sink (sink_),      terminating (false) @@ -41,20 +43,24 @@ zmq::dist_t::~dist_t ()  void zmq::dist_t::attach (writer_t *pipe_)  { -    //  If we are in the middle of sending a message, let's postpone plugging -    //  in the pipe. -    if (!terminating && more) { -        new_pipes.push_back (pipe_); -        return; -    } -      pipe_->set_event_sink (this); -    pipes.push_back (pipe_); -    pipes.swap (active, pipes.size () - 1); -    active++; +    //  If we are in the middle of sending a message, we'll add new pipe +    //  into the list of eligible pipes. Otherwise we add it to the list +    //  of active pipes. +    if (more) { +        pipes.push_back (pipe_); +        pipes.swap (eligible, pipes.size () - 1); +        eligible++; +    } +    else { +        pipes.push_back (pipe_); +        pipes.swap (active, pipes.size () - 1); +        active++; +        eligible++; +    } -    if (terminating) { +    if (unlikely (terminating)) {          sink->register_term_acks (1);          pipe_->terminate ();      } @@ -72,21 +78,30 @@ void zmq::dist_t::terminate ()  void zmq::dist_t::terminated (writer_t *pipe_)  { -    //  Remove the pipe from the list; adjust number of active pipes -    //  accordingly. +    //  Remove the pipe from the list; adjust number of active and/or +    //  eligible pipes accordingly.      if (pipes.index (pipe_) < active)          active--; +    if (pipes.index (pipe_) < eligible) +        eligible--;      pipes.erase (pipe_); -    if (terminating) +    if (unlikely (terminating))          sink->unregister_term_ack ();  }  void zmq::dist_t::activated (writer_t *pipe_)  { -    //  Move the pipe to the list of active pipes. -    pipes.swap (pipes.index (pipe_), active); -    active++; +    //  Move the pipe from passive to eligible state. +    pipes.swap (pipes.index (pipe_), eligible); +    eligible++; + +    //  If there's no message being sent at the moment, move it to +    //  the active state. +    if (!more) { +        pipes.swap (eligible - 1, active); +        active++; +    }  }  int zmq::dist_t::send (zmq_msg_t *msg_, int flags_) @@ -97,9 +112,9 @@ int zmq::dist_t::send (zmq_msg_t *msg_, int flags_)      //  Push the message to active pipes.      distribute (msg_, flags_); -    //  If mutlipart message is fully sent, activate new pipes. -    if (more && !msg_more) -        clear_new_pipes (); +    //  If multipart message is fully sent, activate all the eligible pipes. +    if (!msg_more) +        active = eligible;      more = msg_more; @@ -173,24 +188,13 @@ bool zmq::dist_t::has_out ()  bool zmq::dist_t::write (class writer_t *pipe_, zmq_msg_t *msg_)  {      if (!pipe_->write (msg_)) { +        pipes.swap (pipes.index (pipe_), active - 1);          active--; -        pipes.swap (pipes.index (pipe_), active); +        pipes.swap (active, eligible - 1); +        eligible--;          return false;      }      if (!(msg_->flags & ZMQ_MSG_MORE))          pipe_->flush ();      return true;  } - -void zmq::dist_t::clear_new_pipes () -{ -    for (new_pipes_t::iterator it = new_pipes.begin (); it != new_pipes.end (); -          ++it) { -        (*it)->set_event_sink (this); -        pipes.push_back (*it); -        pipes.swap (active, pipes.size () - 1); -        active++; -    } -    new_pipes.clear (); -} - diff --git a/src/dist.hpp b/src/dist.hpp index ad9767a..45aaf90 100644 --- a/src/dist.hpp +++ b/src/dist.hpp @@ -56,24 +56,22 @@ namespace zmq          //  Put the message to all active pipes.          void distribute (zmq_msg_t *msg_, int flags_); -        //  Plug in all the delayed pipes. -        void clear_new_pipes (); -          //  List of outbound pipes.          typedef array_t <class writer_t> pipes_t;          pipes_t pipes; -        //  List of new pipes that were not yet inserted into 'pipes' list. -        //  These pipes are moves to 'pipes' list once the current multipart -        //  message is fully sent. This way we avoid sending incomplete messages -        //  to peers. -        typedef std::vector <class writer_t*> new_pipes_t; -        new_pipes_t new_pipes; -          //  Number of active pipes. All the active pipes are located at the -        //  beginning of the pipes array. +        //  beginning of the pipes array. These are the pipes the messages +        //  can be sent to at the moment.          pipes_t::size_type active; +        //  Number of pipes eligible for sending messages to. This includes all +        //  the active pipes plus all the pipes that we can in theory send +        //  messages to (the HWM is not yet reached), but sending a message +        //  to them would result in partial message being delivered, ie. message +        //  with initial parts missing. +        pipes_t::size_type eligible; +          //  True if last we are in the middle of a multipart message.          bool more; diff --git a/src/err.cpp b/src/err.cpp index 8761c22..d280487 100644 --- a/src/err.cpp +++ b/src/err.cpp @@ -68,119 +68,125 @@ const char *zmq::errno_to_string (int errno_)  const char *zmq::wsa_error()  { -    int errcode = WSAGetLastError (); +    int no = WSAGetLastError ();      //  TODO: This is not a generic way to handle this... -    if (errcode == WSAEWOULDBLOCK) +    if (no == WSAEWOULDBLOCK)          return NULL; +    return wsa_error_no (no); +} + +const char *zmq::wsa_error_no (int no_) +{      //  TODO:  It seems that list of Windows socket errors is longer than this.      //         Investigate whether there's a way to convert it into the string      //         automatically (wsaError->HRESULT->string?).      return -        (errcode == WSABASEERR) ? +        (no_ == WSABASEERR) ?              "No Error" :  -        (errcode == WSAEINTR) ? +        (no_ == WSAEINTR) ?              "Interrupted system call" :  -        (errcode == WSAEBADF) ? +        (no_ == WSAEBADF) ?              "Bad file number" :  -        (errcode == WSAEACCES) ? +        (no_ == WSAEACCES) ?              "Permission denied" :  -        (errcode == WSAEFAULT) ? +        (no_ == WSAEFAULT) ?              "Bad address" :  -        (errcode == WSAEINVAL) ? +        (no_ == WSAEINVAL) ?              "Invalid argument" :  -        (errcode == WSAEMFILE) ? +        (no_ == WSAEMFILE) ?              "Too many open files" :  -        (errcode == WSAEWOULDBLOCK) ? +        (no_ == WSAEWOULDBLOCK) ?              "Operation would block" :  -        (errcode == WSAEINPROGRESS) ? +        (no_ == WSAEINPROGRESS) ?              "Operation now in progress" :  -        (errcode == WSAEALREADY) ? +        (no_ == WSAEALREADY) ?              "Operation already in progress" :  -        (errcode == WSAENOTSOCK) ? +        (no_ == WSAENOTSOCK) ?              "Socket operation on non-socket" :  -        (errcode == WSAEDESTADDRREQ) ? +        (no_ == WSAEDESTADDRREQ) ?              "Destination address required" :  -        (errcode == WSAEMSGSIZE) ? +        (no_ == WSAEMSGSIZE) ?              "Message too long" :  -        (errcode == WSAEPROTOTYPE) ? +        (no_ == WSAEPROTOTYPE) ?              "Protocol wrong type for socket" :  -        (errcode == WSAENOPROTOOPT) ? +        (no_ == WSAENOPROTOOPT) ?              "Bad protocol option" :  -        (errcode == WSAEPROTONOSUPPORT) ? +        (no_ == WSAEPROTONOSUPPORT) ?              "Protocol not supported" :  -        (errcode == WSAESOCKTNOSUPPORT) ? +        (no_ == WSAESOCKTNOSUPPORT) ?              "Socket type not supported" :  -        (errcode == WSAEOPNOTSUPP) ? +        (no_ == WSAEOPNOTSUPP) ?              "Operation not supported on socket" :  -        (errcode == WSAEPFNOSUPPORT) ? +        (no_ == WSAEPFNOSUPPORT) ?              "Protocol family not supported" :  -        (errcode == WSAEAFNOSUPPORT) ? +        (no_ == WSAEAFNOSUPPORT) ?              "Address family not supported by protocol family" :  -        (errcode == WSAEADDRINUSE) ? +        (no_ == WSAEADDRINUSE) ?              "Address already in use" :  -        (errcode == WSAEADDRNOTAVAIL) ? +        (no_ == WSAEADDRNOTAVAIL) ?              "Can't assign requested address" :  -        (errcode == WSAENETDOWN) ? +        (no_ == WSAENETDOWN) ?              "Network is down" :  -        (errcode == WSAENETUNREACH) ? +        (no_ == WSAENETUNREACH) ?              "Network is unreachable" :  -        (errcode == WSAENETRESET) ? +        (no_ == WSAENETRESET) ?              "Net dropped connection or reset" :  -        (errcode == WSAECONNABORTED) ? +        (no_ == WSAECONNABORTED) ?              "Software caused connection abort" :  -        (errcode == WSAECONNRESET) ? +        (no_ == WSAECONNRESET) ?              "Connection reset by peer" :  -        (errcode == WSAENOBUFS) ? +        (no_ == WSAENOBUFS) ?              "No buffer space available" :  -        (errcode == WSAEISCONN) ? +        (no_ == WSAEISCONN) ?              "Socket is already connected" :  -        (errcode == WSAENOTCONN) ? +        (no_ == WSAENOTCONN) ?              "Socket is not connected" :  -        (errcode == WSAESHUTDOWN) ? +        (no_ == WSAESHUTDOWN) ?              "Can't send after socket shutdown" :  -        (errcode == WSAETOOMANYREFS) ? +        (no_ == WSAETOOMANYREFS) ?              "Too many references can't splice" :  -        (errcode == WSAETIMEDOUT) ? +        (no_ == WSAETIMEDOUT) ?              "Connection timed out" :  -        (errcode == WSAECONNREFUSED) ? +        (no_ == WSAECONNREFUSED) ?              "Connection refused" :  -        (errcode == WSAELOOP) ? +        (no_ == WSAELOOP) ?              "Too many levels of symbolic links" :  -        (errcode == WSAENAMETOOLONG) ? +        (no_ == WSAENAMETOOLONG) ?              "File name too long" :  -        (errcode == WSAEHOSTDOWN) ? +        (no_ == WSAEHOSTDOWN) ?              "Host is down" :  -        (errcode == WSAEHOSTUNREACH) ? +        (no_ == WSAEHOSTUNREACH) ?              "No Route to Host" :  -        (errcode == WSAENOTEMPTY) ? +        (no_ == WSAENOTEMPTY) ?              "Directory not empty" :  -        (errcode == WSAEPROCLIM) ? +        (no_ == WSAEPROCLIM) ?              "Too many processes" :  -        (errcode == WSAEUSERS) ? +        (no_ == WSAEUSERS) ?              "Too many users" :  -        (errcode == WSAEDQUOT) ? +        (no_ == WSAEDQUOT) ?              "Disc Quota Exceeded" :  -        (errcode == WSAESTALE) ? +        (no_ == WSAESTALE) ?              "Stale NFS file handle" :  -        (errcode == WSAEREMOTE) ? +        (no_ == WSAEREMOTE) ?              "Too many levels of remote in path" :  -        (errcode == WSASYSNOTREADY) ? +        (no_ == WSASYSNOTREADY) ?              "Network SubSystem is unavailable" :  -        (errcode == WSAVERNOTSUPPORTED) ? +        (no_ == WSAVERNOTSUPPORTED) ?              "WINSOCK DLL Version out of range" :  -        (errcode == WSANOTINITIALISED) ? +        (no_ == WSANOTINITIALISED) ?              "Successful WSASTARTUP not yet performed" :  -        (errcode == WSAHOST_NOT_FOUND) ? +        (no_ == WSAHOST_NOT_FOUND) ?              "Host not found" :  -        (errcode == WSATRY_AGAIN) ? +        (no_ == WSATRY_AGAIN) ?              "Non-Authoritative Host not found" :  -        (errcode == WSANO_RECOVERY) ? +        (no_ == WSANO_RECOVERY) ?              "Non-Recoverable errors: FORMERR REFUSED NOTIMP" :  -        (errcode == WSANO_DATA) ? +        (no_ == WSANO_DATA) ?              "Valid name no data record of requested" :          "error not defined";   } +  void zmq::win_error (char *buffer_, size_t buffer_size_)  {      DWORD errcode = GetLastError (); diff --git a/src/err.hpp b/src/err.hpp index 3ffd99d..b540a5d 100644 --- a/src/err.hpp +++ b/src/err.hpp @@ -46,6 +46,7 @@ namespace zmq  namespace zmq  {      const char *wsa_error (); +    const char *wsa_error_no (int no_);      void win_error (char *buffer_, size_t buffer_size_);      void wsa_error_to_errno ();   } @@ -63,6 +64,17 @@ namespace zmq          }\      } while (false) +//  Provides convenient way to assert on WSA-style errors on Windows. +#define wsa_assert_no(no) \ +    do {\ +        const char *errstr = zmq::wsa_error_no (no);\ +        if (errstr != NULL) {\ +            fprintf (stderr, "Assertion failed: %s (%s:%d)\n", errstr, \ +                __FILE__, __LINE__);\ +            abort ();\ +        }\ +    } while (false) +  // Provides convenient way to check GetLastError-style errors on Windows.  #define win_assert(x) \      do {\ diff --git a/src/rep.cpp b/src/rep.cpp index 46c35cb..dc55ad0 100644 --- a/src/rep.cpp +++ b/src/rep.cpp @@ -78,14 +78,21 @@ int zmq::rep_t::xrecv (zmq_msg_t *msg_, int flags_)              int rc = xrep_t::xrecv (msg_, flags_);              if (rc != 0)                  return rc; -            zmq_assert (msg_->flags & ZMQ_MSG_MORE); -            //  Empty message part delimits the traceback stack. -            bottom = (zmq_msg_size (msg_) == 0); - -            //  Push it to the reply pipe. -            rc = xrep_t::xsend (msg_, flags_); -            zmq_assert (rc == 0); +            if ((msg_->flags & ZMQ_MSG_MORE)) { +                //  Empty message part delimits the traceback stack. +                bottom = (zmq_msg_size (msg_) == 0); + +                //  Push it to the reply pipe. +                rc = xrep_t::xsend (msg_, flags_); +                zmq_assert (rc == 0); +            } +            else { +                //  If the traceback stack is malformed, discard anything +                //  already sent to pipe (we're at end of invalid message). +                rc = xrep_t::rollback (); +                zmq_assert (rc == 0); +            }          }          request_begins = false; diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 4317bb0..24789b8 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -61,6 +61,11 @@  #include "xpub.hpp"  #include "xsub.hpp" +bool zmq::socket_base_t::check_tag () +{ +    return tag == 0xbaddecaf; +} +  zmq::socket_base_t *zmq::socket_base_t::create (int type_, class ctx_t *parent_,      uint32_t tid_)  { @@ -110,6 +115,7 @@ zmq::socket_base_t *zmq::socket_base_t::create (int type_, class ctx_t *parent_,  zmq::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t tid_) :      own_t (parent_, tid_), +    tag (0xbaddecaf),      ctx_terminated (false),      destroyed (false),      last_tsc (0), @@ -126,6 +132,9 @@ zmq::socket_base_t::~socket_base_t ()      sessions_sync.lock ();      zmq_assert (sessions.empty ());      sessions_sync.unlock (); + +    //  Mark the socket as dead. +    tag = 0xdeadbeef;  }  zmq::mailbox_t *zmq::socket_base_t::get_mailbox () diff --git a/src/socket_base.hpp b/src/socket_base.hpp index 15ac83c..333cddd 100644 --- a/src/socket_base.hpp +++ b/src/socket_base.hpp @@ -50,6 +50,9 @@ namespace zmq      public: +        //  Returns false if object is not a socket. +        bool check_tag (); +          //  Create a socket of a specified type.          static socket_base_t *create (int type_, class ctx_t *parent_,              uint32_t tid_); @@ -136,6 +139,9 @@ namespace zmq      private: +        //  Used to check whether the object is a socket. +        uint32_t tag; +          //  If true, associated context was already terminated.          bool ctx_terminated; diff --git a/src/tcp_connecter.cpp b/src/tcp_connecter.cpp index 0c1581d..71b362b 100644 --- a/src/tcp_connecter.cpp +++ b/src/tcp_connecter.cpp @@ -118,11 +118,12 @@ zmq::fd_t zmq::tcp_connecter_t::connect ()          //  Assert that the error was caused by the networking problems          //  rather than 0MQ bug. -        errno = err; -        errno_assert (errno == WSAECONNREFUSED || errno == WSAETIMEDOUT || -            errno == WSAECONNABORTED || errno == WSAEHOSTUNREACH); +        if (err == WSAECONNREFUSED || err == WSAETIMEDOUT || +              err == WSAECONNABORTED || err == WSAEHOSTUNREACH || +              err == WSAENETUNREACH || err == WSAENETDOWN) +            return retired_fd; -        return retired_fd; +        wsa_assert_no (err);      }      //  Return the newly connected socket. diff --git a/src/xrep.cpp b/src/xrep.cpp index 75dc30e..7317056 100644 --- a/src/xrep.cpp +++ b/src/xrep.cpp @@ -63,7 +63,7 @@ void zmq::xrep_t::xattach_pipes (reader_t *inpipe_, writer_t *outpipe_,          if (terminating) {              register_term_acks (1); -            outpipe_->terminate ();         +            outpipe_->terminate ();          }      } @@ -102,11 +102,13 @@ void zmq::xrep_t::terminated (reader_t *pipe_)      for (inpipes_t::iterator it = inpipes.begin (); it != inpipes.end ();            ++it) {          if (it->reader == pipe_) { +            if ((inpipes_t::size_type) (it - inpipes.begin ()) < current_in) +                current_in--;              inpipes.erase (it); -            if (terminating) -                unregister_term_ack ();              if (current_in >= inpipes.size ())                  current_in = 0; +            if (terminating) +                unregister_term_ack ();              return;          }      } @@ -288,6 +290,16 @@ int zmq::xrep_t::xrecv (zmq_msg_t *msg_, int flags_)      return -1;  } +int zmq::xrep_t::rollback (void) +{ +    if (current_out) { +        current_out->rollback (); +        current_out = NULL; +        more_out = false; +    } +    return 0; +} +  bool zmq::xrep_t::xhas_in ()  {      //  There are subsequent parts of the partly-read message available. diff --git a/src/xrep.hpp b/src/xrep.hpp index d7fbe9f..d1189f4 100644 --- a/src/xrep.hpp +++ b/src/xrep.hpp @@ -47,6 +47,7 @@ namespace zmq              const blob_t &peer_identity_);          int xsend (zmq_msg_t *msg_, int flags_);          int xrecv (zmq_msg_t *msg_, int flags_); +        int rollback ();          bool xhas_in ();          bool xhas_out (); diff --git a/src/zmq.cpp b/src/zmq.cpp index 61f942d..85f7d62 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -284,7 +284,7 @@ void *zmq_init (int io_threads_)  int zmq_term (void *ctx_)  { -    if (!ctx_) { +    if (!ctx_ || !((zmq::ctx_t*) ctx_)->check_tag ()) {          errno = EFAULT;          return -1;      } @@ -310,7 +310,7 @@ int zmq_term (void *ctx_)  void *zmq_socket (void *ctx_, int type_)  { -    if (!ctx_) { +    if (!ctx_ || !((zmq::ctx_t*) ctx_)->check_tag ()) {          errno = EFAULT;          return NULL;      } @@ -319,8 +319,8 @@ void *zmq_socket (void *ctx_, int type_)  int zmq_close (void *s_)  { -    if (!s_) { -        errno = EFAULT; +    if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) { +        errno = ENOTSOCK;          return -1;      }      ((zmq::socket_base_t*) s_)->close (); @@ -330,8 +330,8 @@ int zmq_close (void *s_)  int zmq_setsockopt (void *s_, int option_, const void *optval_,      size_t optvallen_)  { -    if (!s_) { -        errno = EFAULT; +    if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) { +        errno = ENOTSOCK;          return -1;      }      return (((zmq::socket_base_t*) s_)->setsockopt (option_, optval_, @@ -340,8 +340,8 @@ int zmq_setsockopt (void *s_, int option_, const void *optval_,  int zmq_getsockopt (void *s_, int option_, void *optval_, size_t *optvallen_)  { -    if (!s_) { -        errno = EFAULT; +    if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) { +        errno = ENOTSOCK;          return -1;      }      return (((zmq::socket_base_t*) s_)->getsockopt (option_, optval_, @@ -350,8 +350,8 @@ int zmq_getsockopt (void *s_, int option_, void *optval_, size_t *optvallen_)  int zmq_bind (void *s_, const char *addr_)  { -    if (!s_) { -        errno = EFAULT; +    if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) { +        errno = ENOTSOCK;          return -1;      }      return (((zmq::socket_base_t*) s_)->bind (addr_)); @@ -359,8 +359,8 @@ int zmq_bind (void *s_, const char *addr_)  int zmq_connect (void *s_, const char *addr_)  { -    if (!s_) { -        errno = EFAULT; +    if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) { +        errno = ENOTSOCK;          return -1;      }      return (((zmq::socket_base_t*) s_)->connect (addr_)); @@ -368,8 +368,8 @@ int zmq_connect (void *s_, const char *addr_)  int zmq_send (void *s_, zmq_msg_t *msg_, int flags_)  { -    if (!s_) { -        errno = EFAULT; +    if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) { +        errno = ENOTSOCK;          return -1;      }      return (((zmq::socket_base_t*) s_)->send (msg_, flags_)); @@ -377,8 +377,8 @@ int zmq_send (void *s_, zmq_msg_t *msg_, int flags_)  int zmq_recv (void *s_, zmq_msg_t *msg_, int flags_)  { -    if (!s_) { -        errno = EFAULT; +    if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) { +        errno = ENOTSOCK;          return -1;      }      return (((zmq::socket_base_t*) s_)->recv (msg_, flags_)); | 
