summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMartin Lucina <martin@lucina.net>2012-01-23 08:53:57 +0100
committerMartin Lucina <martin@lucina.net>2012-01-23 08:53:57 +0100
commitb593ea30833ad5dcacb9076c988aec31b0cf26ec (patch)
tree3d0c6f1dadfa2d947cf23f6109bb27b01ab202af /src
parentcbaa7cfa93893876e4fd8794b6ea39f4d245b6b5 (diff)
Imported Upstream version 2.1.7upstream/2.1.7
Diffstat (limited to 'src')
-rw-r--r--src/ctx.cpp9
-rw-r--r--src/ctx.hpp6
-rw-r--r--src/decoder.cpp4
-rw-r--r--src/dist.cpp72
-rw-r--r--src/dist.hpp20
-rw-r--r--src/err.cpp112
-rw-r--r--src/err.hpp12
-rw-r--r--src/rep.cpp21
-rw-r--r--src/socket_base.cpp9
-rw-r--r--src/socket_base.hpp6
-rw-r--r--src/tcp_connecter.cpp9
-rw-r--r--src/xrep.cpp18
-rw-r--r--src/xrep.hpp1
-rw-r--r--src/zmq.cpp32
14 files changed, 201 insertions, 130 deletions
diff --git a/src/ctx.cpp b/src/ctx.cpp
index 9cbb9de..2758729 100644
--- a/src/ctx.cpp
+++ b/src/ctx.cpp
@@ -36,6 +36,7 @@
#endif
zmq::ctx_t::ctx_t (uint32_t io_threads_) :
+ tag (0xbadcafe0),
terminating (false)
{
int rc;
@@ -78,6 +79,11 @@ zmq::ctx_t::ctx_t (uint32_t io_threads_) :
zmq_assert (rc == 0);
}
+bool zmq::ctx_t::check_tag ()
+{
+ return tag == 0xbadcafe0;
+}
+
zmq::ctx_t::~ctx_t ()
{
// Check that there are no remaining sockets.
@@ -99,6 +105,9 @@ zmq::ctx_t::~ctx_t ()
// needed as mailboxes themselves were deallocated with their
// corresponding io_thread/socket objects.
free (slots);
+
+ // Remove the tag, so that the object is considered dead.
+ tag = 0xdeadbeef;
}
int zmq::ctx_t::terminate ()
diff --git a/src/ctx.hpp b/src/ctx.hpp
index c6ea4ce..33d5dad 100644
--- a/src/ctx.hpp
+++ b/src/ctx.hpp
@@ -60,6 +60,9 @@ namespace zmq
// of I/O thread pool to create.
ctx_t (uint32_t io_threads_);
+ // Returns false if object is not a context.
+ bool check_tag ();
+
// This function is called when user invokes zmq_term. If there are
// no more sockets open it'll cause all the infrastructure to be shut
// down. If there are open sockets still, the deallocation happens
@@ -98,6 +101,9 @@ namespace zmq
~ctx_t ();
+ // Used to check whether the object is a context.
+ uint32_t tag;
+
// Sockets belonging to this context. We need the list so that
// we can notify the sockets when zmq_term() is called. The sockets
// will return ETERM then.
diff --git a/src/decoder.cpp b/src/decoder.cpp
index c9a7dc9..84ecd92 100644
--- a/src/decoder.cpp
+++ b/src/decoder.cpp
@@ -109,11 +109,11 @@ bool zmq::decoder_t::eight_byte_size_ready ()
bool zmq::decoder_t::flags_ready ()
{
// Store the flags from the wire into the message structure.
- in_progress.flags = tmpbuf [0];
+ in_progress.flags = tmpbuf [0] | ~ZMQ_MSG_MASK;
next_step (zmq_msg_data (&in_progress), zmq_msg_size (&in_progress),
&decoder_t::message_ready);
-
+
return true;
}
diff --git a/src/dist.cpp b/src/dist.cpp
index e447bc1..d74e69f 100644
--- a/src/dist.cpp
+++ b/src/dist.cpp
@@ -25,9 +25,11 @@
#include "err.hpp"
#include "own.hpp"
#include "msg_content.hpp"
+#include "likely.hpp"
zmq::dist_t::dist_t (own_t *sink_) :
active (0),
+ eligible (0),
more (false),
sink (sink_),
terminating (false)
@@ -41,20 +43,24 @@ zmq::dist_t::~dist_t ()
void zmq::dist_t::attach (writer_t *pipe_)
{
- // If we are in the middle of sending a message, let's postpone plugging
- // in the pipe.
- if (!terminating && more) {
- new_pipes.push_back (pipe_);
- return;
- }
-
pipe_->set_event_sink (this);
- pipes.push_back (pipe_);
- pipes.swap (active, pipes.size () - 1);
- active++;
+ // If we are in the middle of sending a message, we'll add new pipe
+ // into the list of eligible pipes. Otherwise we add it to the list
+ // of active pipes.
+ if (more) {
+ pipes.push_back (pipe_);
+ pipes.swap (eligible, pipes.size () - 1);
+ eligible++;
+ }
+ else {
+ pipes.push_back (pipe_);
+ pipes.swap (active, pipes.size () - 1);
+ active++;
+ eligible++;
+ }
- if (terminating) {
+ if (unlikely (terminating)) {
sink->register_term_acks (1);
pipe_->terminate ();
}
@@ -72,21 +78,30 @@ void zmq::dist_t::terminate ()
void zmq::dist_t::terminated (writer_t *pipe_)
{
- // Remove the pipe from the list; adjust number of active pipes
- // accordingly.
+ // Remove the pipe from the list; adjust number of active and/or
+ // eligible pipes accordingly.
if (pipes.index (pipe_) < active)
active--;
+ if (pipes.index (pipe_) < eligible)
+ eligible--;
pipes.erase (pipe_);
- if (terminating)
+ if (unlikely (terminating))
sink->unregister_term_ack ();
}
void zmq::dist_t::activated (writer_t *pipe_)
{
- // Move the pipe to the list of active pipes.
- pipes.swap (pipes.index (pipe_), active);
- active++;
+ // Move the pipe from passive to eligible state.
+ pipes.swap (pipes.index (pipe_), eligible);
+ eligible++;
+
+ // If there's no message being sent at the moment, move it to
+ // the active state.
+ if (!more) {
+ pipes.swap (eligible - 1, active);
+ active++;
+ }
}
int zmq::dist_t::send (zmq_msg_t *msg_, int flags_)
@@ -97,9 +112,9 @@ int zmq::dist_t::send (zmq_msg_t *msg_, int flags_)
// Push the message to active pipes.
distribute (msg_, flags_);
- // If mutlipart message is fully sent, activate new pipes.
- if (more && !msg_more)
- clear_new_pipes ();
+ // If multipart message is fully sent, activate all the eligible pipes.
+ if (!msg_more)
+ active = eligible;
more = msg_more;
@@ -173,24 +188,13 @@ bool zmq::dist_t::has_out ()
bool zmq::dist_t::write (class writer_t *pipe_, zmq_msg_t *msg_)
{
if (!pipe_->write (msg_)) {
+ pipes.swap (pipes.index (pipe_), active - 1);
active--;
- pipes.swap (pipes.index (pipe_), active);
+ pipes.swap (active, eligible - 1);
+ eligible--;
return false;
}
if (!(msg_->flags & ZMQ_MSG_MORE))
pipe_->flush ();
return true;
}
-
-void zmq::dist_t::clear_new_pipes ()
-{
- for (new_pipes_t::iterator it = new_pipes.begin (); it != new_pipes.end ();
- ++it) {
- (*it)->set_event_sink (this);
- pipes.push_back (*it);
- pipes.swap (active, pipes.size () - 1);
- active++;
- }
- new_pipes.clear ();
-}
-
diff --git a/src/dist.hpp b/src/dist.hpp
index ad9767a..45aaf90 100644
--- a/src/dist.hpp
+++ b/src/dist.hpp
@@ -56,24 +56,22 @@ namespace zmq
// Put the message to all active pipes.
void distribute (zmq_msg_t *msg_, int flags_);
- // Plug in all the delayed pipes.
- void clear_new_pipes ();
-
// List of outbound pipes.
typedef array_t <class writer_t> pipes_t;
pipes_t pipes;
- // List of new pipes that were not yet inserted into 'pipes' list.
- // These pipes are moves to 'pipes' list once the current multipart
- // message is fully sent. This way we avoid sending incomplete messages
- // to peers.
- typedef std::vector <class writer_t*> new_pipes_t;
- new_pipes_t new_pipes;
-
// Number of active pipes. All the active pipes are located at the
- // beginning of the pipes array.
+ // beginning of the pipes array. These are the pipes the messages
+ // can be sent to at the moment.
pipes_t::size_type active;
+ // Number of pipes eligible for sending messages to. This includes all
+ // the active pipes plus all the pipes that we can in theory send
+ // messages to (the HWM is not yet reached), but sending a message
+ // to them would result in partial message being delivered, ie. message
+ // with initial parts missing.
+ pipes_t::size_type eligible;
+
// True if last we are in the middle of a multipart message.
bool more;
diff --git a/src/err.cpp b/src/err.cpp
index 8761c22..d280487 100644
--- a/src/err.cpp
+++ b/src/err.cpp
@@ -68,119 +68,125 @@ const char *zmq::errno_to_string (int errno_)
const char *zmq::wsa_error()
{
- int errcode = WSAGetLastError ();
+ int no = WSAGetLastError ();
// TODO: This is not a generic way to handle this...
- if (errcode == WSAEWOULDBLOCK)
+ if (no == WSAEWOULDBLOCK)
return NULL;
+ return wsa_error_no (no);
+}
+
+const char *zmq::wsa_error_no (int no_)
+{
// TODO: It seems that list of Windows socket errors is longer than this.
// Investigate whether there's a way to convert it into the string
// automatically (wsaError->HRESULT->string?).
return
- (errcode == WSABASEERR) ?
+ (no_ == WSABASEERR) ?
"No Error" :
- (errcode == WSAEINTR) ?
+ (no_ == WSAEINTR) ?
"Interrupted system call" :
- (errcode == WSAEBADF) ?
+ (no_ == WSAEBADF) ?
"Bad file number" :
- (errcode == WSAEACCES) ?
+ (no_ == WSAEACCES) ?
"Permission denied" :
- (errcode == WSAEFAULT) ?
+ (no_ == WSAEFAULT) ?
"Bad address" :
- (errcode == WSAEINVAL) ?
+ (no_ == WSAEINVAL) ?
"Invalid argument" :
- (errcode == WSAEMFILE) ?
+ (no_ == WSAEMFILE) ?
"Too many open files" :
- (errcode == WSAEWOULDBLOCK) ?
+ (no_ == WSAEWOULDBLOCK) ?
"Operation would block" :
- (errcode == WSAEINPROGRESS) ?
+ (no_ == WSAEINPROGRESS) ?
"Operation now in progress" :
- (errcode == WSAEALREADY) ?
+ (no_ == WSAEALREADY) ?
"Operation already in progress" :
- (errcode == WSAENOTSOCK) ?
+ (no_ == WSAENOTSOCK) ?
"Socket operation on non-socket" :
- (errcode == WSAEDESTADDRREQ) ?
+ (no_ == WSAEDESTADDRREQ) ?
"Destination address required" :
- (errcode == WSAEMSGSIZE) ?
+ (no_ == WSAEMSGSIZE) ?
"Message too long" :
- (errcode == WSAEPROTOTYPE) ?
+ (no_ == WSAEPROTOTYPE) ?
"Protocol wrong type for socket" :
- (errcode == WSAENOPROTOOPT) ?
+ (no_ == WSAENOPROTOOPT) ?
"Bad protocol option" :
- (errcode == WSAEPROTONOSUPPORT) ?
+ (no_ == WSAEPROTONOSUPPORT) ?
"Protocol not supported" :
- (errcode == WSAESOCKTNOSUPPORT) ?
+ (no_ == WSAESOCKTNOSUPPORT) ?
"Socket type not supported" :
- (errcode == WSAEOPNOTSUPP) ?
+ (no_ == WSAEOPNOTSUPP) ?
"Operation not supported on socket" :
- (errcode == WSAEPFNOSUPPORT) ?
+ (no_ == WSAEPFNOSUPPORT) ?
"Protocol family not supported" :
- (errcode == WSAEAFNOSUPPORT) ?
+ (no_ == WSAEAFNOSUPPORT) ?
"Address family not supported by protocol family" :
- (errcode == WSAEADDRINUSE) ?
+ (no_ == WSAEADDRINUSE) ?
"Address already in use" :
- (errcode == WSAEADDRNOTAVAIL) ?
+ (no_ == WSAEADDRNOTAVAIL) ?
"Can't assign requested address" :
- (errcode == WSAENETDOWN) ?
+ (no_ == WSAENETDOWN) ?
"Network is down" :
- (errcode == WSAENETUNREACH) ?
+ (no_ == WSAENETUNREACH) ?
"Network is unreachable" :
- (errcode == WSAENETRESET) ?
+ (no_ == WSAENETRESET) ?
"Net dropped connection or reset" :
- (errcode == WSAECONNABORTED) ?
+ (no_ == WSAECONNABORTED) ?
"Software caused connection abort" :
- (errcode == WSAECONNRESET) ?
+ (no_ == WSAECONNRESET) ?
"Connection reset by peer" :
- (errcode == WSAENOBUFS) ?
+ (no_ == WSAENOBUFS) ?
"No buffer space available" :
- (errcode == WSAEISCONN) ?
+ (no_ == WSAEISCONN) ?
"Socket is already connected" :
- (errcode == WSAENOTCONN) ?
+ (no_ == WSAENOTCONN) ?
"Socket is not connected" :
- (errcode == WSAESHUTDOWN) ?
+ (no_ == WSAESHUTDOWN) ?
"Can't send after socket shutdown" :
- (errcode == WSAETOOMANYREFS) ?
+ (no_ == WSAETOOMANYREFS) ?
"Too many references can't splice" :
- (errcode == WSAETIMEDOUT) ?
+ (no_ == WSAETIMEDOUT) ?
"Connection timed out" :
- (errcode == WSAECONNREFUSED) ?
+ (no_ == WSAECONNREFUSED) ?
"Connection refused" :
- (errcode == WSAELOOP) ?
+ (no_ == WSAELOOP) ?
"Too many levels of symbolic links" :
- (errcode == WSAENAMETOOLONG) ?
+ (no_ == WSAENAMETOOLONG) ?
"File name too long" :
- (errcode == WSAEHOSTDOWN) ?
+ (no_ == WSAEHOSTDOWN) ?
"Host is down" :
- (errcode == WSAEHOSTUNREACH) ?
+ (no_ == WSAEHOSTUNREACH) ?
"No Route to Host" :
- (errcode == WSAENOTEMPTY) ?
+ (no_ == WSAENOTEMPTY) ?
"Directory not empty" :
- (errcode == WSAEPROCLIM) ?
+ (no_ == WSAEPROCLIM) ?
"Too many processes" :
- (errcode == WSAEUSERS) ?
+ (no_ == WSAEUSERS) ?
"Too many users" :
- (errcode == WSAEDQUOT) ?
+ (no_ == WSAEDQUOT) ?
"Disc Quota Exceeded" :
- (errcode == WSAESTALE) ?
+ (no_ == WSAESTALE) ?
"Stale NFS file handle" :
- (errcode == WSAEREMOTE) ?
+ (no_ == WSAEREMOTE) ?
"Too many levels of remote in path" :
- (errcode == WSASYSNOTREADY) ?
+ (no_ == WSASYSNOTREADY) ?
"Network SubSystem is unavailable" :
- (errcode == WSAVERNOTSUPPORTED) ?
+ (no_ == WSAVERNOTSUPPORTED) ?
"WINSOCK DLL Version out of range" :
- (errcode == WSANOTINITIALISED) ?
+ (no_ == WSANOTINITIALISED) ?
"Successful WSASTARTUP not yet performed" :
- (errcode == WSAHOST_NOT_FOUND) ?
+ (no_ == WSAHOST_NOT_FOUND) ?
"Host not found" :
- (errcode == WSATRY_AGAIN) ?
+ (no_ == WSATRY_AGAIN) ?
"Non-Authoritative Host not found" :
- (errcode == WSANO_RECOVERY) ?
+ (no_ == WSANO_RECOVERY) ?
"Non-Recoverable errors: FORMERR REFUSED NOTIMP" :
- (errcode == WSANO_DATA) ?
+ (no_ == WSANO_DATA) ?
"Valid name no data record of requested" :
"error not defined";
}
+
void zmq::win_error (char *buffer_, size_t buffer_size_)
{
DWORD errcode = GetLastError ();
diff --git a/src/err.hpp b/src/err.hpp
index 3ffd99d..b540a5d 100644
--- a/src/err.hpp
+++ b/src/err.hpp
@@ -46,6 +46,7 @@ namespace zmq
namespace zmq
{
const char *wsa_error ();
+ const char *wsa_error_no (int no_);
void win_error (char *buffer_, size_t buffer_size_);
void wsa_error_to_errno ();
}
@@ -63,6 +64,17 @@ namespace zmq
}\
} while (false)
+// Provides convenient way to assert on WSA-style errors on Windows.
+#define wsa_assert_no(no) \
+ do {\
+ const char *errstr = zmq::wsa_error_no (no);\
+ if (errstr != NULL) {\
+ fprintf (stderr, "Assertion failed: %s (%s:%d)\n", errstr, \
+ __FILE__, __LINE__);\
+ abort ();\
+ }\
+ } while (false)
+
// Provides convenient way to check GetLastError-style errors on Windows.
#define win_assert(x) \
do {\
diff --git a/src/rep.cpp b/src/rep.cpp
index 46c35cb..dc55ad0 100644
--- a/src/rep.cpp
+++ b/src/rep.cpp
@@ -78,14 +78,21 @@ int zmq::rep_t::xrecv (zmq_msg_t *msg_, int flags_)
int rc = xrep_t::xrecv (msg_, flags_);
if (rc != 0)
return rc;
- zmq_assert (msg_->flags & ZMQ_MSG_MORE);
- // Empty message part delimits the traceback stack.
- bottom = (zmq_msg_size (msg_) == 0);
-
- // Push it to the reply pipe.
- rc = xrep_t::xsend (msg_, flags_);
- zmq_assert (rc == 0);
+ if ((msg_->flags & ZMQ_MSG_MORE)) {
+ // Empty message part delimits the traceback stack.
+ bottom = (zmq_msg_size (msg_) == 0);
+
+ // Push it to the reply pipe.
+ rc = xrep_t::xsend (msg_, flags_);
+ zmq_assert (rc == 0);
+ }
+ else {
+ // If the traceback stack is malformed, discard anything
+ // already sent to pipe (we're at end of invalid message).
+ rc = xrep_t::rollback ();
+ zmq_assert (rc == 0);
+ }
}
request_begins = false;
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index 4317bb0..24789b8 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -61,6 +61,11 @@
#include "xpub.hpp"
#include "xsub.hpp"
+bool zmq::socket_base_t::check_tag ()
+{
+ return tag == 0xbaddecaf;
+}
+
zmq::socket_base_t *zmq::socket_base_t::create (int type_, class ctx_t *parent_,
uint32_t tid_)
{
@@ -110,6 +115,7 @@ zmq::socket_base_t *zmq::socket_base_t::create (int type_, class ctx_t *parent_,
zmq::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t tid_) :
own_t (parent_, tid_),
+ tag (0xbaddecaf),
ctx_terminated (false),
destroyed (false),
last_tsc (0),
@@ -126,6 +132,9 @@ zmq::socket_base_t::~socket_base_t ()
sessions_sync.lock ();
zmq_assert (sessions.empty ());
sessions_sync.unlock ();
+
+ // Mark the socket as dead.
+ tag = 0xdeadbeef;
}
zmq::mailbox_t *zmq::socket_base_t::get_mailbox ()
diff --git a/src/socket_base.hpp b/src/socket_base.hpp
index 15ac83c..333cddd 100644
--- a/src/socket_base.hpp
+++ b/src/socket_base.hpp
@@ -50,6 +50,9 @@ namespace zmq
public:
+ // Returns false if object is not a socket.
+ bool check_tag ();
+
// Create a socket of a specified type.
static socket_base_t *create (int type_, class ctx_t *parent_,
uint32_t tid_);
@@ -136,6 +139,9 @@ namespace zmq
private:
+ // Used to check whether the object is a socket.
+ uint32_t tag;
+
// If true, associated context was already terminated.
bool ctx_terminated;
diff --git a/src/tcp_connecter.cpp b/src/tcp_connecter.cpp
index 0c1581d..71b362b 100644
--- a/src/tcp_connecter.cpp
+++ b/src/tcp_connecter.cpp
@@ -118,11 +118,12 @@ zmq::fd_t zmq::tcp_connecter_t::connect ()
// Assert that the error was caused by the networking problems
// rather than 0MQ bug.
- errno = err;
- errno_assert (errno == WSAECONNREFUSED || errno == WSAETIMEDOUT ||
- errno == WSAECONNABORTED || errno == WSAEHOSTUNREACH);
+ if (err == WSAECONNREFUSED || err == WSAETIMEDOUT ||
+ err == WSAECONNABORTED || err == WSAEHOSTUNREACH ||
+ err == WSAENETUNREACH || err == WSAENETDOWN)
+ return retired_fd;
- return retired_fd;
+ wsa_assert_no (err);
}
// Return the newly connected socket.
diff --git a/src/xrep.cpp b/src/xrep.cpp
index 75dc30e..7317056 100644
--- a/src/xrep.cpp
+++ b/src/xrep.cpp
@@ -63,7 +63,7 @@ void zmq::xrep_t::xattach_pipes (reader_t *inpipe_, writer_t *outpipe_,
if (terminating) {
register_term_acks (1);
- outpipe_->terminate ();
+ outpipe_->terminate ();
}
}
@@ -102,11 +102,13 @@ void zmq::xrep_t::terminated (reader_t *pipe_)
for (inpipes_t::iterator it = inpipes.begin (); it != inpipes.end ();
++it) {
if (it->reader == pipe_) {
+ if ((inpipes_t::size_type) (it - inpipes.begin ()) < current_in)
+ current_in--;
inpipes.erase (it);
- if (terminating)
- unregister_term_ack ();
if (current_in >= inpipes.size ())
current_in = 0;
+ if (terminating)
+ unregister_term_ack ();
return;
}
}
@@ -288,6 +290,16 @@ int zmq::xrep_t::xrecv (zmq_msg_t *msg_, int flags_)
return -1;
}
+int zmq::xrep_t::rollback (void)
+{
+ if (current_out) {
+ current_out->rollback ();
+ current_out = NULL;
+ more_out = false;
+ }
+ return 0;
+}
+
bool zmq::xrep_t::xhas_in ()
{
// There are subsequent parts of the partly-read message available.
diff --git a/src/xrep.hpp b/src/xrep.hpp
index d7fbe9f..d1189f4 100644
--- a/src/xrep.hpp
+++ b/src/xrep.hpp
@@ -47,6 +47,7 @@ namespace zmq
const blob_t &peer_identity_);
int xsend (zmq_msg_t *msg_, int flags_);
int xrecv (zmq_msg_t *msg_, int flags_);
+ int rollback ();
bool xhas_in ();
bool xhas_out ();
diff --git a/src/zmq.cpp b/src/zmq.cpp
index 61f942d..85f7d62 100644
--- a/src/zmq.cpp
+++ b/src/zmq.cpp
@@ -284,7 +284,7 @@ void *zmq_init (int io_threads_)
int zmq_term (void *ctx_)
{
- if (!ctx_) {
+ if (!ctx_ || !((zmq::ctx_t*) ctx_)->check_tag ()) {
errno = EFAULT;
return -1;
}
@@ -310,7 +310,7 @@ int zmq_term (void *ctx_)
void *zmq_socket (void *ctx_, int type_)
{
- if (!ctx_) {
+ if (!ctx_ || !((zmq::ctx_t*) ctx_)->check_tag ()) {
errno = EFAULT;
return NULL;
}
@@ -319,8 +319,8 @@ void *zmq_socket (void *ctx_, int type_)
int zmq_close (void *s_)
{
- if (!s_) {
- errno = EFAULT;
+ if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
+ errno = ENOTSOCK;
return -1;
}
((zmq::socket_base_t*) s_)->close ();
@@ -330,8 +330,8 @@ int zmq_close (void *s_)
int zmq_setsockopt (void *s_, int option_, const void *optval_,
size_t optvallen_)
{
- if (!s_) {
- errno = EFAULT;
+ if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
+ errno = ENOTSOCK;
return -1;
}
return (((zmq::socket_base_t*) s_)->setsockopt (option_, optval_,
@@ -340,8 +340,8 @@ int zmq_setsockopt (void *s_, int option_, const void *optval_,
int zmq_getsockopt (void *s_, int option_, void *optval_, size_t *optvallen_)
{
- if (!s_) {
- errno = EFAULT;
+ if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
+ errno = ENOTSOCK;
return -1;
}
return (((zmq::socket_base_t*) s_)->getsockopt (option_, optval_,
@@ -350,8 +350,8 @@ int zmq_getsockopt (void *s_, int option_, void *optval_, size_t *optvallen_)
int zmq_bind (void *s_, const char *addr_)
{
- if (!s_) {
- errno = EFAULT;
+ if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
+ errno = ENOTSOCK;
return -1;
}
return (((zmq::socket_base_t*) s_)->bind (addr_));
@@ -359,8 +359,8 @@ int zmq_bind (void *s_, const char *addr_)
int zmq_connect (void *s_, const char *addr_)
{
- if (!s_) {
- errno = EFAULT;
+ if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
+ errno = ENOTSOCK;
return -1;
}
return (((zmq::socket_base_t*) s_)->connect (addr_));
@@ -368,8 +368,8 @@ int zmq_connect (void *s_, const char *addr_)
int zmq_send (void *s_, zmq_msg_t *msg_, int flags_)
{
- if (!s_) {
- errno = EFAULT;
+ if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
+ errno = ENOTSOCK;
return -1;
}
return (((zmq::socket_base_t*) s_)->send (msg_, flags_));
@@ -377,8 +377,8 @@ int zmq_send (void *s_, zmq_msg_t *msg_, int flags_)
int zmq_recv (void *s_, zmq_msg_t *msg_, int flags_)
{
- if (!s_) {
- errno = EFAULT;
+ if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
+ errno = ENOTSOCK;
return -1;
}
return (((zmq::socket_base_t*) s_)->recv (msg_, flags_));