summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/xs_utils.h12
-rw-r--r--src/xpub.cpp2
-rw-r--r--src/xs_utils.cpp43
-rw-r--r--tests/linger.cpp11
-rw-r--r--tests/reconnect.cpp13
-rw-r--r--tests/shutdown_stress.cpp32
-rw-r--r--tests/testutil.hpp1
-rw-r--r--tests/timeo.cpp24
8 files changed, 70 insertions, 68 deletions
diff --git a/include/xs_utils.h b/include/xs_utils.h
index c1171fe..3348aa3 100644
--- a/include/xs_utils.h
+++ b/include/xs_utils.h
@@ -43,17 +43,23 @@ extern "C" {
#endif
/* Helper functions are used by perf tests so that they don't have to care */
-/* about minutiae of time-related functions on different OS platforms. */
+/* about minutiae of different OS platforms. */
/* Starts the stopwatch. Returns the handle to the watch. */
XS_EXPORT void *xs_stopwatch_start (void);
/* Stops the stopwatch. Returns the number of microseconds elapsed since */
/* the stopwatch was started. */
-XS_EXPORT unsigned long xs_stopwatch_stop (void *watch_);
+XS_EXPORT unsigned long xs_stopwatch_stop (void *watch);
/* Sleeps for specified number of seconds. */
-XS_EXPORT void xs_sleep (int seconds_);
+XS_EXPORT void xs_sleep (int seconds);
+
+/* Creates a new thread. */
+XS_EXPORT void *xs_thread_create (void (*fn) (void *arg), void* arg);
+
+/* Wait for thread to finish. */
+XS_EXPORT void xs_thread_join (void *thread);
#undef XS_EXPORT
diff --git a/src/xpub.cpp b/src/xpub.cpp
index b4bc135..d27337e 100644
--- a/src/xpub.cpp
+++ b/src/xpub.cpp
@@ -177,7 +177,7 @@ void xs::xpub_t::send_unsubscription (unsigned char *data_, size_t size_,
xpub_t *self = (xpub_t*) arg_;
blob_t unsub (size_ + 1, 0);
unsub [0] = 0;
- memcpy (&unsub [1], data_, size_);
+ memcpy ((void*) (unsub.data () + 1), data_, size_);
self->pending.push_back (unsub);
}
}
diff --git a/src/xs_utils.cpp b/src/xs_utils.cpp
index e30ffe2..976f540 100644
--- a/src/xs_utils.cpp
+++ b/src/xs_utils.cpp
@@ -59,3 +59,46 @@ unsigned long xs_stopwatch_stop (void *watch_)
free (watch_);
return (unsigned long) (end - start);
}
+
+#if defined XS_HAVE_WINDOWS
+
+struct arg_t
+{
+ HANDLE handle;
+ void (*fn) (void *arg);
+ void *arg;
+};
+
+extern "C"
+{
+ static unsigned int __stdcall thread_routine (void *arg_)
+ {
+ arg_t *arg = (arg_t*) arg_;
+ arg->fn (arg->arg);
+ return 0;
+ }
+}
+
+void *xs_thread_create (void (*fn_) (void *arg_), void *arg_)
+{
+ arg_t *arg = (arg_t*) malloc (sizeof (arg_t));
+ alloc_assert (arg);
+ arg->fn = fn_;
+ arg->arg = arg_;
+ arg->handle = (HANDLE) _beginthreadex (NULL, 0,
+ &::thread_routine, (void*) arg, 0 , NULL);
+ win_assert (arg->handle != NULL);
+ return (void*) arg;
+}
+
+void xs_thread_join (void *thread_)
+{
+ arg_t *arg = (arg_t*) thread_;
+ DWORD rc = WaitForSingleObject (arg->handle, INFINITE);
+ win_assert (rc != WAIT_FAILED);
+ BOOL rc2 = CloseHandle (arg->handle);
+ win_assert (rc2 != 0);
+ free (arg);
+}
+
+#endif
diff --git a/tests/linger.cpp b/tests/linger.cpp
index f76be39..5c0480c 100644
--- a/tests/linger.cpp
+++ b/tests/linger.cpp
@@ -20,13 +20,6 @@
#include "testutil.hpp"
-#if defined XS_HAVE_WINDOWS
-int XS_TEST_MAIN ()
-{
- return 0;
-}
-#else
-
int XS_TEST_MAIN ()
{
fprintf (stderr, "linger test running...\n");
@@ -43,7 +36,7 @@ int XS_TEST_MAIN ()
// Connect to non-existent endpoing.
assert (rc == 0);
- rc = xs_connect (s, "ipc:///tmp/this-file-does-not-exist");
+ rc = xs_connect (s, "tcp://127.0.0.1:5560");
assert (rc == 0);
// Send a message.
@@ -63,5 +56,3 @@ int XS_TEST_MAIN ()
return 0;
}
-
-#endif \ No newline at end of file
diff --git a/tests/reconnect.cpp b/tests/reconnect.cpp
index 09eefd4..0e38f08 100644
--- a/tests/reconnect.cpp
+++ b/tests/reconnect.cpp
@@ -20,13 +20,6 @@
#include "testutil.hpp"
-#if defined XS_HAVE_WINDOWS
-int XS_TEST_MAIN ()
-{
- return 0;
-}
-#else
-
int XS_TEST_MAIN ()
{
fprintf (stderr, "reconnect test running...\n");
@@ -68,7 +61,7 @@ int XS_TEST_MAIN ()
assert (push);
// Connect before bind was done at the peer and send one message.
- rc = xs_connect (push, "ipc:///tmp/tester");
+ rc = xs_connect (push, "tcp://127.0.0.1:5560");
assert (rc == 0);
rc = xs_send (push, "ABC", 3, 0);
assert (rc == 3);
@@ -77,7 +70,7 @@ int XS_TEST_MAIN ()
xs_sleep (1);
// Bind the peer and get the message.
- rc = xs_bind (pull, "ipc:///tmp/tester");
+ rc = xs_bind (pull, "tcp://127.0.0.1:5560");
assert (rc == 0);
rc = xs_recv (pull, buf, sizeof (buf), 0);
assert (rc == 3);
@@ -92,5 +85,3 @@ int XS_TEST_MAIN ()
return 0 ;
}
-
-#endif \ No newline at end of file
diff --git a/tests/shutdown_stress.cpp b/tests/shutdown_stress.cpp
index 2e6b4d5..976c658 100644
--- a/tests/shutdown_stress.cpp
+++ b/tests/shutdown_stress.cpp
@@ -21,32 +21,20 @@
#include "testutil.hpp"
-#if defined XS_HAVE_WINDOWS
-int XS_TEST_MAIN ()
-{
- return 0;
-}
-#else
-
-#include <pthread.h>
-#include <stddef.h>
-
#define THREAD_COUNT 100
extern "C"
{
- static void *worker (void *s)
+ static void shutdown_stress_worker (void *s_)
{
int rc;
- rc = xs_connect (s, "tcp://127.0.0.1:5560");
+ rc = xs_connect (s_, "tcp://127.0.0.1:5560");
assert (rc == 0);
// Start closing the socket while the connecting process is underway.
- rc = xs_close (s);
+ rc = xs_close (s_);
assert (rc == 0);
-
- return NULL;
}
}
@@ -58,7 +46,7 @@ int XS_TEST_MAIN ()
int i;
int j;
int rc;
- pthread_t threads [THREAD_COUNT];
+ void *threads [THREAD_COUNT];
fprintf (stderr, "shutdown_stress test running...\n");
@@ -77,14 +65,12 @@ int XS_TEST_MAIN ()
for (i = 0; i != THREAD_COUNT; i++) {
s2 = xs_socket (ctx, XS_SUB);
assert (s2);
- rc = pthread_create (&threads [i], NULL, worker, s2);
- assert (rc == 0);
+ threads [i] = xs_thread_create (shutdown_stress_worker, s2);
+ assert (threads [i]);
}
- for (i = 0; i != THREAD_COUNT; i++) {
- rc = pthread_join (threads [i], NULL);
- assert (rc == 0);
- }
+ for (i = 0; i != THREAD_COUNT; i++)
+ xs_thread_join (threads [i]);
rc = xs_close (s1);
assert (rc == 0);
@@ -95,5 +81,3 @@ int XS_TEST_MAIN ()
return 0;
}
-
-#endif \ No newline at end of file
diff --git a/tests/testutil.hpp b/tests/testutil.hpp
index ba4cf5f..444ff60 100644
--- a/tests/testutil.hpp
+++ b/tests/testutil.hpp
@@ -25,6 +25,7 @@
#include <assert.h>
#include <string.h>
#include <stdio.h>
+#include <stddef.h>
#include "../include/xs.h"
#include "../include/xs_utils.h"
diff --git a/tests/timeo.cpp b/tests/timeo.cpp
index 3cb09f1..ade5150 100644
--- a/tests/timeo.cpp
+++ b/tests/timeo.cpp
@@ -20,30 +20,20 @@
#include "testutil.hpp"
-#if defined XS_HAVE_WINDOWS
-int XS_TEST_MAIN ()
-{
- return 0;
-}
-#else
-
-#include <pthread.h>
-
extern "C"
{
- void *worker(void *ctx)
+ void timeo_worker(void *ctx_)
{
// Worker thread connects after delay of 1 second. Then it waits
// for 1 more second, so that async connect has time to succeed.
xs_sleep (1);
- void *sc = xs_socket (ctx, XS_PUSH);
+ void *sc = xs_socket (ctx_, XS_PUSH);
assert (sc);
int rc = xs_connect (sc, "inproc://timeout_test");
assert (rc == 0);
xs_sleep (1);
rc = xs_close (sc);
assert (rc == 0);
- return NULL;
}
}
@@ -82,17 +72,15 @@ int XS_TEST_MAIN ()
timeout = 2000;
rc = xs_setsockopt(sb, XS_RCVTIMEO, &timeout, timeout_size);
assert (rc == 0);
- pthread_t thread;
- rc = pthread_create (&thread, NULL, worker, ctx);
- assert (rc == 0);
+ void *thread = xs_thread_create (timeo_worker, ctx);
+ assert (thread);
watch = xs_stopwatch_start ();
rc = xs_recv (sb, buf, 32, 0);
assert (rc == -1);
assert (xs_errno () == EAGAIN);
elapsed = xs_stopwatch_stop (watch);
assert (elapsed > 1900000 && elapsed < 2100000);
- rc = pthread_join (thread, NULL);
- assert (rc == 0);
+ xs_thread_join (thread);
// Check that timeouts don't break normal message transfer.
void *sc = xs_socket (ctx, XS_PUSH);
@@ -118,5 +106,3 @@ int XS_TEST_MAIN ()
return 0 ;
}
-
-#endif \ No newline at end of file