summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/app_thread.cpp23
-rw-r--r--src/dispatcher.cpp12
-rw-r--r--src/epoll.cpp3
-rw-r--r--src/io_thread.cpp4
-rw-r--r--src/kqueue.cpp3
-rw-r--r--src/pgm_receiver.cpp4
-rw-r--r--src/pgm_socket.cpp2
-rw-r--r--src/session.cpp8
-rw-r--r--src/socket_base.cpp33
-rw-r--r--src/tcp_listener.cpp1
-rw-r--r--src/yqueue.hpp5
-rw-r--r--src/zmq.cpp4
-rw-r--r--src/zmq_connecter.cpp6
-rw-r--r--src/zmq_connecter_init.cpp5
-rw-r--r--src/zmq_engine.cpp4
-rw-r--r--src/zmq_listener.cpp6
-rw-r--r--src/zmq_listener_init.cpp9
17 files changed, 85 insertions, 47 deletions
diff --git a/src/app_thread.cpp b/src/app_thread.cpp
index 308fc36..666116f 100644
--- a/src/app_thread.cpp
+++ b/src/app_thread.cpp
@@ -17,6 +17,7 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
+#include <new>
#include <algorithm>
#include "../bindings/c/zmq.h"
@@ -65,11 +66,11 @@ zmq::app_thread_t::app_thread_t (dispatcher_t *dispatcher_, int thread_slot_,
last_processing_time (0)
{
if (flags_ & ZMQ_POLL) {
- signaler = new fd_signaler_t;
+ signaler = new (std::nothrow) fd_signaler_t;
zmq_assert (signaler);
}
else {
- signaler = new ypollset_t;
+ signaler = new (std::nothrow) ypollset_t;
zmq_assert (signaler);
}
}
@@ -163,31 +164,31 @@ zmq::socket_base_t *zmq::app_thread_t::create_socket (int type_)
socket_base_t *s = NULL;
switch (type_) {
case ZMQ_P2P:
- s = new p2p_t (this);
+ s = new (std::nothrow) p2p_t (this);
break;
case ZMQ_PUB:
- s = new pub_t (this);
+ s = new (std::nothrow) pub_t (this);
break;
case ZMQ_SUB:
- s = new sub_t (this);
+ s = new (std::nothrow) sub_t (this);
break;
case ZMQ_REQ:
- s = new req_t (this);
+ s = new (std::nothrow) req_t (this);
break;
case ZMQ_REP:
- s = new rep_t (this);
+ s = new (std::nothrow) rep_t (this);
break;
case ZMQ_XREQ:
- s = new xreq_t (this);
+ s = new (std::nothrow) xreq_t (this);
break;
case ZMQ_XREP:
- s = new xrep_t (this);
+ s = new (std::nothrow) xrep_t (this);
break;
case ZMQ_UPSTREAM:
- s = new upstream_t (this);
+ s = new (std::nothrow) upstream_t (this);
break;
case ZMQ_DOWNSTREAM:
- s = new downstream_t (this);
+ s = new (std::nothrow) downstream_t (this);
break;
default:
// TODO: This should be EINVAL.
diff --git a/src/dispatcher.cpp b/src/dispatcher.cpp
index 51143b3..7115bca 100644
--- a/src/dispatcher.cpp
+++ b/src/dispatcher.cpp
@@ -17,6 +17,8 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
+#include <new>
+
#include "../bindings/c/zmq.h"
#include "dispatcher.hpp"
@@ -49,7 +51,8 @@ zmq::dispatcher_t::dispatcher_t (int app_threads_, int io_threads_,
// Create application thread proxies.
for (int i = 0; i != app_threads_; i++) {
- app_thread_t *app_thread = new app_thread_t (this, i, flags_);
+ app_thread_t *app_thread = new (std::nothrow) app_thread_t (this, i,
+ flags_);
zmq_assert (app_thread);
app_threads.push_back (app_thread);
signalers.push_back (app_thread->get_signaler ());
@@ -57,15 +60,16 @@ zmq::dispatcher_t::dispatcher_t (int app_threads_, int io_threads_,
// Create I/O thread objects.
for (int i = 0; i != io_threads_; i++) {
- io_thread_t *io_thread = new io_thread_t (this, i + app_threads_,
- flags_);
+ io_thread_t *io_thread = new (std::nothrow) io_thread_t (this,
+ i + app_threads_, flags_);
zmq_assert (io_thread);
io_threads.push_back (io_thread);
signalers.push_back (io_thread->get_signaler ());
}
// Create command pipe matrix.
- command_pipes = new command_pipe_t [signalers.size () * signalers.size ()];
+ command_pipes = new (std::nothrow) command_pipe_t [signalers.size () *
+ signalers.size ()];
zmq_assert (command_pipes);
// Launch I/O threads.
diff --git a/src/epoll.cpp b/src/epoll.cpp
index 8f576f8..b25883f 100644
--- a/src/epoll.cpp
+++ b/src/epoll.cpp
@@ -26,6 +26,7 @@
#include <string.h>
#include <unistd.h>
#include <algorithm>
+#include <new>
#include "epoll.hpp"
#include "err.hpp"
@@ -54,7 +55,7 @@ zmq::epoll_t::~epoll_t ()
zmq::epoll_t::handle_t zmq::epoll_t::add_fd (fd_t fd_, i_poll_events *events_)
{
- poll_entry_t *pe = new poll_entry_t;
+ poll_entry_t *pe = new (std::nothrow) poll_entry_t;
zmq_assert (pe != NULL);
// The memset is not actually needed. It's here to prevent debugging
diff --git a/src/io_thread.cpp b/src/io_thread.cpp
index 6d4710a..1332795 100644
--- a/src/io_thread.cpp
+++ b/src/io_thread.cpp
@@ -17,6 +17,8 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
+#include <new>
+
#include "../bindings/c/zmq.h"
#include "io_thread.hpp"
@@ -31,7 +33,7 @@ zmq::io_thread_t::io_thread_t (dispatcher_t *dispatcher_, int thread_slot_,
int flags_) :
object_t (dispatcher_, thread_slot_)
{
- poller = new poller_t;
+ poller = new (std::nothrow) poller_t;
zmq_assert (poller);
signaler_handle = poller->add_fd (signaler.get_fd (), this);
diff --git a/src/kqueue.cpp b/src/kqueue.cpp
index 69ad0c8..8e33bdb 100644
--- a/src/kqueue.cpp
+++ b/src/kqueue.cpp
@@ -27,6 +27,7 @@
#include <stdlib.h>
#include <unistd.h>
#include <algorithm>
+#include <new>
#include "kqueue.hpp"
#include "err.hpp"
@@ -72,7 +73,7 @@ void zmq::kqueue_t::kevent_delete (fd_t fd_, short filter_)
zmq::kqueue_t::handle_t zmq::kqueue_t::add_fd (fd_t fd_,
i_poll_events *reactor_)
{
- poll_entry_t *pe = new poll_entry_t;
+ poll_entry_t *pe = new (std::nothrow) poll_entry_t;
zmq_assert (pe != NULL);
pe->fd = fd_;
diff --git a/src/pgm_receiver.cpp b/src/pgm_receiver.cpp
index e3f7996..2a24858 100644
--- a/src/pgm_receiver.cpp
+++ b/src/pgm_receiver.cpp
@@ -21,6 +21,8 @@
#if defined ZMQ_HAVE_OPENPGM
+#include <new>
+
#ifdef ZMQ_HAVE_WINDOWS
#include "windows.hpp"
#endif
@@ -171,7 +173,7 @@ void zmq::pgm_receiver_t::in_event ()
it->second.joined = true;
// Create and connect decoder for joined peer.
- it->second.decoder = new zmq_decoder_t (0, NULL, 0);
+ it->second.decoder = new (std::nothrow) zmq_decoder_t (0, NULL, 0);
it->second.decoder->set_inout (inout);
}
diff --git a/src/pgm_socket.cpp b/src/pgm_socket.cpp
index a16acd6..66f2396 100644
--- a/src/pgm_socket.cpp
+++ b/src/pgm_socket.cpp
@@ -86,6 +86,7 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_)
// in_batch_size configured in confing.hpp
if (receiver) {
pgm_msgv_len = get_max_apdu_at_once (in_batch_size);
+ // TODO: use malloc instead of new
pgm_msgv = new pgm_msgv_t [pgm_msgv_len];
}
@@ -443,6 +444,7 @@ void *zmq::pgm_socket_t::get_buffer (size_t *size_)
*size_ = get_max_tsdu_size ();
// Allocate buffer.
+ // TODO: use malloc instead of new
unsigned char *apdu_buff = new unsigned char [*size_];
zmq_assert (apdu_buff);
return apdu_buff;
diff --git a/src/session.cpp b/src/session.cpp
index cbcc883..37f2720 100644
--- a/src/session.cpp
+++ b/src/session.cpp
@@ -17,6 +17,8 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
+#include <new>
+
#include "session.hpp"
#include "i_engine.hpp"
#include "err.hpp"
@@ -157,14 +159,16 @@ void zmq::session_t::process_plug ()
pipe_t *outbound = NULL;
if (options.requires_out) {
- inbound = new pipe_t (this, owner, options.hwm, options.lwm);
+ inbound = new (std::nothrow) pipe_t (this, owner,
+ options.hwm, options.lwm);
zmq_assert (inbound);
in_pipe = &inbound->reader;
in_pipe->set_endpoint (this);
}
if (options.requires_in) {
- outbound = new pipe_t (owner, this, options.hwm, options.lwm);
+ outbound = new (std::nothrow) pipe_t (owner, this,
+ options.hwm, options.lwm);
zmq_assert (outbound);
out_pipe = &outbound->writer;
out_pipe->set_endpoint (this);
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index 888b6ea..fde258c 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -17,6 +17,7 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
+#include <new>
#include <string>
#include <algorithm>
@@ -87,8 +88,9 @@ int zmq::socket_base_t::bind (const char *addr_)
return register_endpoint (addr_args.c_str (), this);
if (addr_type == "tcp") {
- zmq_listener_t *listener = new zmq_listener_t (
+ zmq_listener_t *listener = new (std::nothrow) zmq_listener_t (
choose_io_thread (options.affinity), this, options);
+ zmq_assert (listener);
int rc = listener->set_address (addr_args.c_str ());
if (rc != 0)
return -1;
@@ -143,13 +145,15 @@ int zmq::socket_base_t::connect (const char *addr_)
// Create inbound pipe, if required.
if (options.requires_in) {
- in_pipe = new pipe_t (this, peer, options.hwm, options.lwm);
+ in_pipe = new (std::nothrow) pipe_t (this, peer,
+ options.hwm, options.lwm);
zmq_assert (in_pipe);
}
// Create outbound pipe, if required.
if (options.requires_out) {
- out_pipe = new pipe_t (peer, this, options.hwm, options.lwm);
+ out_pipe = new (std::nothrow) pipe_t (peer, this,
+ options.hwm, options.lwm);
zmq_assert (out_pipe);
}
@@ -168,8 +172,8 @@ int zmq::socket_base_t::connect (const char *addr_)
// Create the session.
io_thread_t *io_thread = choose_io_thread (options.affinity);
- session_t *session = new session_t (io_thread, this, session_name.c_str (),
- options, true);
+ session_t *session = new (std::nothrow) session_t (io_thread, this,
+ session_name.c_str (), options, true);
zmq_assert (session);
pipe_t *in_pipe = NULL;
@@ -177,14 +181,16 @@ int zmq::socket_base_t::connect (const char *addr_)
// Create inbound pipe, if required.
if (options.requires_in) {
- in_pipe = new pipe_t (this, session, options.hwm, options.lwm);
+ in_pipe = new (std::nothrow) pipe_t (this, session,
+ options.hwm, options.lwm);
zmq_assert (in_pipe);
}
// Create outbound pipe, if required.
if (options.requires_out) {
- out_pipe = new pipe_t (session, this, options.hwm, options.lwm);
+ out_pipe = new (std::nothrow) pipe_t (session, this,
+ options.hwm, options.lwm);
zmq_assert (out_pipe);
}
@@ -205,9 +211,10 @@ int zmq::socket_base_t::connect (const char *addr_)
// Create the connecter object. Supply it with the session name
// so that it can bind the new connection to the session once
// it is established.
- zmq_connecter_t *connecter = new zmq_connecter_t (
+ zmq_connecter_t *connecter = new (std::nothrow) zmq_connecter_t (
choose_io_thread (options.affinity), this, options,
session_name.c_str (), false);
+ zmq_assert (connecter);
int rc = connecter->set_address (addr_args.c_str ());
if (rc != 0) {
delete connecter;
@@ -237,9 +244,10 @@ int zmq::socket_base_t::connect (const char *addr_)
if (options.requires_out) {
// PGM sender.
- pgm_sender_t *pgm_sender =
- new pgm_sender_t (choose_io_thread (options.affinity), options,
+ pgm_sender_t *pgm_sender = new (std::nothrow) pgm_sender_t (
+ choose_io_thread (options.affinity), options,
session_name.c_str ());
+ zmq_assert (pgm_sender);
int rc = pgm_sender->init (udp_encapsulation, addr_args.c_str ());
if (rc != 0) {
@@ -252,9 +260,10 @@ int zmq::socket_base_t::connect (const char *addr_)
else if (options.requires_in) {
// PGM receiver.
- pgm_receiver_t *pgm_receiver =
- new pgm_receiver_t (choose_io_thread (options.affinity), options,
+ pgm_receiver_t *pgm_receiver = new (std::nothrow) pgm_receiver_t (
+ choose_io_thread (options.affinity), options,
session_name.c_str ());
+ zmq_assert (pgm_receiver);
int rc = pgm_receiver->init (udp_encapsulation, addr_args.c_str ());
if (rc != 0) {
diff --git a/src/tcp_listener.cpp b/src/tcp_listener.cpp
index 383aebe..11707fb 100644
--- a/src/tcp_listener.cpp
+++ b/src/tcp_listener.cpp
@@ -66,7 +66,6 @@ int zmq::tcp_listener_t::set_address (const char *addr_)
// Bind the socket to the network interface and port.
rc = bind (s, (struct sockaddr*) &addr, sizeof (addr));
- // TODO: Convert error code to errno.
if (rc == SOCKET_ERROR) {
wsa_error_to_errno ();
return -1;
diff --git a/src/yqueue.hpp b/src/yqueue.hpp
index f80af9c..1d3f8e8 100644
--- a/src/yqueue.hpp
+++ b/src/yqueue.hpp
@@ -20,6 +20,7 @@
#ifndef __ZMQ_YQUEUE_HPP_INCLUDED__
#define __ZMQ_YQUEUE_HPP_INCLUDED__
+#include <new>
#include <stddef.h>
#include "err.hpp"
@@ -47,7 +48,7 @@ namespace zmq
// Create the queue.
inline yqueue_t ()
{
- begin_chunk = new chunk_t;
+ begin_chunk = new (std::nothrow) chunk_t;
zmq_assert (begin_chunk);
begin_pos = 0;
back_chunk = NULL;
@@ -93,7 +94,7 @@ namespace zmq
if (++end_pos != N)
return;
- end_chunk->next = new chunk_t;
+ end_chunk->next = new (std::nothrow) chunk_t;
zmq_assert (end_chunk->next);
end_chunk = end_chunk->next;
end_pos = 0;
diff --git a/src/zmq.cpp b/src/zmq.cpp
index 3497fe1..d523036 100644
--- a/src/zmq.cpp
+++ b/src/zmq.cpp
@@ -208,8 +208,8 @@ void *zmq_init (int app_threads_, int io_threads_, int flags_)
return NULL;
}
- zmq::dispatcher_t *dispatcher = new zmq::dispatcher_t (app_threads_,
- io_threads_, flags_);
+ zmq::dispatcher_t *dispatcher = new (std::nothrow) zmq::dispatcher_t (
+ app_threads_, io_threads_, flags_);
zmq_assert (dispatcher);
return (void*) dispatcher;
}
diff --git a/src/zmq_connecter.cpp b/src/zmq_connecter.cpp
index cd7d1b5..5bda48d 100644
--- a/src/zmq_connecter.cpp
+++ b/src/zmq_connecter.cpp
@@ -17,6 +17,8 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
+#include <new>
+
#include "zmq_connecter.hpp"
#include "zmq_connecter_init.hpp"
#include "io_thread.hpp"
@@ -87,8 +89,8 @@ void zmq::zmq_connecter_t::out_event ()
// Create an init object.
io_thread_t *io_thread = choose_io_thread (options.affinity);
- zmq_connecter_init_t *init = new zmq_connecter_init_t (io_thread, owner,
- fd, options, session_name.c_str (), address.c_str ());
+ zmq_connecter_init_t *init = new (std::nothrow) zmq_connecter_init_t (
+ io_thread, owner, fd, options, session_name.c_str (), address.c_str ());
zmq_assert (init);
send_plug (init);
send_own (owner, init);
diff --git a/src/zmq_connecter_init.cpp b/src/zmq_connecter_init.cpp
index 1418429..f8436a3 100644
--- a/src/zmq_connecter_init.cpp
+++ b/src/zmq_connecter_init.cpp
@@ -17,6 +17,8 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
+#include <new>
+
#include "zmq_connecter_init.hpp"
#include "zmq_connecter.hpp"
#include "io_thread.hpp"
@@ -31,7 +33,8 @@ zmq::zmq_connecter_init_t::zmq_connecter_init_t (io_thread_t *parent_,
session_name (session_name_)
{
// Create associated engine object.
- engine = new zmq_engine_t (parent_, fd_, options, true, address_);
+ engine = new (std::nothrow) zmq_engine_t (parent_, fd_, options, true,
+ address_);
zmq_assert (engine);
}
diff --git a/src/zmq_engine.cpp b/src/zmq_engine.cpp
index 6b439f5..c2878a7 100644
--- a/src/zmq_engine.cpp
+++ b/src/zmq_engine.cpp
@@ -17,6 +17,8 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
+#include <new>
+
#include "zmq_engine.hpp"
#include "zmq_connecter.hpp"
#include "io_thread.hpp"
@@ -155,7 +157,7 @@ void zmq::zmq_engine_t::error ()
// Create a connecter object to attempt reconnect.
// Ask it to wait for a while before reconnecting.
- reconnecter = new zmq_connecter_t (
+ reconnecter = new (std::nothrow) zmq_connecter_t (
inout->get_io_thread (), inout->get_owner (),
options, inout->get_session_name (), true);
zmq_assert (reconnecter);
diff --git a/src/zmq_listener.cpp b/src/zmq_listener.cpp
index 5d678a2..5c7552b 100644
--- a/src/zmq_listener.cpp
+++ b/src/zmq_listener.cpp
@@ -17,6 +17,8 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
+#include <new>
+
#include "zmq_listener.hpp"
#include "zmq_listener_init.hpp"
#include "io_thread.hpp"
@@ -62,8 +64,8 @@ void zmq::zmq_listener_t::in_event ()
// Create an init object.
io_thread_t *io_thread = choose_io_thread (options.affinity);
- zmq_listener_init_t *init = new zmq_listener_init_t (io_thread, owner,
- fd, options);
+ zmq_listener_init_t *init = new (std::nothrow) zmq_listener_init_t (
+ io_thread, owner, fd, options);
zmq_assert (init);
send_plug (init);
send_own (owner, init);
diff --git a/src/zmq_listener_init.cpp b/src/zmq_listener_init.cpp
index a463dc8..97ea477 100644
--- a/src/zmq_listener_init.cpp
+++ b/src/zmq_listener_init.cpp
@@ -17,6 +17,8 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
+#include <new>
+
#include "zmq_listener_init.hpp"
#include "io_thread.hpp"
#include "session.hpp"
@@ -29,7 +31,8 @@ zmq::zmq_listener_init_t::zmq_listener_init_t (io_thread_t *parent_,
has_peer_identity (false)
{
// Create associated engine object.
- engine = new zmq_engine_t (parent_, fd_, options, false, NULL);
+ engine = new (std::nothrow) zmq_engine_t (parent_, fd_, options,
+ false, NULL);
zmq_assert (engine);
}
@@ -74,8 +77,8 @@ void zmq::zmq_listener_init_t::flush ()
session = owner->find_session (peer_identity.c_str ());
if (!session) {
io_thread_t *io_thread = choose_io_thread (options.affinity);
- session = new session_t (io_thread, owner, peer_identity.c_str (),
- options, false);
+ session = new (std::nothrow) session_t (io_thread, owner,
+ peer_identity.c_str (), options, false);
zmq_assert (session);
send_plug (session);
send_own (owner, session);