summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/zmq.hpp83
1 files changed, 73 insertions, 10 deletions
diff --git a/include/zmq.hpp b/include/zmq.hpp
index 317fddf..ccc234d 100644
--- a/include/zmq.hpp
+++ b/include/zmq.hpp
@@ -23,6 +23,8 @@
#include "zmq.h"
#include <assert.h>
+#include <errno.h>
+#include <exception>
namespace zmq
{
@@ -36,6 +38,38 @@ namespace zmq
message_delimiter = 1 << ZMQ_DELIMITER
};
+ class no_memory : public exception
+ {
+ virtual const char *what ()
+ {
+ return "Out of memory";
+ }
+ };
+
+ class invalid_argument : public exception
+ {
+ virtual const char *what ()
+ {
+ return "Invalid argument";
+ }
+ };
+
+ class too_many_threads : public exception
+ {
+ virtual const char *what ()
+ {
+ return "Too many threads";
+ }
+ };
+
+ class address_in_use : public exception
+ {
+ virtual const char *what ()
+ {
+ return "Address in use";
+ }
+ };
+
// A message. Caution: Don't change the body of the message once you've
// copied it - the behaviour is undefined. Don't change the body of the
// received message either - other threads may be accessing it in parallel.
@@ -50,7 +84,10 @@ namespace zmq
inline message_t (size_t size_ = 0)
{
int rc = zmq_msg_init_size (this, size_);
- assert (rc == 0);
+ if (rc == -1) {
+ assert (errno == ENOMEM);
+ throw no_memory ();
+ }
}
// Creates message from the supplied buffer. 0MQ takes care of
@@ -79,7 +116,10 @@ namespace zmq
int rc = zmq_msg_close (this);
assert (rc == 0);
rc = zmq_msg_init_size (this, size_);
- assert (rc == 0);
+ if (rc == -1) {
+ assert (errno == ENOMEM);
+ throw no_memory ();
+ }
}
// Same as above, however, the message is rebuilt from the supplied
@@ -147,7 +187,10 @@ namespace zmq
inline context_t (int app_threads_, int io_threads_)
{
ptr = zmq_init (app_threads_, io_threads_);
- assert (ptr);
+ if (ptr == NULL) {
+ assert (errno == EINVAL);
+ throw invalid_argument ();
+ }
}
inline ~context_t ()
@@ -172,7 +215,13 @@ namespace zmq
inline socket_t (context_t &context_, int type_ = 0)
{
ptr = zmq_socket (context_.ptr, type_);
- assert (ptr);
+ if (ptr == NULL) {
+ assert (errno == EMFILE || errno == EINVAL);
+ if (errno == EMFILE)
+ throw too_many_threads ();
+ else
+ throw invalid_argument ();
+ }
}
inline ~socket_t ()
@@ -184,13 +233,25 @@ namespace zmq
inline void bind (const char *addr_, zmq_opts *opts_ = NULL)
{
int rc = zmq_bind (ptr, addr_, opts_);
- assert (rc == 0);
+ if (rc == -1) {
+ assert (errno == EINVAL || errno == EADDRINUSE);
+ if (errno == EINVAL)
+ throw invalid_argument ();
+ else
+ throw address_in_use ();
+ }
}
inline void connect (const char *addr_, zmq_opts *opts_ = NULL)
{
int rc = zmq_connect (ptr, addr_, opts_);
- assert (rc == 0);
+ if (rc == -1) {
+ assert (errno == EINVAL || errno == EADDRINUSE);
+ if (errno == EINVAL)
+ throw invalid_argument ();
+ else
+ throw address_in_use ();
+ }
}
inline void subscribe (const char *criteria_)
@@ -199,10 +260,11 @@ namespace zmq
assert (rc == 0);
}
- inline void send (message_t &msg_, int flags_ = 0)
+ inline int send (message_t &msg_, int flags_ = 0)
{
int rc = zmq_send (ptr, &msg_, flags_);
- assert (rc == 0);
+ assert (rc == 0 || (rc == -1 && errno == EAGAIN));
+ return rc;
}
inline void flush ()
@@ -211,10 +273,11 @@ namespace zmq
assert (rc == 0);
}
- inline void recv (message_t *msg_, int flags_ = 0)
+ inline int recv (message_t *msg_, int flags_ = 0)
{
int rc = zmq_recv (ptr, msg_, flags_);
- assert (rc == 0);
+ assert (rc == 0 || (rc == -1 && errno == EAGAIN));
+ return rc;
}
private: