summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--doc/zmq_getsockopt.txt14
-rw-r--r--doc/zmq_setsockopt.txt12
-rw-r--r--include/zmq.h1
-rw-r--r--src/decoder.cpp21
-rw-r--r--src/decoder.hpp5
-rw-r--r--src/options.cpp18
-rw-r--r--src/options.hpp3
-rw-r--r--src/pgm_receiver.cpp3
-rw-r--r--src/zmq_engine.cpp2
9 files changed, 72 insertions, 7 deletions
diff --git a/doc/zmq_getsockopt.txt b/doc/zmq_getsockopt.txt
index 6cdbcc3..c93fc7f 100644
--- a/doc/zmq_getsockopt.txt
+++ b/doc/zmq_getsockopt.txt
@@ -311,6 +311,20 @@ Default value:: 100
Applicable socket types:: all, only for connection-oriented transports
+ZMQ_MAXMSGSIZE: Maximum acceptable inbound message size
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+The options shall retrieve limit for the inbound messages. If a peer sends
+a message larger than ZMQ_MAXMSGSIZE it is disconnected. Value of -1 means
+'no limit'.
+
+[horizontal]
+Option value type:: int64_t
+Option value unit:: bytes
+Default value:: -1
+Applicable socket types:: all
+
+
ZMQ_FD: Retrieve file descriptor associated with the socket
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_FD' option shall retrieve the file descriptor associated with the
diff --git a/doc/zmq_setsockopt.txt b/doc/zmq_setsockopt.txt
index 5cc14cd..98c20fb 100644
--- a/doc/zmq_setsockopt.txt
+++ b/doc/zmq_setsockopt.txt
@@ -319,6 +319,18 @@ Option value unit:: connections
Default value:: 100
Applicable socket types:: all, only for connection-oriented transports.
+ZMQ_MAXMSGSIZE: Maximum acceptable inbound message size
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Limits the size of the inbound message. If a peer sends a message larger than
+ZMQ_MAXMSGSIZE it is disconnected. Value of -1 means 'no limit'.
+
+[horizontal]
+Option value type:: int64_t
+Option value unit:: bytes
+Default value:: -1
+Applicable socket types:: all
+
RETURN VALUE
------------
diff --git a/include/zmq.h b/include/zmq.h
index ac31cba..8c4447e 100644
--- a/include/zmq.h
+++ b/include/zmq.h
@@ -204,6 +204,7 @@ ZMQ_EXPORT int zmq_term (void *context);
#define ZMQ_BACKLOG 19
#define ZMQ_RECOVERY_IVL_MSEC 20 /* opt. recovery time, reconcile in 3.x */
#define ZMQ_RECONNECT_IVL_MAX 21
+#define ZMQ_MAXMSGSIZE 22
/* Send/recv options. */
#define ZMQ_NOBLOCK 1
diff --git a/src/decoder.cpp b/src/decoder.cpp
index c8ca715..7fb32ae 100644
--- a/src/decoder.cpp
+++ b/src/decoder.cpp
@@ -25,9 +25,10 @@
#include "wire.hpp"
#include "err.hpp"
-zmq::decoder_t::decoder_t (size_t bufsize_) :
+zmq::decoder_t::decoder_t (size_t bufsize_, int64_t maxmsgsize_) :
decoder_base_t <decoder_t> (bufsize_),
- destination (NULL)
+ destination (NULL),
+ maxmsgsize (maxmsgsize_)
{
zmq_msg_init (&in_progress);
@@ -63,7 +64,13 @@ bool zmq::decoder_t::one_byte_size_ready ()
// in_progress is initialised at this point so in theory we should
// close it before calling zmq_msg_init_size, however, it's a 0-byte
// message and thus we can treat it as uninitialised...
- int rc = zmq_msg_init_size (&in_progress, *tmpbuf - 1);
+ int rc;
+ if (maxmsgsize >= 0 && (int64_t) (*tmpbuf - 1) > maxmsgsize) {
+ rc = -1;
+ errno = ENOMEM;
+ }
+ else
+ rc = zmq_msg_init_size (&in_progress, *tmpbuf - 1);
if (rc != 0 && errno == ENOMEM) {
rc = zmq_msg_init (&in_progress);
errno_assert (rc == 0);
@@ -92,7 +99,13 @@ bool zmq::decoder_t::eight_byte_size_ready ()
// in_progress is initialised at this point so in theory we should
// close it before calling zmq_msg_init_size, however, it's a 0-byte
// message and thus we can treat it as uninitialised...
- int rc = zmq_msg_init_size (&in_progress, size - 1);
+ int rc;
+ if (maxmsgsize >= 0 && (int64_t) (size - 1) > maxmsgsize) {
+ rc = -1;
+ errno = ENOMEM;
+ }
+ else
+ rc = zmq_msg_init_size (&in_progress, size - 1);
if (rc != 0 && errno == ENOMEM) {
rc = zmq_msg_init (&in_progress);
errno_assert (rc == 0);
diff --git a/src/decoder.hpp b/src/decoder.hpp
index 6d28801..3b8b460 100644
--- a/src/decoder.hpp
+++ b/src/decoder.hpp
@@ -26,6 +26,7 @@
#include <algorithm>
#include "err.hpp"
+#include "stdint.hpp"
#include "../include/zmq.h"
@@ -180,7 +181,7 @@ namespace zmq
{
public:
- decoder_t (size_t bufsize_);
+ decoder_t (size_t bufsize_, int64_t maxmsgsize_);
~decoder_t ();
void set_inout (struct i_inout *destination_);
@@ -196,6 +197,8 @@ namespace zmq
unsigned char tmpbuf [8];
::zmq_msg_t in_progress;
+ int64_t maxmsgsize;
+
decoder_t (const decoder_t&);
void operator = (const decoder_t&);
};
diff --git a/src/options.cpp b/src/options.cpp
index c6d5760..9271b88 100644
--- a/src/options.cpp
+++ b/src/options.cpp
@@ -39,6 +39,7 @@ zmq::options_t::options_t () :
reconnect_ivl (100),
reconnect_ivl_max (0),
backlog (100),
+ maxmsgsize (-1),
requires_in (false),
requires_out (false),
immediate_connect (true)
@@ -182,6 +183,14 @@ int zmq::options_t::setsockopt (int option_, const void *optval_,
backlog = *((int*) optval_);
return 0;
+ case ZMQ_MAXMSGSIZE:
+ if (optvallen_ != sizeof (int64_t)) {
+ errno = EINVAL;
+ return -1;
+ }
+ maxmsgsize = *((int64_t*) optval_);
+ return 0;
+
}
errno = EINVAL;
@@ -328,6 +337,15 @@ int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_)
*optvallen_ = sizeof (int);
return 0;
+ case ZMQ_MAXMSGSIZE:
+ if (*optvallen_ < sizeof (int64_t)) {
+ errno = EINVAL;
+ return -1;
+ }
+ *((int64_t*) optval_) = maxmsgsize;
+ *optvallen_ = sizeof (int64_t);
+ return 0;
+
}
errno = EINVAL;
diff --git a/src/options.hpp b/src/options.hpp
index 38c6982..00e59c8 100644
--- a/src/options.hpp
+++ b/src/options.hpp
@@ -69,6 +69,9 @@ namespace zmq
// Maximum backlog for pending connections.
int backlog;
+ // Maximal size of message to handle.
+ int64_t maxmsgsize;
+
// These options are never set by the user directly. Instead they are
// provided by the specific socket type.
bool requires_in;
diff --git a/src/pgm_receiver.cpp b/src/pgm_receiver.cpp
index 1b96198..1b66cc4 100644
--- a/src/pgm_receiver.cpp
+++ b/src/pgm_receiver.cpp
@@ -211,7 +211,8 @@ void zmq::pgm_receiver_t::in_event ()
it->second.joined = true;
// Create and connect decoder for the peer.
- it->second.decoder = new (std::nothrow) decoder_t (0);
+ it->second.decoder = new (std::nothrow) decoder_t (0,
+ options.maxmsgsize);
alloc_assert (it->second.decoder);
it->second.decoder->set_inout (inout);
}
diff --git a/src/zmq_engine.cpp b/src/zmq_engine.cpp
index c51b7ad..5c7af3a 100644
--- a/src/zmq_engine.cpp
+++ b/src/zmq_engine.cpp
@@ -35,7 +35,7 @@
zmq::zmq_engine_t::zmq_engine_t (fd_t fd_, const options_t &options_) :
inpos (NULL),
insize (0),
- decoder (in_batch_size),
+ decoder (in_batch_size, options_.maxmsgsize),
outpos (NULL),
outsize (0),
encoder (out_batch_size),