From ec6822a477b89ac77afc90425bf36c4829dbef3d Mon Sep 17 00:00:00 2001 From: unknown Date: Tue, 8 Sep 2009 11:30:49 +0200 Subject: win port for c and cpp perf tests --- src/fd_signaler.cpp | 2 +- src/tcp_connecter.cpp | 18 ++++++++++- src/tcp_listener.cpp | 89 ++++++++++++++++++++++++++++++++++++++++++++++++++- src/tcp_socket.cpp | 5 +++ src/zmq.cpp | 67 +++++++++++++++++++++++++++++++++++++- 5 files changed, 177 insertions(+), 4 deletions(-) (limited to 'src') diff --git a/src/fd_signaler.cpp b/src/fd_signaler.cpp index 3f433b8..862b0fd 100644 --- a/src/fd_signaler.cpp +++ b/src/fd_signaler.cpp @@ -151,7 +151,7 @@ zmq::fd_signaler_t::signals_t zmq::fd_signaler_t::check () signals_t signals = 0; for (int pos = 0; pos != nbytes; pos++) { zmq_assert (buffer [pos] < 64); - signals |= (1 << (buffer [pos])); + signals |= (signals_t (1) << (buffer [pos])); } return signals; } diff --git a/src/tcp_connecter.cpp b/src/tcp_connecter.cpp index fa99538..22caafb 100644 --- a/src/tcp_connecter.cpp +++ b/src/tcp_connecter.cpp @@ -98,7 +98,23 @@ zmq::fd_t zmq::tcp_connecter_t::get_fd () return s; } -// connect +zmq::fd_t zmq::tcp_connecter_t::connect () +{ + // Nonblocking connect have finished. Check whether an error occured. + int err = 0; + socklen_t len = sizeof err; + int rc = getsockopt (s, SOL_SOCKET, SO_ERROR, (char*) &err, &len); + zmq_assert (rc == 0); + if (err != 0) { + errno = err; + return retired_fd; + } + + // Return the newly connected socket. + fd_t result = s; + s = retired_fd; + return result; +} #else diff --git a/src/tcp_listener.cpp b/src/tcp_listener.cpp index 22d47ca..9431ccf 100644 --- a/src/tcp_listener.cpp +++ b/src/tcp_listener.cpp @@ -27,7 +27,94 @@ #ifdef ZMQ_HAVE_WINDOWS -#error +zmq::tcp_listener_t::tcp_listener_t () : + s (retired_fd) +{ + memset (&addr, 0, sizeof (addr)); +} + +zmq::tcp_listener_t::~tcp_listener_t () +{ + if (s != retired_fd) + close (); +} + +int zmq::tcp_listener_t::set_address (const char *addr_) +{ + // Convert the interface into sockaddr_in structure. + int rc = resolve_ip_interface (&addr, addr_); + if (rc != 0) + return rc; + + // Create a listening socket. + s = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP); + // TODO: Convert error code to errno. + wsa_assert (s != INVALID_SOCKET); + + // Allow reusing of the address. + int flag = 1; + rc = setsockopt (s, SOL_SOCKET, SO_REUSEADDR, + (const char*) &flag, sizeof (int)); + wsa_assert (rc != SOCKET_ERROR); + + // Set the non-blocking flag. + flag = 1; + rc = ioctlsocket (s, FIONBIO, (u_long*) &flag); + wsa_assert (rc != SOCKET_ERROR); + + // Bind the socket to the network interface and port. + rc = bind (s, (struct sockaddr*) &addr, sizeof (addr)); + // TODO: Convert error code to errno. + wsa_assert (rc != SOCKET_ERROR); + + // Listen for incomming connections. + rc = listen (s, 1); + // TODO: Convert error code to errno. + wsa_assert (rc != SOCKET_ERROR); + + return 0; +} + +int zmq::tcp_listener_t::close () +{ + zmq_assert (s != retired_fd); + int rc = closesocket (s); + wsa_assert (rc != SOCKET_ERROR); + s = retired_fd; + return 0; +} + +zmq::fd_t zmq::tcp_listener_t::get_fd () +{ + return s; +} + +zmq::fd_t zmq::tcp_listener_t::accept () +{ + zmq_assert (s != retired_fd); + + // Accept one incoming connection. + fd_t sock = ::accept (s, NULL, NULL); + if (sock == INVALID_SOCKET && + (WSAGetLastError () == WSAEWOULDBLOCK || + WSAGetLastError () == WSAECONNRESET)) + return retired_fd; + + zmq_assert (sock != INVALID_SOCKET); + + // Set to non-blocking mode. + unsigned long argp = 1; + int rc = ioctlsocket (sock, FIONBIO, &argp); + wsa_assert (rc != SOCKET_ERROR); + + // Disable Nagle's algorithm. + int flag = 1; + rc = setsockopt (sock, IPPROTO_TCP, TCP_NODELAY, (char*) &flag, + sizeof (int)); + wsa_assert (rc != SOCKET_ERROR); + + return sock; +} #else diff --git a/src/tcp_socket.cpp b/src/tcp_socket.cpp index 21f60f9..782f0d8 100644 --- a/src/tcp_socket.cpp +++ b/src/tcp_socket.cpp @@ -50,6 +50,11 @@ int zmq::tcp_socket_t::close () return 0; } +zmq::fd_t zmq::tcp_socket_t::get_fd () +{ + return s; +} + int zmq::tcp_socket_t::write (const void *data, int size) { int nbytes = send (s, (char*) data, size, 0); diff --git a/src/zmq.cpp b/src/zmq.cpp index ad4839f..63a7b4b 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -27,6 +27,13 @@ #include "err.hpp" #include "dispatcher.hpp" #include "msg_content.hpp" +#include "platform.hpp" +#include "stdint.hpp" + +#if !defined ZMQ_HAVE_WINDOWS +#include +#include +#endif int zmq_msg_init (zmq_msg_t *msg_) { @@ -39,7 +46,7 @@ int zmq_msg_init_size (zmq_msg_t *msg_, size_t size_) { if (size_ <= ZMQ_MAX_VSM_SIZE) { msg_->content = (zmq::msg_content_t*) ZMQ_VSM; - msg_->vsm_size = (uint16_t) size_; + msg_->vsm_size = (uint8_t) size_; } else { msg_->content = @@ -228,3 +235,61 @@ int zmq_recv (void *s_, zmq_msg_t *msg_, int flags_) { return (((zmq::socket_base_t*) s_)->recv (msg_, flags_)); } + +#if defined ZMQ_HAVE_WINDOWS + +static uint64_t now () +{ + // Get the high resolution counter's accuracy. + LARGE_INTEGER ticksPerSecond; + QueryPerformanceFrequency (&ticksPerSecond); + + // What time is it? + LARGE_INTEGER tick; + QueryPerformanceCounter (&tick); + + // Convert the tick number into the number of seconds + // since the system was started. + double ticks_div = (double) (ticksPerSecond.QuadPart / 1000000); + return (uint64_t) (tick.QuadPart / ticks_div); +} + +void zmq_sleep (int seconds_) +{ + Sleep (seconds_ * 1000); +} + +#else + +static uint64_t now () +{ + struct timeval tv; + int rc; + + rc = gettimeofday (&tv, NULL); + assert (rc == 0); + return (tv.tv_sec * (uint64_t) 1000000 + tv.tv_usec); +} + +void zmq_sleep (int seconds_) +{ + sleep (seconds_); +} + +#endif + +void *zmq_stopwatch_start () +{ + uint64_t *watch = (uint64_t*) malloc (sizeof (uint64_t)); + zmq_assert (watch); + *watch = now (); + return (void*) watch; +} + +unsigned long zmq_stopwatch_stop (void *watch_) +{ + uint64_t end = now (); + uint64_t start = *(uint64_t*) watch_; + free (watch_); + return (unsigned long) (end - start); +} \ No newline at end of file -- cgit v1.2.3