summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/fd_signaler.cpp2
-rw-r--r--src/tcp_connecter.cpp18
-rw-r--r--src/tcp_listener.cpp89
-rw-r--r--src/tcp_socket.cpp5
-rw-r--r--src/zmq.cpp67
5 files changed, 177 insertions, 4 deletions
diff --git a/src/fd_signaler.cpp b/src/fd_signaler.cpp
index 3f433b8..862b0fd 100644
--- a/src/fd_signaler.cpp
+++ b/src/fd_signaler.cpp
@@ -151,7 +151,7 @@ zmq::fd_signaler_t::signals_t zmq::fd_signaler_t::check ()
signals_t signals = 0;
for (int pos = 0; pos != nbytes; pos++) {
zmq_assert (buffer [pos] < 64);
- signals |= (1 << (buffer [pos]));
+ signals |= (signals_t (1) << (buffer [pos]));
}
return signals;
}
diff --git a/src/tcp_connecter.cpp b/src/tcp_connecter.cpp
index fa99538..22caafb 100644
--- a/src/tcp_connecter.cpp
+++ b/src/tcp_connecter.cpp
@@ -98,7 +98,23 @@ zmq::fd_t zmq::tcp_connecter_t::get_fd ()
return s;
}
-// connect
+zmq::fd_t zmq::tcp_connecter_t::connect ()
+{
+ // Nonblocking connect have finished. Check whether an error occured.
+ int err = 0;
+ socklen_t len = sizeof err;
+ int rc = getsockopt (s, SOL_SOCKET, SO_ERROR, (char*) &err, &len);
+ zmq_assert (rc == 0);
+ if (err != 0) {
+ errno = err;
+ return retired_fd;
+ }
+
+ // Return the newly connected socket.
+ fd_t result = s;
+ s = retired_fd;
+ return result;
+}
#else
diff --git a/src/tcp_listener.cpp b/src/tcp_listener.cpp
index 22d47ca..9431ccf 100644
--- a/src/tcp_listener.cpp
+++ b/src/tcp_listener.cpp
@@ -27,7 +27,94 @@
#ifdef ZMQ_HAVE_WINDOWS
-#error
+zmq::tcp_listener_t::tcp_listener_t () :
+ s (retired_fd)
+{
+ memset (&addr, 0, sizeof (addr));
+}
+
+zmq::tcp_listener_t::~tcp_listener_t ()
+{
+ if (s != retired_fd)
+ close ();
+}
+
+int zmq::tcp_listener_t::set_address (const char *addr_)
+{
+ // Convert the interface into sockaddr_in structure.
+ int rc = resolve_ip_interface (&addr, addr_);
+ if (rc != 0)
+ return rc;
+
+ // Create a listening socket.
+ s = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
+ // TODO: Convert error code to errno.
+ wsa_assert (s != INVALID_SOCKET);
+
+ // Allow reusing of the address.
+ int flag = 1;
+ rc = setsockopt (s, SOL_SOCKET, SO_REUSEADDR,
+ (const char*) &flag, sizeof (int));
+ wsa_assert (rc != SOCKET_ERROR);
+
+ // Set the non-blocking flag.
+ flag = 1;
+ rc = ioctlsocket (s, FIONBIO, (u_long*) &flag);
+ wsa_assert (rc != SOCKET_ERROR);
+
+ // Bind the socket to the network interface and port.
+ rc = bind (s, (struct sockaddr*) &addr, sizeof (addr));
+ // TODO: Convert error code to errno.
+ wsa_assert (rc != SOCKET_ERROR);
+
+ // Listen for incomming connections.
+ rc = listen (s, 1);
+ // TODO: Convert error code to errno.
+ wsa_assert (rc != SOCKET_ERROR);
+
+ return 0;
+}
+
+int zmq::tcp_listener_t::close ()
+{
+ zmq_assert (s != retired_fd);
+ int rc = closesocket (s);
+ wsa_assert (rc != SOCKET_ERROR);
+ s = retired_fd;
+ return 0;
+}
+
+zmq::fd_t zmq::tcp_listener_t::get_fd ()
+{
+ return s;
+}
+
+zmq::fd_t zmq::tcp_listener_t::accept ()
+{
+ zmq_assert (s != retired_fd);
+
+ // Accept one incoming connection.
+ fd_t sock = ::accept (s, NULL, NULL);
+ if (sock == INVALID_SOCKET &&
+ (WSAGetLastError () == WSAEWOULDBLOCK ||
+ WSAGetLastError () == WSAECONNRESET))
+ return retired_fd;
+
+ zmq_assert (sock != INVALID_SOCKET);
+
+ // Set to non-blocking mode.
+ unsigned long argp = 1;
+ int rc = ioctlsocket (sock, FIONBIO, &argp);
+ wsa_assert (rc != SOCKET_ERROR);
+
+ // Disable Nagle's algorithm.
+ int flag = 1;
+ rc = setsockopt (sock, IPPROTO_TCP, TCP_NODELAY, (char*) &flag,
+ sizeof (int));
+ wsa_assert (rc != SOCKET_ERROR);
+
+ return sock;
+}
#else
diff --git a/src/tcp_socket.cpp b/src/tcp_socket.cpp
index 21f60f9..782f0d8 100644
--- a/src/tcp_socket.cpp
+++ b/src/tcp_socket.cpp
@@ -50,6 +50,11 @@ int zmq::tcp_socket_t::close ()
return 0;
}
+zmq::fd_t zmq::tcp_socket_t::get_fd ()
+{
+ return s;
+}
+
int zmq::tcp_socket_t::write (const void *data, int size)
{
int nbytes = send (s, (char*) data, size, 0);
diff --git a/src/zmq.cpp b/src/zmq.cpp
index ad4839f..63a7b4b 100644
--- a/src/zmq.cpp
+++ b/src/zmq.cpp
@@ -27,6 +27,13 @@
#include "err.hpp"
#include "dispatcher.hpp"
#include "msg_content.hpp"
+#include "platform.hpp"
+#include "stdint.hpp"
+
+#if !defined ZMQ_HAVE_WINDOWS
+#include <unistd.h>
+#include <sys/time.h>
+#endif
int zmq_msg_init (zmq_msg_t *msg_)
{
@@ -39,7 +46,7 @@ int zmq_msg_init_size (zmq_msg_t *msg_, size_t size_)
{
if (size_ <= ZMQ_MAX_VSM_SIZE) {
msg_->content = (zmq::msg_content_t*) ZMQ_VSM;
- msg_->vsm_size = (uint16_t) size_;
+ msg_->vsm_size = (uint8_t) size_;
}
else {
msg_->content =
@@ -228,3 +235,61 @@ int zmq_recv (void *s_, zmq_msg_t *msg_, int flags_)
{
return (((zmq::socket_base_t*) s_)->recv (msg_, flags_));
}
+
+#if defined ZMQ_HAVE_WINDOWS
+
+static uint64_t now ()
+{
+ // Get the high resolution counter's accuracy.
+ LARGE_INTEGER ticksPerSecond;
+ QueryPerformanceFrequency (&ticksPerSecond);
+
+ // What time is it?
+ LARGE_INTEGER tick;
+ QueryPerformanceCounter (&tick);
+
+ // Convert the tick number into the number of seconds
+ // since the system was started.
+ double ticks_div = (double) (ticksPerSecond.QuadPart / 1000000);
+ return (uint64_t) (tick.QuadPart / ticks_div);
+}
+
+void zmq_sleep (int seconds_)
+{
+ Sleep (seconds_ * 1000);
+}
+
+#else
+
+static uint64_t now ()
+{
+ struct timeval tv;
+ int rc;
+
+ rc = gettimeofday (&tv, NULL);
+ assert (rc == 0);
+ return (tv.tv_sec * (uint64_t) 1000000 + tv.tv_usec);
+}
+
+void zmq_sleep (int seconds_)
+{
+ sleep (seconds_);
+}
+
+#endif
+
+void *zmq_stopwatch_start ()
+{
+ uint64_t *watch = (uint64_t*) malloc (sizeof (uint64_t));
+ zmq_assert (watch);
+ *watch = now ();
+ return (void*) watch;
+}
+
+unsigned long zmq_stopwatch_stop (void *watch_)
+{
+ uint64_t end = now ();
+ uint64_t start = *(uint64_t*) watch_;
+ free (watch_);
+ return (unsigned long) (end - start);
+} \ No newline at end of file