summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--doc/zmq_getsockopt.txt16
-rw-r--r--doc/zmq_setsockopt.txt16
-rw-r--r--include/zmq.h1
-rw-r--r--src/Makefile.am2
-rw-r--r--src/config.hpp4
-rw-r--r--src/options.cpp18
-rw-r--r--src/options.hpp1
-rw-r--r--src/pipe.cpp93
-rw-r--r--src/pipe.hpp30
-rw-r--r--src/session.cpp6
-rw-r--r--src/socket_base.cpp20
-rw-r--r--src/swap.cpp325
-rw-r--r--src/swap.hpp123
13 files changed, 24 insertions, 631 deletions
diff --git a/doc/zmq_getsockopt.txt b/doc/zmq_getsockopt.txt
index eef4b1f..81dcba3 100644
--- a/doc/zmq_getsockopt.txt
+++ b/doc/zmq_getsockopt.txt
@@ -79,22 +79,6 @@ Default value:: 0
Applicable socket types:: all
-ZMQ_SWAP: Retrieve disk offload size
-~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-The 'ZMQ_SWAP' option shall retrieve the disk offload (swap) size for the
-specified 'socket'. A socket which has 'ZMQ_SWAP' set to a non-zero value may
-exceed it's high water mark; in this case outstanding messages shall be
-offloaded to storage on disk rather than held in memory.
-
-The value of 'ZMQ_SWAP' defines the maximum size of the swap space in bytes.
-
-[horizontal]
-Option value type:: int64_t
-Option value unit:: bytes
-Default value:: 0
-Applicable socket types:: all
-
-
ZMQ_AFFINITY: Retrieve I/O thread affinity
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_AFFINITY' option shall retrieve the I/O thread affinity for newly
diff --git a/doc/zmq_setsockopt.txt b/doc/zmq_setsockopt.txt
index 9387116..5d93a62 100644
--- a/doc/zmq_setsockopt.txt
+++ b/doc/zmq_setsockopt.txt
@@ -47,22 +47,6 @@ Default value:: 0
Applicable socket types:: all
-ZMQ_SWAP: Set disk offload size
-~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-The 'ZMQ_SWAP' option shall set the disk offload (swap) size for the specified
-'socket'. A socket which has 'ZMQ_SWAP' set to a non-zero value may exceed it's
-high water mark; in this case outstanding messages shall be offloaded to
-storage on disk rather than held in memory.
-
-The value of 'ZMQ_SWAP' defines the maximum size of the swap space in bytes.
-
-[horizontal]
-Option value type:: int64_t
-Option value unit:: bytes
-Default value:: 0
-Applicable socket types:: all
-
-
ZMQ_AFFINITY: Set I/O thread affinity
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_AFFINITY' option shall set the I/O thread affinity for newly created
diff --git a/include/zmq.h b/include/zmq.h
index cd1bc90..aa16554 100644
--- a/include/zmq.h
+++ b/include/zmq.h
@@ -184,7 +184,6 @@ ZMQ_EXPORT int zmq_term (void *context);
/* Socket options. */
#define ZMQ_HWM 1
-#define ZMQ_SWAP 3
#define ZMQ_AFFINITY 4
#define ZMQ_IDENTITY 5
#define ZMQ_SUBSCRIBE 6
diff --git a/src/Makefile.am b/src/Makefile.am
index d0644ef..d2f6d09 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -115,7 +115,6 @@ libzmq_la_SOURCES = \
socket_base.hpp \
stdint.hpp \
sub.hpp \
- swap.hpp \
tcp_connecter.hpp \
tcp_listener.hpp \
tcp_socket.hpp \
@@ -173,7 +172,6 @@ libzmq_la_SOURCES = \
session.cpp \
socket_base.cpp \
sub.cpp \
- swap.cpp \
tcp_connecter.cpp \
tcp_listener.cpp \
tcp_socket.cpp \
diff --git a/src/config.hpp b/src/config.hpp
index f144512..3df66c7 100644
--- a/src/config.hpp
+++ b/src/config.hpp
@@ -59,10 +59,6 @@ namespace zmq
// Maximal delta between high and low watermark.
max_wm_delta = 1024,
- // Swap inteligently batches data for writing to disk. The size of
- // the batch in bytes is specified by this option.
- swap_block_size = 8192,
-
// Maximum number of events the I/O thread can process in one go.
max_io_events = 256,
diff --git a/src/options.cpp b/src/options.cpp
index 79e4029..9916475 100644
--- a/src/options.cpp
+++ b/src/options.cpp
@@ -27,7 +27,6 @@
zmq::options_t::options_t () :
hwm (0),
- swap (0),
affinity (0),
rate (100),
recovery_ivl (10),
@@ -59,14 +58,6 @@ int zmq::options_t::setsockopt (int option_, const void *optval_,
hwm = *((uint64_t*) optval_);
return 0;
- case ZMQ_SWAP:
- if (optvallen_ != sizeof (int64_t) || *((int64_t*) optval_) < 0) {
- errno = EINVAL;
- return -1;
- }
- swap = *((int64_t*) optval_);
- return 0;
-
case ZMQ_AFFINITY:
if (optvallen_ != sizeof (uint64_t)) {
errno = EINVAL;
@@ -195,15 +186,6 @@ int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_)
*optvallen_ = sizeof (uint64_t);
return 0;
- case ZMQ_SWAP:
- if (*optvallen_ < sizeof (int64_t)) {
- errno = EINVAL;
- return -1;
- }
- *((int64_t*) optval_) = swap;
- *optvallen_ = sizeof (int64_t);
- return 0;
-
case ZMQ_AFFINITY:
if (*optvallen_ < sizeof (uint64_t)) {
errno = EINVAL;
diff --git a/src/options.hpp b/src/options.hpp
index 62ecadf..a1f9f33 100644
--- a/src/options.hpp
+++ b/src/options.hpp
@@ -36,7 +36,6 @@ namespace zmq
int getsockopt (int option_, void *optval_, size_t *optvallen_);
uint64_t hwm;
- int64_t swap;
uint64_t affinity;
blob_t identity;
diff --git a/src/pipe.cpp b/src/pipe.cpp
index 8dcf577..f09dea4 100644
--- a/src/pipe.cpp
+++ b/src/pipe.cpp
@@ -163,7 +163,7 @@ void zmq::reader_t::process_pipe_term_ack ()
}
zmq::writer_t::writer_t (object_t *parent_, pipe_t *pipe_, reader_t *reader_,
- uint64_t hwm_, int64_t swap_size_) :
+ uint64_t hwm_) :
object_t (parent_),
active (true),
pipe (pipe_),
@@ -171,28 +171,15 @@ zmq::writer_t::writer_t (object_t *parent_, pipe_t *pipe_, reader_t *reader_,
hwm (hwm_),
msgs_read (0),
msgs_written (0),
- swap (NULL),
sink (NULL),
- swapping (false),
- pending_delimiter (false),
terminating (false)
{
// Inform reader about the writer.
reader->set_writer (this);
-
- // Open the swap file, if required.
- if (swap_size_ > 0) {
- swap = new (std::nothrow) swap_t (swap_size_);
- alloc_assert (swap);
- int rc = swap->init ();
- zmq_assert (rc == 0);
- }
}
zmq::writer_t::~writer_t ()
{
- if (swap)
- delete swap;
}
void zmq::writer_t::set_event_sink (i_writer_events *sink_)
@@ -208,21 +195,9 @@ bool zmq::writer_t::check_write (zmq_msg_t *msg_)
if (unlikely (!active))
return false;
- if (unlikely (swapping)) {
- if (unlikely (!swap->fits (msg_))) {
- active = false;
- return false;
- }
- }
- else {
- if (unlikely (pipe_full ())) {
- if (swap)
- swapping = true;
- else {
- active = false;
- return false;
- }
- }
+ if (unlikely (pipe_full ())) {
+ active = false;
+ return false;
}
return true;
@@ -233,14 +208,6 @@ bool zmq::writer_t::write (zmq_msg_t *msg_)
if (unlikely (!check_write (msg_)))
return false;
- if (unlikely (swapping)) {
- bool stored = swap->store (msg_);
- zmq_assert (stored);
- if (!(msg_->flags & ZMQ_MSG_MORE))
- swap->commit ();
- return true;
- }
-
pipe->write (*msg_, msg_->flags & ZMQ_MSG_MORE);
if (!(msg_->flags & ZMQ_MSG_MORE))
msgs_written++;
@@ -250,12 +217,6 @@ bool zmq::writer_t::write (zmq_msg_t *msg_)
void zmq::writer_t::rollback ()
{
- // Remove incomplete message from the swap.
- if (unlikely (swapping)) {
- swap->rollback ();
- return;
- }
-
// Remove incomplete message from the pipe.
zmq_msg_t msg;
while (pipe->unwrite (&msg)) {
@@ -266,8 +227,7 @@ void zmq::writer_t::rollback ()
void zmq::writer_t::flush ()
{
- // In the swapping mode, flushing is automatically handled by swap object.
- if (!swapping && !pipe->flush ())
+ if (!pipe->flush ())
send_activate_reader (reader);
}
@@ -284,11 +244,6 @@ void zmq::writer_t::terminate ()
// Rollback any unfinished messages.
rollback ();
- if (swapping) {
- pending_delimiter = true;
- return;
- }
-
// Push delimiter into the pipe. Trick the compiler to belive that
// the tag is a valid pointer. Note that watermarks are not checked
// thus the delimiter can be written even though the pipe is full.
@@ -305,40 +260,6 @@ void zmq::writer_t::process_activate_writer (uint64_t msgs_read_)
// Store the reader's message sequence number.
msgs_read = msgs_read_;
- // If we are in the swapping mode, we have some messages in the swap.
- // Given that pipe is now ready for writing we can move part of the
- // swap into the pipe.
- if (swapping) {
- zmq_msg_t msg;
- while (!pipe_full () && !swap->empty ()) {
- swap->fetch(&msg);
- pipe->write (msg, msg.flags & ZMQ_MSG_MORE);
- if (!(msg.flags & ZMQ_MSG_MORE))
- msgs_written++;
- }
- if (!pipe->flush ())
- send_activate_reader (reader);
-
- // There are no more messages in the swap. We can switch into
- // standard in-memory mode.
- if (swap->empty ()) {
- swapping = false;
-
- // Push delimiter into the pipe. Trick the compiler to belive that
- // the tag is a valid pointer. Note that watermarks are not checked
- // thus the delimiter can be written even though the pipe is full.
- if (pending_delimiter) {
- zmq_msg_t msg;
- const unsigned char *offset = 0;
- msg.content = (void*) (offset + ZMQ_DELIMITER);
- msg.flags = 0;
- pipe->write (msg, false);
- flush ();
- return;
- }
- }
- }
-
// If the writer was non-active before, let's make it active
// (available for writing messages to).
if (!active && !terminating) {
@@ -371,7 +292,7 @@ bool zmq::writer_t::pipe_full ()
}
void zmq::create_pipe (object_t *reader_parent_, object_t *writer_parent_,
- uint64_t hwm_, int64_t swap_size_, reader_t **reader_, writer_t **writer_)
+ uint64_t hwm_, reader_t **reader_, writer_t **writer_)
{
// First compute the low water mark. Following point should be taken
// into consideration:
@@ -404,6 +325,6 @@ void zmq::create_pipe (object_t *reader_parent_, object_t *writer_parent_,
*reader_ = new (std::nothrow) reader_t (reader_parent_, pipe, lwm);
alloc_assert (*reader_);
*writer_ = new (std::nothrow) writer_t (writer_parent_, pipe, *reader_,
- hwm_, swap_size_);
+ hwm_);
alloc_assert (*writer_);
}
diff --git a/src/pipe.hpp b/src/pipe.hpp
index b4a0ffa..ed13478 100644
--- a/src/pipe.hpp
+++ b/src/pipe.hpp
@@ -26,7 +26,6 @@
#include "stdint.hpp"
#include "array.hpp"
#include "ypipe.hpp"
-#include "swap.hpp"
#include "config.hpp"
#include "object.hpp"
@@ -35,7 +34,7 @@ namespace zmq
// Creates a pipe. Returns pointer to reader and writer objects.
void create_pipe (object_t *reader_parent_, object_t *writer_parent_,
- uint64_t hwm_, int64_t swap_size_, class reader_t **reader_,
+ uint64_t hwm_, class reader_t **reader_,
class writer_t **writer_);
// The shutdown mechanism for pipe works as follows: Either endpoint
@@ -59,7 +58,7 @@ namespace zmq
class reader_t : public object_t, public array_item_t
{
friend void create_pipe (object_t*, object_t*, uint64_t,
- int64_t, reader_t**, writer_t**);
+ reader_t**, writer_t**);
friend class writer_t;
public:
@@ -128,7 +127,7 @@ namespace zmq
class writer_t : public object_t, public array_item_t
{
friend void create_pipe (object_t*, object_t*, uint64_t,
- int64_t, reader_t**, writer_t**);
+ reader_t**, writer_t**);
public:
@@ -136,8 +135,8 @@ namespace zmq
void set_event_sink (i_writer_events *endpoint_);
// Checks whether messages can be written to the pipe.
- // If writing the message would cause high watermark and (optionally)
- // if the swap is full, the function returns false.
+ // If writing the message would cause high watermark
+ // the function returns false.
bool check_write (zmq_msg_t *msg_);
// Writes a message to the underlying pipe. Returns false if the
@@ -156,19 +155,17 @@ namespace zmq
private:
writer_t (class object_t *parent_, pipe_t *pipe_, reader_t *reader_,
- uint64_t hwm_, int64_t swap_size_);
+ uint64_t hwm_);
~writer_t ();
// Command handlers.
void process_activate_writer (uint64_t msgs_read_);
void process_pipe_term ();
- // Tests whether underlying pipe is already full. The swap is not
- // taken into account.
+ // Tests whether underlying pipe is already full.
bool pipe_full ();
- // True, if this object can be written to. Undelying ypipe may be full
- // but as long as there's swap space available, this flag is true.
+ // True, if this object can be written to.
bool active;
// The underlying pipe.
@@ -187,20 +184,9 @@ namespace zmq
// Number of messages we have written so far.
uint64_t msgs_written;
- // Pointer to the message swap. If NULL, messages are always
- // kept in main memory.
- swap_t *swap;
-
// Sink for the events (either the socket or the session).
i_writer_events *sink;
- // If true, swap is active. New messages are to be written to the swap.
- bool swapping;
-
- // If true, there's a delimiter to be written to the pipe after the
- // swap is empied.
- bool pending_delimiter;
-
// True is 'terminate' method was called of 'pipe_term' command
// arrived from the reader.
bool terminating;
diff --git a/src/session.cpp b/src/session.cpp
index 3ba971f..176f0ef 100644
--- a/src/session.cpp
+++ b/src/session.cpp
@@ -247,13 +247,11 @@ void zmq::session_t::process_attach (i_engine *engine_,
// Create the pipes, as required.
if (options.requires_in) {
- create_pipe (socket, this, options.hwm, options.swap,
- &socket_reader, &out_pipe);
+ create_pipe (socket, this, options.hwm, &socket_reader, &out_pipe);
out_pipe->set_event_sink (this);
}
if (options.requires_out) {
- create_pipe (this, socket, options.hwm, options.swap, &in_pipe,
- &socket_writer);
+ create_pipe (this, socket, options.hwm, &in_pipe, &socket_writer);
in_pipe->set_event_sink (this);
}
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index abeddb9..093bbe0 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -376,28 +376,22 @@ int zmq::socket_base_t::connect (const char *addr_)
writer_t *outpipe_writer = NULL;
// The total HWM for an inproc connection should be the sum of
- // the binder's HWM and the connector's HWM. (Similarly for the
- // SWAP.)
+ // the binder's HWM and the connector's HWM.
int64_t hwm;
if (options.hwm == 0 || peer.options.hwm == 0)
hwm = 0;
else
hwm = options.hwm + peer.options.hwm;
- int64_t swap;
- if (options.swap == 0 && peer.options.swap == 0)
- swap = 0;
- else
- swap = options.swap + peer.options.swap;
// Create inbound pipe, if required.
if (options.requires_in)
- create_pipe (this, peer.socket, hwm, swap,
- &inpipe_reader, &inpipe_writer);
+ create_pipe (this, peer.socket, hwm, &inpipe_reader,
+ &inpipe_writer);
// Create outbound pipe, if required.
if (options.requires_out)
- create_pipe (peer.socket, this, hwm, swap,
- &outpipe_reader, &outpipe_writer);
+ create_pipe (peer.socket, this, hwm, &outpipe_reader,
+ &outpipe_writer);
// Attach the pipes to this socket object.
attach_pipes (inpipe_reader, outpipe_writer, peer.options.identity);
@@ -435,12 +429,12 @@ int zmq::socket_base_t::connect (const char *addr_)
// Create inbound pipe, if required.
if (options.requires_in)
- create_pipe (this, session, options.hwm, options.swap,
+ create_pipe (this, session, options.hwm,
&inpipe_reader, &inpipe_writer);
// Create outbound pipe, if required.
if (options.requires_out)
- create_pipe (session, this, options.hwm, options.swap,
+ create_pipe (session, this, options.hwm,
&outpipe_reader, &outpipe_writer);
// Attach the pipes to the socket object.
diff --git a/src/swap.cpp b/src/swap.cpp
deleted file mode 100644
index 936f30e..0000000
--- a/src/swap.cpp
+++ /dev/null
@@ -1,325 +0,0 @@
-/*
- Copyright (c) 2007-2011 iMatix Corporation
- Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
-
- This file is part of 0MQ.
-
- 0MQ is free software; you can redistribute it and/or modify it under
- the terms of the GNU Lesser General Public License as published by
- the Free Software Foundation; either version 3 of the License, or
- (at your option) any later version.
-
- 0MQ is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU Lesser General Public License for more details.
-
- You should have received a copy of the GNU Lesser General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#include "platform.hpp"
-
-#ifdef ZMQ_HAVE_WINDOWS
-#include "windows.hpp"
-#include <io.h>
-#else
-#include <unistd.h>
-#endif
-
-#include "../include/zmq.h"
-
-#include <sys/types.h>
-#include <sys/stat.h>
-#include <fcntl.h>
-#include <string.h>
-#include <sstream>
-#include <algorithm>
-
-#include "swap.hpp"
-#include "config.hpp"
-#include "atomic_counter.hpp"
-#include "err.hpp"
-
-zmq::swap_t::swap_t (int64_t filesize_) :
- fd (-1),
- filesize (filesize_),
- file_pos (0),
- write_pos (0),
- read_pos (0),
- block_size (swap_block_size),
- write_buf_start_addr (0)
-{
- zmq_assert (filesize > 0);
- zmq_assert (block_size > 0);
-
- buf1 = new (std::nothrow) char [block_size];
- alloc_assert (buf1);
-
- buf2 = new (std::nothrow) char [block_size];
- alloc_assert (buf2);
-
- read_buf = write_buf = buf1;
-}
-
-zmq::swap_t::~swap_t ()
-{
- delete [] buf1;
- delete [] buf2;
-
- if (fd == -1)
- return;
-
-#ifdef ZMQ_HAVE_WINDOWS
- int rc = _close (fd);
-#else
- int rc = close (fd);
-#endif
- errno_assert (rc == 0);
-
-#ifdef ZMQ_HAVE_WINDOWS
- rc = _unlink (filename.c_str ());
-#else
- rc = unlink (filename.c_str ());
-#endif
- errno_assert (rc == 0);
-}
-
-int zmq::swap_t::init ()
-{
- static zmq::atomic_counter_t seqnum (0);
-
- // Get process ID.
-#ifdef ZMQ_HAVE_WINDOWS
- int pid = GetCurrentThreadId ();
-#else
- pid_t pid = getpid ();
-#endif
-
- std::ostringstream outs;
- outs << "zmq_" << pid << '_' << seqnum.get () << ".swap";
- filename = outs.str ();
-
- seqnum.add (1);
-
- // Open the backing file.
-#ifdef ZMQ_HAVE_WINDOWS
- fd = _open (filename.c_str (), _O_RDWR | _O_CREAT, 0600);
-#else
- fd = open (filename.c_str (), O_RDWR | O_CREAT, 0600);
-#endif
- if (fd == -1)
- return -1;
-
-#ifdef ZMQ_HAVE_LINUX
- // Enable more aggresive read-ahead optimization.
- posix_fadvise (fd, 0, filesize, POSIX_FADV_SEQUENTIAL);
-#endif
- return 0;
-}
-
-bool zmq::swap_t::store (zmq_msg_t *msg_)
-{
- size_t msg_size = zmq_msg_size (msg_);
-
- // Check buffer space availability.
- // NOTE: We always keep one byte open.
- if (buffer_space () <= (int64_t) (sizeof msg_size + 1 + msg_size))
- return false;
-
- // Don't store the ZMQ_MSG_SHARED flag.
- uint8_t msg_flags = msg_->flags & ~ZMQ_MSG_SHARED;
-
- // Write message length, flags, and message body.
- copy_to_file (&msg_size, sizeof msg_size);
- copy_to_file (&msg_flags, sizeof msg_flags);
- copy_to_file (zmq_msg_data (msg_), msg_size);
-
- zmq_msg_close (msg_);
-
- return true;
-}
-
-void zmq::swap_t::fetch (zmq_msg_t *msg_)
-{
- // There must be at least one message available.
- zmq_assert (read_pos != write_pos);
-
- // Retrieve the message size.
- size_t msg_size;
- copy_from_file (&msg_size, sizeof msg_size);
-
- // Initialize the message.
- zmq_msg_init_size (msg_, msg_size);
-
- // Retrieve the message flags.
- copy_from_file (&msg_->flags, sizeof msg_->flags);
-
- // Retrieve the message payload.
- copy_from_file (zmq_msg_data (msg_), msg_size);
-}
-
-void zmq::swap_t::commit ()
-{
- commit_pos = write_pos;
-}
-
-void zmq::swap_t::rollback ()
-{
- if (commit_pos == write_pos || read_pos == write_pos)
- return;
-
- if (write_pos > read_pos)
- zmq_assert (read_pos <= commit_pos && commit_pos <= write_pos);
- else
- zmq_assert (read_pos <= commit_pos || commit_pos <= write_pos);
-
- if (commit_pos / block_size == read_pos / block_size) {
- write_buf_start_addr = commit_pos % block_size;
- write_buf = read_buf;
- }
- else if (commit_pos / block_size != write_pos / block_size) {
- write_buf_start_addr = commit_pos % block_size;
- fill_buf (write_buf, write_buf_start_addr);
- }
- write_pos = commit_pos;
-}
-
-bool zmq::swap_t::empty ()
-{
- return read_pos == write_pos;
-}
-
-/*
-bool zmq::swap_t::full ()
-{
- // Check that at least the message size can be written to the swap.
- return buffer_space () < (int64_t) (sizeof (size_t) + 1);
-}
-*/
-
-bool zmq::swap_t::fits (zmq_msg_t *msg_)
-{
- // Check whether whole binary representation of the message
- // fits into the swap.
- size_t msg_size = zmq_msg_size (msg_);
- if (buffer_space () <= (int64_t) (sizeof msg_size + 1 + msg_size))
- return false;
- return true;
- }
-
-void zmq::swap_t::copy_from_file (void *buffer_, size_t count_)
-{
- char *dest_ptr = (char *) buffer_;
- size_t chunk_size, remainder = count_;
-
- while (remainder > 0) {
- chunk_size = std::min (remainder,
- std::min ((size_t) (filesize - read_pos),
- (size_t) (block_size - read_pos % block_size)));
-
- memcpy (dest_ptr, &read_buf [read_pos % block_size], chunk_size);
- dest_ptr += chunk_size;
-
- read_pos = (read_pos + chunk_size) % filesize;
- if (read_pos % block_size == 0) {
- if (read_pos / block_size == write_pos / block_size)
- read_buf = write_buf;
- else
- fill_buf (read_buf, read_pos);
- }
- remainder -= chunk_size;
- }
-}
-
-void zmq::swap_t::copy_to_file (const void *buffer_, size_t count_)
-{
- char *source_ptr = (char *) buffer_;
- size_t chunk_size, remainder = count_;
-
- while (remainder > 0) {
- chunk_size = std::min (remainder,
- std::min ((size_t) (filesize - write_pos),
- (size_t) (block_size - write_pos % block_size)));
-
- memcpy (&write_buf [write_pos % block_size], source_ptr, chunk_size);
- source_ptr += chunk_size;
-
- write_pos = (write_pos + chunk_size) % filesize;
- if (write_pos % block_size == 0) {
- save_write_buf ();
- write_buf_start_addr = write_pos;
-
- if (write_buf == read_buf) {
- if (read_buf == buf2)
- write_buf = buf1;
- else
- write_buf = buf2;
- }
- }
- remainder -= chunk_size;
- }
-}
-
-void zmq::swap_t::fill_buf (char *buf, int64_t pos)
-{
- if (file_pos != pos) {
-#ifdef ZMQ_HAVE_WINDOWS
- __int64 offset = _lseeki64 (fd, pos, SEEK_SET);
-#else
- off_t offset = lseek (fd, (off_t) pos, SEEK_SET);
-#endif
- errno_assert (offset == pos);
- file_pos = pos;
- }
- size_t octets_stored = 0;
- size_t octets_total = std::min (block_size, (size_t) (filesize - file_pos));
-
- while (octets_stored < octets_total) {
-#ifdef ZMQ_HAVE_WINDOWS
- int rc = _read (fd, &buf [octets_stored], octets_total - octets_stored);
-#else
- ssize_t rc = read (fd, &buf [octets_stored],
- octets_total - octets_stored);
-#endif
- errno_assert (rc > 0);
- octets_stored += rc;
- }
- file_pos += octets_total;
-}
-
-void zmq::swap_t::save_write_buf ()
-{
- if (file_pos != write_buf_start_addr) {
-#ifdef ZMQ_HAVE_WINDOWS
- __int64 offset = _lseeki64 (fd, write_buf_start_addr, SEEK_SET);
-#else
- off_t offset = lseek (fd, (off_t) write_buf_start_addr, SEEK_SET);
-#endif
- errno_assert (offset == write_buf_start_addr);
- file_pos = write_buf_start_addr;
- }
- size_t octets_stored = 0;
- size_t octets_total = std::min (block_size, (size_t) (filesize - file_pos));
-
- while (octets_stored < octets_total) {
-#ifdef ZMQ_HAVE_WINDOWS
- int rc = _write (fd, &write_buf [octets_stored],
- octets_total - octets_stored);
-#else
- ssize_t rc = write (fd, &write_buf [octets_stored],
- octets_total - octets_stored);
-#endif
- errno_assert (rc > 0);
- octets_stored += rc;
- }
- file_pos += octets_total;
-}
-
-int64_t zmq::swap_t::buffer_space ()
-{
- if (write_pos < read_pos)
- return read_pos - write_pos;
-
- return filesize - (write_pos - read_pos);
-}
diff --git a/src/swap.hpp b/src/swap.hpp
deleted file mode 100644
index ad2bcc3..0000000
--- a/src/swap.hpp
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- Copyright (c) 2007-2011 iMatix Corporation
- Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
-
- This file is part of 0MQ.
-
- 0MQ is free software; you can redistribute it and/or modify it under
- the terms of the GNU Lesser General Public License as published by
- the Free Software Foundation; either version 3 of the License, or
- (at your option) any later version.
-
- 0MQ is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU Lesser General Public License for more details.
-
- You should have received a copy of the GNU Lesser General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#ifndef __ZMQ_SWAP_HPP_INCLUDED__
-#define __ZMQ_SWAP_HPP_INCLUDED__
-
-#include "../include/zmq.h"
-
-#include <string>
-#include "stdint.hpp"
-
-namespace zmq
-{
-
- // This class implements a message swap. Messages are retrieved from
- // the swap in the same order as they entered it.
-
- class swap_t
- {
- public:
-
- enum { default_block_size = 8192 };
-
- // Creates the swap.
- swap_t (int64_t filesize_);
-
- ~swap_t ();
-
- int init ();
-
- // Stores the message into the swap. The function
- // returns false if the swap is full; true otherwise.
- bool store (zmq_msg_t *msg_);
-
- // Fetches the oldest message from the swap. It is an error
- // to call this function when the swap is empty.
- void fetch (zmq_msg_t *msg_);
-
- void commit ();
-
- void rollback ();
-
- // Returns true if the swap is empty; false otherwise.
- bool empty ();
-
-
-// // Returns true if and only if the swap is full.
-// bool full ();
-
- // Returns true if the message fits into swap.
- bool fits (zmq_msg_t *msg_);
-
- private:
-
- // Copies data from a memory buffer to the backing file.
- // Wraps around when reaching maximum file size.
- void copy_from_file (void *buffer_, size_t count_);
-
- // Copies data from the backing file to the memory buffer.
- // Wraps around when reaching end-of-file.
- void copy_to_file (const void *buffer_, size_t count_);
-
- // Returns the buffer space available.
- int64_t buffer_space ();
-
- void fill_buf (char *buf, int64_t pos);
-
- void save_write_buf ();
-
- // File descriptor to the backing file.
- int fd;
-
- // Name of the backing file.
- std::string filename;
-
- // Maximum size of the backing file.
- int64_t filesize;
-
- // File offset associated with the fd file descriptor.
- int64_t file_pos;
-
- // File offset the next message will be stored at.
- int64_t write_pos;
-
- // File offset the next message will be read from.
- int64_t read_pos;
-
- int64_t commit_pos;
-
- size_t block_size;
-
- char *buf1;
- char *buf2;
- char *read_buf;
- char *write_buf;
-
- int64_t write_buf_start_addr;
-
- // Disable copying of the swap object.
- swap_t (const swap_t&);
- const swap_t &operator = (const swap_t&);
- };
-
-}
-
-#endif