diff options
author | Martin Lucina <martin@lucina.net> | 2012-01-23 08:53:57 +0100 |
---|---|---|
committer | Martin Lucina <martin@lucina.net> | 2012-01-23 08:53:57 +0100 |
commit | b593ea30833ad5dcacb9076c988aec31b0cf26ec (patch) | |
tree | 3d0c6f1dadfa2d947cf23f6109bb27b01ab202af /src | |
parent | cbaa7cfa93893876e4fd8794b6ea39f4d245b6b5 (diff) |
Imported Upstream version 2.1.7upstream/2.1.7
Diffstat (limited to 'src')
-rw-r--r-- | src/ctx.cpp | 9 | ||||
-rw-r--r-- | src/ctx.hpp | 6 | ||||
-rw-r--r-- | src/decoder.cpp | 4 | ||||
-rw-r--r-- | src/dist.cpp | 72 | ||||
-rw-r--r-- | src/dist.hpp | 20 | ||||
-rw-r--r-- | src/err.cpp | 112 | ||||
-rw-r--r-- | src/err.hpp | 12 | ||||
-rw-r--r-- | src/rep.cpp | 21 | ||||
-rw-r--r-- | src/socket_base.cpp | 9 | ||||
-rw-r--r-- | src/socket_base.hpp | 6 | ||||
-rw-r--r-- | src/tcp_connecter.cpp | 9 | ||||
-rw-r--r-- | src/xrep.cpp | 18 | ||||
-rw-r--r-- | src/xrep.hpp | 1 | ||||
-rw-r--r-- | src/zmq.cpp | 32 |
14 files changed, 201 insertions, 130 deletions
diff --git a/src/ctx.cpp b/src/ctx.cpp index 9cbb9de..2758729 100644 --- a/src/ctx.cpp +++ b/src/ctx.cpp @@ -36,6 +36,7 @@ #endif zmq::ctx_t::ctx_t (uint32_t io_threads_) : + tag (0xbadcafe0), terminating (false) { int rc; @@ -78,6 +79,11 @@ zmq::ctx_t::ctx_t (uint32_t io_threads_) : zmq_assert (rc == 0); } +bool zmq::ctx_t::check_tag () +{ + return tag == 0xbadcafe0; +} + zmq::ctx_t::~ctx_t () { // Check that there are no remaining sockets. @@ -99,6 +105,9 @@ zmq::ctx_t::~ctx_t () // needed as mailboxes themselves were deallocated with their // corresponding io_thread/socket objects. free (slots); + + // Remove the tag, so that the object is considered dead. + tag = 0xdeadbeef; } int zmq::ctx_t::terminate () diff --git a/src/ctx.hpp b/src/ctx.hpp index c6ea4ce..33d5dad 100644 --- a/src/ctx.hpp +++ b/src/ctx.hpp @@ -60,6 +60,9 @@ namespace zmq // of I/O thread pool to create. ctx_t (uint32_t io_threads_); + // Returns false if object is not a context. + bool check_tag (); + // This function is called when user invokes zmq_term. If there are // no more sockets open it'll cause all the infrastructure to be shut // down. If there are open sockets still, the deallocation happens @@ -98,6 +101,9 @@ namespace zmq ~ctx_t (); + // Used to check whether the object is a context. + uint32_t tag; + // Sockets belonging to this context. We need the list so that // we can notify the sockets when zmq_term() is called. The sockets // will return ETERM then. diff --git a/src/decoder.cpp b/src/decoder.cpp index c9a7dc9..84ecd92 100644 --- a/src/decoder.cpp +++ b/src/decoder.cpp @@ -109,11 +109,11 @@ bool zmq::decoder_t::eight_byte_size_ready () bool zmq::decoder_t::flags_ready () { // Store the flags from the wire into the message structure. - in_progress.flags = tmpbuf [0]; + in_progress.flags = tmpbuf [0] | ~ZMQ_MSG_MASK; next_step (zmq_msg_data (&in_progress), zmq_msg_size (&in_progress), &decoder_t::message_ready); - + return true; } diff --git a/src/dist.cpp b/src/dist.cpp index e447bc1..d74e69f 100644 --- a/src/dist.cpp +++ b/src/dist.cpp @@ -25,9 +25,11 @@ #include "err.hpp" #include "own.hpp" #include "msg_content.hpp" +#include "likely.hpp" zmq::dist_t::dist_t (own_t *sink_) : active (0), + eligible (0), more (false), sink (sink_), terminating (false) @@ -41,20 +43,24 @@ zmq::dist_t::~dist_t () void zmq::dist_t::attach (writer_t *pipe_) { - // If we are in the middle of sending a message, let's postpone plugging - // in the pipe. - if (!terminating && more) { - new_pipes.push_back (pipe_); - return; - } - pipe_->set_event_sink (this); - pipes.push_back (pipe_); - pipes.swap (active, pipes.size () - 1); - active++; + // If we are in the middle of sending a message, we'll add new pipe + // into the list of eligible pipes. Otherwise we add it to the list + // of active pipes. + if (more) { + pipes.push_back (pipe_); + pipes.swap (eligible, pipes.size () - 1); + eligible++; + } + else { + pipes.push_back (pipe_); + pipes.swap (active, pipes.size () - 1); + active++; + eligible++; + } - if (terminating) { + if (unlikely (terminating)) { sink->register_term_acks (1); pipe_->terminate (); } @@ -72,21 +78,30 @@ void zmq::dist_t::terminate () void zmq::dist_t::terminated (writer_t *pipe_) { - // Remove the pipe from the list; adjust number of active pipes - // accordingly. + // Remove the pipe from the list; adjust number of active and/or + // eligible pipes accordingly. if (pipes.index (pipe_) < active) active--; + if (pipes.index (pipe_) < eligible) + eligible--; pipes.erase (pipe_); - if (terminating) + if (unlikely (terminating)) sink->unregister_term_ack (); } void zmq::dist_t::activated (writer_t *pipe_) { - // Move the pipe to the list of active pipes. - pipes.swap (pipes.index (pipe_), active); - active++; + // Move the pipe from passive to eligible state. + pipes.swap (pipes.index (pipe_), eligible); + eligible++; + + // If there's no message being sent at the moment, move it to + // the active state. + if (!more) { + pipes.swap (eligible - 1, active); + active++; + } } int zmq::dist_t::send (zmq_msg_t *msg_, int flags_) @@ -97,9 +112,9 @@ int zmq::dist_t::send (zmq_msg_t *msg_, int flags_) // Push the message to active pipes. distribute (msg_, flags_); - // If mutlipart message is fully sent, activate new pipes. - if (more && !msg_more) - clear_new_pipes (); + // If multipart message is fully sent, activate all the eligible pipes. + if (!msg_more) + active = eligible; more = msg_more; @@ -173,24 +188,13 @@ bool zmq::dist_t::has_out () bool zmq::dist_t::write (class writer_t *pipe_, zmq_msg_t *msg_) { if (!pipe_->write (msg_)) { + pipes.swap (pipes.index (pipe_), active - 1); active--; - pipes.swap (pipes.index (pipe_), active); + pipes.swap (active, eligible - 1); + eligible--; return false; } if (!(msg_->flags & ZMQ_MSG_MORE)) pipe_->flush (); return true; } - -void zmq::dist_t::clear_new_pipes () -{ - for (new_pipes_t::iterator it = new_pipes.begin (); it != new_pipes.end (); - ++it) { - (*it)->set_event_sink (this); - pipes.push_back (*it); - pipes.swap (active, pipes.size () - 1); - active++; - } - new_pipes.clear (); -} - diff --git a/src/dist.hpp b/src/dist.hpp index ad9767a..45aaf90 100644 --- a/src/dist.hpp +++ b/src/dist.hpp @@ -56,24 +56,22 @@ namespace zmq // Put the message to all active pipes. void distribute (zmq_msg_t *msg_, int flags_); - // Plug in all the delayed pipes. - void clear_new_pipes (); - // List of outbound pipes. typedef array_t <class writer_t> pipes_t; pipes_t pipes; - // List of new pipes that were not yet inserted into 'pipes' list. - // These pipes are moves to 'pipes' list once the current multipart - // message is fully sent. This way we avoid sending incomplete messages - // to peers. - typedef std::vector <class writer_t*> new_pipes_t; - new_pipes_t new_pipes; - // Number of active pipes. All the active pipes are located at the - // beginning of the pipes array. + // beginning of the pipes array. These are the pipes the messages + // can be sent to at the moment. pipes_t::size_type active; + // Number of pipes eligible for sending messages to. This includes all + // the active pipes plus all the pipes that we can in theory send + // messages to (the HWM is not yet reached), but sending a message + // to them would result in partial message being delivered, ie. message + // with initial parts missing. + pipes_t::size_type eligible; + // True if last we are in the middle of a multipart message. bool more; diff --git a/src/err.cpp b/src/err.cpp index 8761c22..d280487 100644 --- a/src/err.cpp +++ b/src/err.cpp @@ -68,119 +68,125 @@ const char *zmq::errno_to_string (int errno_) const char *zmq::wsa_error() { - int errcode = WSAGetLastError (); + int no = WSAGetLastError (); // TODO: This is not a generic way to handle this... - if (errcode == WSAEWOULDBLOCK) + if (no == WSAEWOULDBLOCK) return NULL; + return wsa_error_no (no); +} + +const char *zmq::wsa_error_no (int no_) +{ // TODO: It seems that list of Windows socket errors is longer than this. // Investigate whether there's a way to convert it into the string // automatically (wsaError->HRESULT->string?). return - (errcode == WSABASEERR) ? + (no_ == WSABASEERR) ? "No Error" : - (errcode == WSAEINTR) ? + (no_ == WSAEINTR) ? "Interrupted system call" : - (errcode == WSAEBADF) ? + (no_ == WSAEBADF) ? "Bad file number" : - (errcode == WSAEACCES) ? + (no_ == WSAEACCES) ? "Permission denied" : - (errcode == WSAEFAULT) ? + (no_ == WSAEFAULT) ? "Bad address" : - (errcode == WSAEINVAL) ? + (no_ == WSAEINVAL) ? "Invalid argument" : - (errcode == WSAEMFILE) ? + (no_ == WSAEMFILE) ? "Too many open files" : - (errcode == WSAEWOULDBLOCK) ? + (no_ == WSAEWOULDBLOCK) ? "Operation would block" : - (errcode == WSAEINPROGRESS) ? + (no_ == WSAEINPROGRESS) ? "Operation now in progress" : - (errcode == WSAEALREADY) ? + (no_ == WSAEALREADY) ? "Operation already in progress" : - (errcode == WSAENOTSOCK) ? + (no_ == WSAENOTSOCK) ? "Socket operation on non-socket" : - (errcode == WSAEDESTADDRREQ) ? + (no_ == WSAEDESTADDRREQ) ? "Destination address required" : - (errcode == WSAEMSGSIZE) ? + (no_ == WSAEMSGSIZE) ? "Message too long" : - (errcode == WSAEPROTOTYPE) ? + (no_ == WSAEPROTOTYPE) ? "Protocol wrong type for socket" : - (errcode == WSAENOPROTOOPT) ? + (no_ == WSAENOPROTOOPT) ? "Bad protocol option" : - (errcode == WSAEPROTONOSUPPORT) ? + (no_ == WSAEPROTONOSUPPORT) ? "Protocol not supported" : - (errcode == WSAESOCKTNOSUPPORT) ? + (no_ == WSAESOCKTNOSUPPORT) ? "Socket type not supported" : - (errcode == WSAEOPNOTSUPP) ? + (no_ == WSAEOPNOTSUPP) ? "Operation not supported on socket" : - (errcode == WSAEPFNOSUPPORT) ? + (no_ == WSAEPFNOSUPPORT) ? "Protocol family not supported" : - (errcode == WSAEAFNOSUPPORT) ? + (no_ == WSAEAFNOSUPPORT) ? "Address family not supported by protocol family" : - (errcode == WSAEADDRINUSE) ? + (no_ == WSAEADDRINUSE) ? "Address already in use" : - (errcode == WSAEADDRNOTAVAIL) ? + (no_ == WSAEADDRNOTAVAIL) ? "Can't assign requested address" : - (errcode == WSAENETDOWN) ? + (no_ == WSAENETDOWN) ? "Network is down" : - (errcode == WSAENETUNREACH) ? + (no_ == WSAENETUNREACH) ? "Network is unreachable" : - (errcode == WSAENETRESET) ? + (no_ == WSAENETRESET) ? "Net dropped connection or reset" : - (errcode == WSAECONNABORTED) ? + (no_ == WSAECONNABORTED) ? "Software caused connection abort" : - (errcode == WSAECONNRESET) ? + (no_ == WSAECONNRESET) ? "Connection reset by peer" : - (errcode == WSAENOBUFS) ? + (no_ == WSAENOBUFS) ? "No buffer space available" : - (errcode == WSAEISCONN) ? + (no_ == WSAEISCONN) ? "Socket is already connected" : - (errcode == WSAENOTCONN) ? + (no_ == WSAENOTCONN) ? "Socket is not connected" : - (errcode == WSAESHUTDOWN) ? + (no_ == WSAESHUTDOWN) ? "Can't send after socket shutdown" : - (errcode == WSAETOOMANYREFS) ? + (no_ == WSAETOOMANYREFS) ? "Too many references can't splice" : - (errcode == WSAETIMEDOUT) ? + (no_ == WSAETIMEDOUT) ? "Connection timed out" : - (errcode == WSAECONNREFUSED) ? + (no_ == WSAECONNREFUSED) ? "Connection refused" : - (errcode == WSAELOOP) ? + (no_ == WSAELOOP) ? "Too many levels of symbolic links" : - (errcode == WSAENAMETOOLONG) ? + (no_ == WSAENAMETOOLONG) ? "File name too long" : - (errcode == WSAEHOSTDOWN) ? + (no_ == WSAEHOSTDOWN) ? "Host is down" : - (errcode == WSAEHOSTUNREACH) ? + (no_ == WSAEHOSTUNREACH) ? "No Route to Host" : - (errcode == WSAENOTEMPTY) ? + (no_ == WSAENOTEMPTY) ? "Directory not empty" : - (errcode == WSAEPROCLIM) ? + (no_ == WSAEPROCLIM) ? "Too many processes" : - (errcode == WSAEUSERS) ? + (no_ == WSAEUSERS) ? "Too many users" : - (errcode == WSAEDQUOT) ? + (no_ == WSAEDQUOT) ? "Disc Quota Exceeded" : - (errcode == WSAESTALE) ? + (no_ == WSAESTALE) ? "Stale NFS file handle" : - (errcode == WSAEREMOTE) ? + (no_ == WSAEREMOTE) ? "Too many levels of remote in path" : - (errcode == WSASYSNOTREADY) ? + (no_ == WSASYSNOTREADY) ? "Network SubSystem is unavailable" : - (errcode == WSAVERNOTSUPPORTED) ? + (no_ == WSAVERNOTSUPPORTED) ? "WINSOCK DLL Version out of range" : - (errcode == WSANOTINITIALISED) ? + (no_ == WSANOTINITIALISED) ? "Successful WSASTARTUP not yet performed" : - (errcode == WSAHOST_NOT_FOUND) ? + (no_ == WSAHOST_NOT_FOUND) ? "Host not found" : - (errcode == WSATRY_AGAIN) ? + (no_ == WSATRY_AGAIN) ? "Non-Authoritative Host not found" : - (errcode == WSANO_RECOVERY) ? + (no_ == WSANO_RECOVERY) ? "Non-Recoverable errors: FORMERR REFUSED NOTIMP" : - (errcode == WSANO_DATA) ? + (no_ == WSANO_DATA) ? "Valid name no data record of requested" : "error not defined"; } + void zmq::win_error (char *buffer_, size_t buffer_size_) { DWORD errcode = GetLastError (); diff --git a/src/err.hpp b/src/err.hpp index 3ffd99d..b540a5d 100644 --- a/src/err.hpp +++ b/src/err.hpp @@ -46,6 +46,7 @@ namespace zmq namespace zmq { const char *wsa_error (); + const char *wsa_error_no (int no_); void win_error (char *buffer_, size_t buffer_size_); void wsa_error_to_errno (); } @@ -63,6 +64,17 @@ namespace zmq }\ } while (false) +// Provides convenient way to assert on WSA-style errors on Windows. +#define wsa_assert_no(no) \ + do {\ + const char *errstr = zmq::wsa_error_no (no);\ + if (errstr != NULL) {\ + fprintf (stderr, "Assertion failed: %s (%s:%d)\n", errstr, \ + __FILE__, __LINE__);\ + abort ();\ + }\ + } while (false) + // Provides convenient way to check GetLastError-style errors on Windows. #define win_assert(x) \ do {\ diff --git a/src/rep.cpp b/src/rep.cpp index 46c35cb..dc55ad0 100644 --- a/src/rep.cpp +++ b/src/rep.cpp @@ -78,14 +78,21 @@ int zmq::rep_t::xrecv (zmq_msg_t *msg_, int flags_) int rc = xrep_t::xrecv (msg_, flags_); if (rc != 0) return rc; - zmq_assert (msg_->flags & ZMQ_MSG_MORE); - // Empty message part delimits the traceback stack. - bottom = (zmq_msg_size (msg_) == 0); - - // Push it to the reply pipe. - rc = xrep_t::xsend (msg_, flags_); - zmq_assert (rc == 0); + if ((msg_->flags & ZMQ_MSG_MORE)) { + // Empty message part delimits the traceback stack. + bottom = (zmq_msg_size (msg_) == 0); + + // Push it to the reply pipe. + rc = xrep_t::xsend (msg_, flags_); + zmq_assert (rc == 0); + } + else { + // If the traceback stack is malformed, discard anything + // already sent to pipe (we're at end of invalid message). + rc = xrep_t::rollback (); + zmq_assert (rc == 0); + } } request_begins = false; diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 4317bb0..24789b8 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -61,6 +61,11 @@ #include "xpub.hpp" #include "xsub.hpp" +bool zmq::socket_base_t::check_tag () +{ + return tag == 0xbaddecaf; +} + zmq::socket_base_t *zmq::socket_base_t::create (int type_, class ctx_t *parent_, uint32_t tid_) { @@ -110,6 +115,7 @@ zmq::socket_base_t *zmq::socket_base_t::create (int type_, class ctx_t *parent_, zmq::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t tid_) : own_t (parent_, tid_), + tag (0xbaddecaf), ctx_terminated (false), destroyed (false), last_tsc (0), @@ -126,6 +132,9 @@ zmq::socket_base_t::~socket_base_t () sessions_sync.lock (); zmq_assert (sessions.empty ()); sessions_sync.unlock (); + + // Mark the socket as dead. + tag = 0xdeadbeef; } zmq::mailbox_t *zmq::socket_base_t::get_mailbox () diff --git a/src/socket_base.hpp b/src/socket_base.hpp index 15ac83c..333cddd 100644 --- a/src/socket_base.hpp +++ b/src/socket_base.hpp @@ -50,6 +50,9 @@ namespace zmq public: + // Returns false if object is not a socket. + bool check_tag (); + // Create a socket of a specified type. static socket_base_t *create (int type_, class ctx_t *parent_, uint32_t tid_); @@ -136,6 +139,9 @@ namespace zmq private: + // Used to check whether the object is a socket. + uint32_t tag; + // If true, associated context was already terminated. bool ctx_terminated; diff --git a/src/tcp_connecter.cpp b/src/tcp_connecter.cpp index 0c1581d..71b362b 100644 --- a/src/tcp_connecter.cpp +++ b/src/tcp_connecter.cpp @@ -118,11 +118,12 @@ zmq::fd_t zmq::tcp_connecter_t::connect () // Assert that the error was caused by the networking problems // rather than 0MQ bug. - errno = err; - errno_assert (errno == WSAECONNREFUSED || errno == WSAETIMEDOUT || - errno == WSAECONNABORTED || errno == WSAEHOSTUNREACH); + if (err == WSAECONNREFUSED || err == WSAETIMEDOUT || + err == WSAECONNABORTED || err == WSAEHOSTUNREACH || + err == WSAENETUNREACH || err == WSAENETDOWN) + return retired_fd; - return retired_fd; + wsa_assert_no (err); } // Return the newly connected socket. diff --git a/src/xrep.cpp b/src/xrep.cpp index 75dc30e..7317056 100644 --- a/src/xrep.cpp +++ b/src/xrep.cpp @@ -63,7 +63,7 @@ void zmq::xrep_t::xattach_pipes (reader_t *inpipe_, writer_t *outpipe_, if (terminating) { register_term_acks (1); - outpipe_->terminate (); + outpipe_->terminate (); } } @@ -102,11 +102,13 @@ void zmq::xrep_t::terminated (reader_t *pipe_) for (inpipes_t::iterator it = inpipes.begin (); it != inpipes.end (); ++it) { if (it->reader == pipe_) { + if ((inpipes_t::size_type) (it - inpipes.begin ()) < current_in) + current_in--; inpipes.erase (it); - if (terminating) - unregister_term_ack (); if (current_in >= inpipes.size ()) current_in = 0; + if (terminating) + unregister_term_ack (); return; } } @@ -288,6 +290,16 @@ int zmq::xrep_t::xrecv (zmq_msg_t *msg_, int flags_) return -1; } +int zmq::xrep_t::rollback (void) +{ + if (current_out) { + current_out->rollback (); + current_out = NULL; + more_out = false; + } + return 0; +} + bool zmq::xrep_t::xhas_in () { // There are subsequent parts of the partly-read message available. diff --git a/src/xrep.hpp b/src/xrep.hpp index d7fbe9f..d1189f4 100644 --- a/src/xrep.hpp +++ b/src/xrep.hpp @@ -47,6 +47,7 @@ namespace zmq const blob_t &peer_identity_); int xsend (zmq_msg_t *msg_, int flags_); int xrecv (zmq_msg_t *msg_, int flags_); + int rollback (); bool xhas_in (); bool xhas_out (); diff --git a/src/zmq.cpp b/src/zmq.cpp index 61f942d..85f7d62 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -284,7 +284,7 @@ void *zmq_init (int io_threads_) int zmq_term (void *ctx_) { - if (!ctx_) { + if (!ctx_ || !((zmq::ctx_t*) ctx_)->check_tag ()) { errno = EFAULT; return -1; } @@ -310,7 +310,7 @@ int zmq_term (void *ctx_) void *zmq_socket (void *ctx_, int type_) { - if (!ctx_) { + if (!ctx_ || !((zmq::ctx_t*) ctx_)->check_tag ()) { errno = EFAULT; return NULL; } @@ -319,8 +319,8 @@ void *zmq_socket (void *ctx_, int type_) int zmq_close (void *s_) { - if (!s_) { - errno = EFAULT; + if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) { + errno = ENOTSOCK; return -1; } ((zmq::socket_base_t*) s_)->close (); @@ -330,8 +330,8 @@ int zmq_close (void *s_) int zmq_setsockopt (void *s_, int option_, const void *optval_, size_t optvallen_) { - if (!s_) { - errno = EFAULT; + if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) { + errno = ENOTSOCK; return -1; } return (((zmq::socket_base_t*) s_)->setsockopt (option_, optval_, @@ -340,8 +340,8 @@ int zmq_setsockopt (void *s_, int option_, const void *optval_, int zmq_getsockopt (void *s_, int option_, void *optval_, size_t *optvallen_) { - if (!s_) { - errno = EFAULT; + if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) { + errno = ENOTSOCK; return -1; } return (((zmq::socket_base_t*) s_)->getsockopt (option_, optval_, @@ -350,8 +350,8 @@ int zmq_getsockopt (void *s_, int option_, void *optval_, size_t *optvallen_) int zmq_bind (void *s_, const char *addr_) { - if (!s_) { - errno = EFAULT; + if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) { + errno = ENOTSOCK; return -1; } return (((zmq::socket_base_t*) s_)->bind (addr_)); @@ -359,8 +359,8 @@ int zmq_bind (void *s_, const char *addr_) int zmq_connect (void *s_, const char *addr_) { - if (!s_) { - errno = EFAULT; + if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) { + errno = ENOTSOCK; return -1; } return (((zmq::socket_base_t*) s_)->connect (addr_)); @@ -368,8 +368,8 @@ int zmq_connect (void *s_, const char *addr_) int zmq_send (void *s_, zmq_msg_t *msg_, int flags_) { - if (!s_) { - errno = EFAULT; + if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) { + errno = ENOTSOCK; return -1; } return (((zmq::socket_base_t*) s_)->send (msg_, flags_)); @@ -377,8 +377,8 @@ int zmq_send (void *s_, zmq_msg_t *msg_, int flags_) int zmq_recv (void *s_, zmq_msg_t *msg_, int flags_) { - if (!s_) { - errno = EFAULT; + if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) { + errno = ENOTSOCK; return -1; } return (((zmq::socket_base_t*) s_)->recv (msg_, flags_)); |