summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--zmq-camera.c294
1 files changed, 180 insertions, 114 deletions
diff --git a/zmq-camera.c b/zmq-camera.c
index fb46899..007df9b 100644
--- a/zmq-camera.c
+++ b/zmq-camera.c
@@ -40,6 +40,13 @@
/* inproc endpoint used to provide the "local loopback" view of the camera */
const char local_camera[] = "inproc://local-camera";
+/* inproc endpoint used for signalling the main thread */
+const char local_signal[] = "inproc://local-signal";
+
+/*
+ * Sender thread
+ */
+
/* Sender thread initialiser */
struct sender_args_t
{
@@ -48,10 +55,6 @@ struct sender_args_t
int frame_rate;
};
-/*
- * Sender thread
- */
-
void *sender_thread (void *arg)
{
struct sender_args_t *sender_args;
@@ -214,13 +217,32 @@ void *sender_thread (void *arg)
* Signal handling thread
*/
-/* Global variable used to signal if an alarm has expired */
-volatile int alarm_expired = 0;
+/* Signal thread initialiser */
+struct signal_args_t
+{
+ void *ctx;
+};
void *signal_thread (void *arg)
{
sigset_t wait_set;
int rc, signo;
+ struct signal_args_t *signal_args;
+ void *s;
+ zmq_msg_t msg;
+
+ signal_args = (struct signal_args_t *)arg;
+
+ /* Create a ZMQ_PUB socket for publising signal events back to the
+ main thread */
+ s = zmq_socket (signal_args->ctx, ZMQ_PUB);
+ assert (s);
+ rc = zmq_connect (s, local_signal);
+ if (rc != 0) {
+ fprintf (stderr, "zmq_connect (\"%s\"): %s\n", local_signal,
+ zmq_strerror (errno));
+ exit (1);
+ }
sigemptyset (&wait_set);
sigaddset (&wait_set, SIGTERM);
@@ -231,19 +253,12 @@ void *signal_thread (void *arg)
while (1) {
rc = sigwait (&wait_set, &signo);
assert (rc == 0);
-
- switch (signo) {
- case SIGTERM:
- case SIGINT:
- case SIGQUIT:
- fprintf (stderr, "Exiting.\n");
- exit (1);
- case SIGALRM:
- alarm_expired = 1;
- break;
- default:
- ; /* Ignored */
- }
+ rc = zmq_msg_init_size (&msg, sizeof (int));
+ assert (rc == 0);
+ memcpy (zmq_msg_data (&msg), &signo, sizeof (int));
+ rc = zmq_send (s, &msg, 0);
+ assert (rc == 0);
+ zmq_msg_close (&msg);
}
}
@@ -281,7 +296,9 @@ void err_usage(char *error)
/* Main program. */
int main (int argc, char *argv [])
{
- void *ctx, *s;
+ void *ctx,
+ *s_sub,
+ *s_signal;
int rc;
int opt, opt_sender, opt_receiver, opt_frame_rate, frame_rate;
char *endpoint = NULL;
@@ -294,6 +311,7 @@ int main (int argc, char *argv [])
struct itimerval alarm_timer;
uint64_t received_fps = 0;
uint64_t received_bps = 0;
+ double received_mbps;
char window_title[80];
/* Parse command line. */
@@ -325,26 +343,34 @@ int main (int argc, char *argv [])
err_usage ("Please specify either the -r or -s option");
endpoint = argv [optind];
- /* Block signal handling in all application and 0MQ threads and
- start a separate thread to handle all signals synchronously */
+ /* Block signal handling in all application and 0MQ threads */
sigfillset (&block_set);
pthread_sigmask (SIG_SETMASK, &block_set, NULL);
- pthread_t signal;
- rc = pthread_create (&signal, NULL, signal_thread, NULL);
- assert (rc == 0);
- /* Initialise 0MQ infrastructure for 2 application threads and
+ /* Initialise 0MQ infrastructure for 3 application threads and
a single I/O thread */
- ctx = zmq_init (2, 1, 0);
+ ctx = zmq_init (3, 1, ZMQ_POLL);
assert (ctx);
+
+ /* Create a ZMQ_SUB socket to receive signal events and start the
+ signal handling thread */
+ s_signal = zmq_socket (ctx, ZMQ_SUB);
+ assert (s_signal);
+ rc = zmq_bind (s_signal, local_signal);
+ assert (rc == 0);
+
+ pthread_t signal;
+ struct signal_args_t signal_args = {ctx};
+ rc = pthread_create (&signal, NULL, signal_thread, (void *) &signal_args);
+ assert (rc == 0);
/* Create a ZMQ_SUB socket to receive video data. If we're sending video,
bind to an inproc: endpoint that the sender thread will connect to,
otherwise connect to the endpoint the user specified. */
- s = zmq_socket (ctx, ZMQ_SUB);
- assert (s);
+ s_sub = zmq_socket (ctx, ZMQ_SUB);
+ assert (s_sub);
if (opt_sender) {
- rc = zmq_bind (s, local_camera);
+ rc = zmq_bind (s_sub, local_camera);
if (rc != 0) {
fprintf (stderr, "zmq_bind (\"%s\"): %s\n", local_camera,
zmq_strerror (errno));
@@ -360,7 +386,7 @@ int main (int argc, char *argv [])
assert (rc == 0);
}
else {
- rc = zmq_connect (s, endpoint);
+ rc = zmq_connect (s_sub, endpoint);
if (rc != 0) {
fprintf (stderr, "zmq_connect (\"%s\"): %s\n", endpoint,
zmq_strerror (errno));
@@ -368,8 +394,10 @@ int main (int argc, char *argv [])
}
}
- /* Subscribe to all messages on socket. */
- rc = zmq_setsockopt (s, ZMQ_SUBSCRIBE, "", 0);
+ /* Subscribe to all messages on video and signal sockets. */
+ rc = zmq_setsockopt (s_sub, ZMQ_SUBSCRIBE, "", 0);
+ assert (rc == 0);
+ rc = zmq_setsockopt (s_signal, ZMQ_SUBSCRIBE, "", 0);
assert (rc == 0);
/* Set up an interval timer to fire SIGALRM once a second so that
@@ -384,6 +412,13 @@ int main (int argc, char *argv [])
/* Intial window title is the endpoint we are receiving from */
snprintf (window_title, 80, "%s", opt_sender ? local_camera : endpoint);
+ /* Set up pollset */
+ zmq_pollitem_t items[2];
+ items[0].socket = s_sub;
+ items[0].events = ZMQ_POLLIN;
+ items[1].socket = s_signal;
+ items[1].events = ZMQ_POLLIN;
+
/* Display video until user asks to quit. */
while (!quit) {
@@ -392,95 +427,126 @@ int main (int argc, char *argv [])
unsigned char *data;
SDL_Event event;
- /* Receive single message. */
- rc = zmq_msg_init (&msg);
- assert (rc == 0);
- rc = zmq_recv (s, &msg, 0);
- assert (rc == 0);
-
- /* Parse message data. */
- data = (unsigned char*) zmq_msg_data (&msg);
- /* Sanity check that we have at least the width, height in
- the message data */
- msg_size = zmq_msg_size (&msg);
- assert (msg_size >= sizeof (uint32_t) + sizeof (uint32_t));
-
- /* Get image width in pixels. */
- memcpy (&image_width, data, sizeof (uint32_t));
- image_width = ntohl (image_width);
- data += sizeof (uint32_t);
-
- /* Get image height in pixels. */
- memcpy (&image_height, data, sizeof (uint32_t));
- image_height = ntohl (image_height);
- data += sizeof (uint32_t);
-
- /* Sanity check message size again */
- assert (msg_size >= image_width * image_height * 3);
-
- /* data now points to RGB24 pixel data. */
-
- if (!sdl_initialised) {
-
- /* Initialise SDL if not already done.
- We need to have received at least one message, so that we
- know what the image size being sent is. */
- if (SDL_Init (SDL_INIT_VIDEO) < 0)
- {
- fprintf (stderr, "Failed to initialize SDL: %s\n",
- SDL_GetError());
- exit (1);
+ /* Poll for incoming messages on either socket */
+ rc = zmq_poll (items, 2, 100000);
+ assert (rc >= 0);
+
+ /* Signal received */
+ if (items[1].revents == ZMQ_POLLIN) {
+ rc = zmq_msg_init (&msg);
+ assert (rc == 0);
+ rc = zmq_recv (s_signal, &msg, 0);
+ assert (rc == 0);
+ assert (zmq_msg_size (&msg) == sizeof (int));
+ int signo;
+ memcpy (&signo, zmq_msg_data (&msg), sizeof (int));
+
+ switch (signo) {
+ case SIGTERM:
+ case SIGINT:
+ case SIGQUIT:
+ fprintf (stderr, "Exiting.\n");
+ exit (1);
+ case SIGALRM:
+ /* If the timer fired, recalculate bandwidth and update window title */
+ received_mbps = (double)received_bps * 8 / (1<<20);
+ snprintf (window_title, 80, "%s (%llu fps, %.2lf Mbps)",
+ opt_sender ? local_camera : endpoint,
+ (long long unsigned int) received_fps, received_mbps);
+ if (sdl_initialised)
+ SDL_WM_SetCaption (window_title, window_title);
+ received_fps = received_bps = 0;
+ break;
+ default:
+ ; /* Ignore anything else */
}
- screen = SDL_SetVideoMode (image_width, image_height, 32,
- SDL_HWSURFACE);
- if (screen == NULL) {
- fprintf (stderr, "Unable to set video mode: %s\n",
- SDL_GetError ());
- SDL_Quit ();
- exit (1);
- }
- /* Set initial window title */
- SDL_WM_SetCaption (window_title, window_title);
- sdl_initialised = 1;
+ zmq_msg_close (&msg);
}
- /* Create RGB surface. */
- rgb_surface = SDL_CreateRGBSurfaceFrom (
- data, /* Pixel data */
- image_width, /* Width */
- image_height, /* Height */
- 24, /* Depth */
- image_width * 3, /* Scanline pitch */
- 0, 0, 0, 0); /* TODO: RGBA mask */
-
- /* Blit surface to screen. */
- SDL_BlitSurface (rgb_surface, NULL, screen, NULL);
- SDL_UpdateRect (screen, 0, 0, 0, 0);
- SDL_FreeSurface (rgb_surface);
-
- /* Free zmq_msg we received */
- zmq_msg_close (&msg);
-
- /* Update statistics */
- received_fps += 1;
- received_bps += msg_size;
- /* If the timer fired, recalculate bandwidth and update window title */
- if (alarm_expired) {
- double received_mbps = (double)received_bps * 8 / (1<<20);
- snprintf (window_title, 80, "%s (%llu fps, %.2lf Mbps)",
- opt_sender ? local_camera : endpoint,
- (long long unsigned int) received_fps, received_mbps);
- SDL_WM_SetCaption (window_title, window_title);
- received_fps = received_bps = 0;
- alarm_expired = 0;
+ /* Video frame received */
+ if (items[0].revents == ZMQ_POLLIN) {
+ rc = zmq_msg_init (&msg);
+ assert (rc == 0);
+ rc = zmq_recv (s_sub, &msg, 0);
+ assert (rc == 0);
+
+ /* Parse message data. */
+ data = (unsigned char*) zmq_msg_data (&msg);
+ /* Sanity check that we have at least the width, height in
+ the message data */
+ msg_size = zmq_msg_size (&msg);
+ assert (msg_size >= sizeof (uint32_t) + sizeof (uint32_t));
+
+ /* Get image width in pixels. */
+ memcpy (&image_width, data, sizeof (uint32_t));
+ image_width = ntohl (image_width);
+ data += sizeof (uint32_t);
+
+ /* Get image height in pixels. */
+ memcpy (&image_height, data, sizeof (uint32_t));
+ image_height = ntohl (image_height);
+ data += sizeof (uint32_t);
+
+ /* Sanity check message size again */
+ assert (msg_size >= image_width * image_height * 3);
+
+ /* data now points to RGB24 pixel data. */
+
+ if (!sdl_initialised) {
+
+ /* Initialise SDL if not already done.
+ We need to have received at least one message, so that we
+ know what the image size being sent is. */
+ if (SDL_Init (SDL_INIT_VIDEO) < 0)
+ {
+ fprintf (stderr, "Failed to initialize SDL: %s\n",
+ SDL_GetError());
+ exit (1);
+ }
+ screen = SDL_SetVideoMode (image_width, image_height, 32,
+ SDL_HWSURFACE);
+ if (screen == NULL) {
+ fprintf (stderr, "Unable to set video mode: %s\n",
+ SDL_GetError ());
+ SDL_Quit ();
+ exit (1);
+ }
+ /* Set initial window title */
+ SDL_WM_SetCaption (window_title, window_title);
+
+ sdl_initialised = 1;
+ }
+
+ /* Create RGB surface. */
+ rgb_surface = SDL_CreateRGBSurfaceFrom (
+ data, /* Pixel data */
+ image_width, /* Width */
+ image_height, /* Height */
+ 24, /* Depth */
+ image_width * 3, /* Scanline pitch */
+ 0, 0, 0, 0); /* TODO: RGBA mask */
+
+ /* Blit surface to screen. */
+ SDL_BlitSurface (rgb_surface, NULL, screen, NULL);
+ SDL_UpdateRect (screen, 0, 0, 0, 0);
+ SDL_FreeSurface (rgb_surface);
+
+ /* Free zmq_msg we received */
+ zmq_msg_close (&msg);
+
+ /* Update statistics */
+ received_fps += 1;
+ received_bps += msg_size;
}
/* Check if user asked to quit. */
+ if (sdl_initialised) {
while (SDL_PollEvent (&event))
- {
- if (event.type == SDL_QUIT)
- quit = 1;
+ {
+ if (event.type == SDL_QUIT)
+ quit = 1;
+ }
}
}