summaryrefslogtreecommitdiff
path: root/src/socket_base.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/socket_base.cpp')
-rw-r--r--src/socket_base.cpp50
1 files changed, 43 insertions, 7 deletions
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index 3b74359..b186683 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -37,6 +37,7 @@
#include "platform.hpp"
#include "pgm_sender.hpp"
#include "pgm_receiver.hpp"
+#include "likely.hpp"
zmq::socket_base_t::socket_base_t (app_thread_t *parent_) :
object_t (parent_),
@@ -58,6 +59,11 @@ zmq::socket_base_t::~socket_base_t ()
int zmq::socket_base_t::setsockopt (int option_, const void *optval_,
size_t optvallen_)
{
+ if (unlikely (app_thread->is_terminated ())) {
+ errno = ETERM;
+ return -1;
+ }
+
// First, check whether specific socket type overloads the option.
int rc = xsetsockopt (option_, optval_, optvallen_);
if (rc == 0 || errno != EINVAL)
@@ -71,6 +77,11 @@ int zmq::socket_base_t::setsockopt (int option_, const void *optval_,
int zmq::socket_base_t::getsockopt (int option_, void *optval_,
size_t *optvallen_)
{
+ if (unlikely (app_thread->is_terminated ())) {
+ errno = ETERM;
+ return -1;
+ }
+
if (option_ == ZMQ_RCVMORE) {
if (*optvallen_ < sizeof (int64_t)) {
errno = EINVAL;
@@ -86,6 +97,11 @@ int zmq::socket_base_t::getsockopt (int option_, void *optval_,
int zmq::socket_base_t::bind (const char *addr_)
{
+ if (unlikely (app_thread->is_terminated ())) {
+ errno = ETERM;
+ return -1;
+ }
+
// Parse addr_ string.
std::string addr_type;
std::string addr_args;
@@ -141,6 +157,11 @@ int zmq::socket_base_t::bind (const char *addr_)
int zmq::socket_base_t::connect (const char *addr_)
{
+ if (unlikely (app_thread->is_terminated ())) {
+ errno = ETERM;
+ return -1;
+ }
+
// Parse addr_ string.
std::string addr_type;
std::string addr_args;
@@ -328,13 +349,16 @@ int zmq::socket_base_t::connect (const char *addr_)
int zmq::socket_base_t::send (::zmq_msg_t *msg_, int flags_)
{
+ // Process pending commands, if any.
+ if (unlikely (!app_thread->process_commands (false, true))) {
+ errno = ETERM;
+ return -1;
+ }
+
// At this point we impose the MORE flag on the message.
if (flags_ & ZMQ_SNDMORE)
msg_->flags |= ZMQ_MSG_MORE;
- // Process pending commands, if any.
- app_thread->process_commands (false, true);
-
// Try to send the message.
int rc = xsend (msg_, flags_);
if (rc == 0)
@@ -350,7 +374,10 @@ int zmq::socket_base_t::send (::zmq_msg_t *msg_, int flags_)
while (rc != 0) {
if (errno != EAGAIN)
return -1;
- app_thread->process_commands (true, false);
+ if (unlikely (!app_thread->process_commands (true, false))) {
+ errno = ETERM;
+ return -1;
+ }
rc = xsend (msg_, flags_);
}
return 0;
@@ -371,7 +398,10 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_)
// described above) from the one used by 'send'. This is because counting
// ticks is more efficient than doing rdtsc all the time.
if (++ticks == inbound_poll_rate) {
- app_thread->process_commands (false, false);
+ if (unlikely (!app_thread->process_commands (false, false))) {
+ errno = ETERM;
+ return -1;
+ }
ticks = 0;
}
@@ -392,7 +422,10 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_)
if (flags_ & ZMQ_NOBLOCK) {
if (errno != EAGAIN)
return -1;
- app_thread->process_commands (false, false);
+ if (unlikely (!app_thread->process_commands (false, false))) {
+ errno = ETERM;
+ return -1;
+ }
ticks = 0;
return xrecv (msg_, flags_);
}
@@ -402,7 +435,10 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_)
while (rc != 0) {
if (errno != EAGAIN)
return -1;
- app_thread->process_commands (true, false);
+ if (unlikely (!app_thread->process_commands (true, false))) {
+ errno = ETERM;
+ return -1;
+ }
rc = xrecv (msg_, flags_);
ticks = 0;
}