diff options
-rw-r--r-- | zmq-camera.c | 294 |
1 files changed, 180 insertions, 114 deletions
diff --git a/zmq-camera.c b/zmq-camera.c index fb46899..007df9b 100644 --- a/zmq-camera.c +++ b/zmq-camera.c @@ -40,6 +40,13 @@ /* inproc endpoint used to provide the "local loopback" view of the camera */ const char local_camera[] = "inproc://local-camera"; +/* inproc endpoint used for signalling the main thread */ +const char local_signal[] = "inproc://local-signal"; + +/* + * Sender thread + */ + /* Sender thread initialiser */ struct sender_args_t { @@ -48,10 +55,6 @@ struct sender_args_t int frame_rate; }; -/* - * Sender thread - */ - void *sender_thread (void *arg) { struct sender_args_t *sender_args; @@ -214,13 +217,32 @@ void *sender_thread (void *arg) * Signal handling thread */ -/* Global variable used to signal if an alarm has expired */ -volatile int alarm_expired = 0; +/* Signal thread initialiser */ +struct signal_args_t +{ + void *ctx; +}; void *signal_thread (void *arg) { sigset_t wait_set; int rc, signo; + struct signal_args_t *signal_args; + void *s; + zmq_msg_t msg; + + signal_args = (struct signal_args_t *)arg; + + /* Create a ZMQ_PUB socket for publising signal events back to the + main thread */ + s = zmq_socket (signal_args->ctx, ZMQ_PUB); + assert (s); + rc = zmq_connect (s, local_signal); + if (rc != 0) { + fprintf (stderr, "zmq_connect (\"%s\"): %s\n", local_signal, + zmq_strerror (errno)); + exit (1); + } sigemptyset (&wait_set); sigaddset (&wait_set, SIGTERM); @@ -231,19 +253,12 @@ void *signal_thread (void *arg) while (1) { rc = sigwait (&wait_set, &signo); assert (rc == 0); - - switch (signo) { - case SIGTERM: - case SIGINT: - case SIGQUIT: - fprintf (stderr, "Exiting.\n"); - exit (1); - case SIGALRM: - alarm_expired = 1; - break; - default: - ; /* Ignored */ - } + rc = zmq_msg_init_size (&msg, sizeof (int)); + assert (rc == 0); + memcpy (zmq_msg_data (&msg), &signo, sizeof (int)); + rc = zmq_send (s, &msg, 0); + assert (rc == 0); + zmq_msg_close (&msg); } } @@ -281,7 +296,9 @@ void err_usage(char *error) /* Main program. */ int main (int argc, char *argv []) { - void *ctx, *s; + void *ctx, + *s_sub, + *s_signal; int rc; int opt, opt_sender, opt_receiver, opt_frame_rate, frame_rate; char *endpoint = NULL; @@ -294,6 +311,7 @@ int main (int argc, char *argv []) struct itimerval alarm_timer; uint64_t received_fps = 0; uint64_t received_bps = 0; + double received_mbps; char window_title[80]; /* Parse command line. */ @@ -325,26 +343,34 @@ int main (int argc, char *argv []) err_usage ("Please specify either the -r or -s option"); endpoint = argv [optind]; - /* Block signal handling in all application and 0MQ threads and - start a separate thread to handle all signals synchronously */ + /* Block signal handling in all application and 0MQ threads */ sigfillset (&block_set); pthread_sigmask (SIG_SETMASK, &block_set, NULL); - pthread_t signal; - rc = pthread_create (&signal, NULL, signal_thread, NULL); - assert (rc == 0); - /* Initialise 0MQ infrastructure for 2 application threads and + /* Initialise 0MQ infrastructure for 3 application threads and a single I/O thread */ - ctx = zmq_init (2, 1, 0); + ctx = zmq_init (3, 1, ZMQ_POLL); assert (ctx); + + /* Create a ZMQ_SUB socket to receive signal events and start the + signal handling thread */ + s_signal = zmq_socket (ctx, ZMQ_SUB); + assert (s_signal); + rc = zmq_bind (s_signal, local_signal); + assert (rc == 0); + + pthread_t signal; + struct signal_args_t signal_args = {ctx}; + rc = pthread_create (&signal, NULL, signal_thread, (void *) &signal_args); + assert (rc == 0); /* Create a ZMQ_SUB socket to receive video data. If we're sending video, bind to an inproc: endpoint that the sender thread will connect to, otherwise connect to the endpoint the user specified. */ - s = zmq_socket (ctx, ZMQ_SUB); - assert (s); + s_sub = zmq_socket (ctx, ZMQ_SUB); + assert (s_sub); if (opt_sender) { - rc = zmq_bind (s, local_camera); + rc = zmq_bind (s_sub, local_camera); if (rc != 0) { fprintf (stderr, "zmq_bind (\"%s\"): %s\n", local_camera, zmq_strerror (errno)); @@ -360,7 +386,7 @@ int main (int argc, char *argv []) assert (rc == 0); } else { - rc = zmq_connect (s, endpoint); + rc = zmq_connect (s_sub, endpoint); if (rc != 0) { fprintf (stderr, "zmq_connect (\"%s\"): %s\n", endpoint, zmq_strerror (errno)); @@ -368,8 +394,10 @@ int main (int argc, char *argv []) } } - /* Subscribe to all messages on socket. */ - rc = zmq_setsockopt (s, ZMQ_SUBSCRIBE, "", 0); + /* Subscribe to all messages on video and signal sockets. */ + rc = zmq_setsockopt (s_sub, ZMQ_SUBSCRIBE, "", 0); + assert (rc == 0); + rc = zmq_setsockopt (s_signal, ZMQ_SUBSCRIBE, "", 0); assert (rc == 0); /* Set up an interval timer to fire SIGALRM once a second so that @@ -384,6 +412,13 @@ int main (int argc, char *argv []) /* Intial window title is the endpoint we are receiving from */ snprintf (window_title, 80, "%s", opt_sender ? local_camera : endpoint); + /* Set up pollset */ + zmq_pollitem_t items[2]; + items[0].socket = s_sub; + items[0].events = ZMQ_POLLIN; + items[1].socket = s_signal; + items[1].events = ZMQ_POLLIN; + /* Display video until user asks to quit. */ while (!quit) { @@ -392,95 +427,126 @@ int main (int argc, char *argv []) unsigned char *data; SDL_Event event; - /* Receive single message. */ - rc = zmq_msg_init (&msg); - assert (rc == 0); - rc = zmq_recv (s, &msg, 0); - assert (rc == 0); - - /* Parse message data. */ - data = (unsigned char*) zmq_msg_data (&msg); - /* Sanity check that we have at least the width, height in - the message data */ - msg_size = zmq_msg_size (&msg); - assert (msg_size >= sizeof (uint32_t) + sizeof (uint32_t)); - - /* Get image width in pixels. */ - memcpy (&image_width, data, sizeof (uint32_t)); - image_width = ntohl (image_width); - data += sizeof (uint32_t); - - /* Get image height in pixels. */ - memcpy (&image_height, data, sizeof (uint32_t)); - image_height = ntohl (image_height); - data += sizeof (uint32_t); - - /* Sanity check message size again */ - assert (msg_size >= image_width * image_height * 3); - - /* data now points to RGB24 pixel data. */ - - if (!sdl_initialised) { - - /* Initialise SDL if not already done. - We need to have received at least one message, so that we - know what the image size being sent is. */ - if (SDL_Init (SDL_INIT_VIDEO) < 0) - { - fprintf (stderr, "Failed to initialize SDL: %s\n", - SDL_GetError()); - exit (1); + /* Poll for incoming messages on either socket */ + rc = zmq_poll (items, 2, 100000); + assert (rc >= 0); + + /* Signal received */ + if (items[1].revents == ZMQ_POLLIN) { + rc = zmq_msg_init (&msg); + assert (rc == 0); + rc = zmq_recv (s_signal, &msg, 0); + assert (rc == 0); + assert (zmq_msg_size (&msg) == sizeof (int)); + int signo; + memcpy (&signo, zmq_msg_data (&msg), sizeof (int)); + + switch (signo) { + case SIGTERM: + case SIGINT: + case SIGQUIT: + fprintf (stderr, "Exiting.\n"); + exit (1); + case SIGALRM: + /* If the timer fired, recalculate bandwidth and update window title */ + received_mbps = (double)received_bps * 8 / (1<<20); + snprintf (window_title, 80, "%s (%llu fps, %.2lf Mbps)", + opt_sender ? local_camera : endpoint, + (long long unsigned int) received_fps, received_mbps); + if (sdl_initialised) + SDL_WM_SetCaption (window_title, window_title); + received_fps = received_bps = 0; + break; + default: + ; /* Ignore anything else */ } - screen = SDL_SetVideoMode (image_width, image_height, 32, - SDL_HWSURFACE); - if (screen == NULL) { - fprintf (stderr, "Unable to set video mode: %s\n", - SDL_GetError ()); - SDL_Quit (); - exit (1); - } - /* Set initial window title */ - SDL_WM_SetCaption (window_title, window_title); - sdl_initialised = 1; + zmq_msg_close (&msg); } - /* Create RGB surface. */ - rgb_surface = SDL_CreateRGBSurfaceFrom ( - data, /* Pixel data */ - image_width, /* Width */ - image_height, /* Height */ - 24, /* Depth */ - image_width * 3, /* Scanline pitch */ - 0, 0, 0, 0); /* TODO: RGBA mask */ - - /* Blit surface to screen. */ - SDL_BlitSurface (rgb_surface, NULL, screen, NULL); - SDL_UpdateRect (screen, 0, 0, 0, 0); - SDL_FreeSurface (rgb_surface); - - /* Free zmq_msg we received */ - zmq_msg_close (&msg); - - /* Update statistics */ - received_fps += 1; - received_bps += msg_size; - /* If the timer fired, recalculate bandwidth and update window title */ - if (alarm_expired) { - double received_mbps = (double)received_bps * 8 / (1<<20); - snprintf (window_title, 80, "%s (%llu fps, %.2lf Mbps)", - opt_sender ? local_camera : endpoint, - (long long unsigned int) received_fps, received_mbps); - SDL_WM_SetCaption (window_title, window_title); - received_fps = received_bps = 0; - alarm_expired = 0; + /* Video frame received */ + if (items[0].revents == ZMQ_POLLIN) { + rc = zmq_msg_init (&msg); + assert (rc == 0); + rc = zmq_recv (s_sub, &msg, 0); + assert (rc == 0); + + /* Parse message data. */ + data = (unsigned char*) zmq_msg_data (&msg); + /* Sanity check that we have at least the width, height in + the message data */ + msg_size = zmq_msg_size (&msg); + assert (msg_size >= sizeof (uint32_t) + sizeof (uint32_t)); + + /* Get image width in pixels. */ + memcpy (&image_width, data, sizeof (uint32_t)); + image_width = ntohl (image_width); + data += sizeof (uint32_t); + + /* Get image height in pixels. */ + memcpy (&image_height, data, sizeof (uint32_t)); + image_height = ntohl (image_height); + data += sizeof (uint32_t); + + /* Sanity check message size again */ + assert (msg_size >= image_width * image_height * 3); + + /* data now points to RGB24 pixel data. */ + + if (!sdl_initialised) { + + /* Initialise SDL if not already done. + We need to have received at least one message, so that we + know what the image size being sent is. */ + if (SDL_Init (SDL_INIT_VIDEO) < 0) + { + fprintf (stderr, "Failed to initialize SDL: %s\n", + SDL_GetError()); + exit (1); + } + screen = SDL_SetVideoMode (image_width, image_height, 32, + SDL_HWSURFACE); + if (screen == NULL) { + fprintf (stderr, "Unable to set video mode: %s\n", + SDL_GetError ()); + SDL_Quit (); + exit (1); + } + /* Set initial window title */ + SDL_WM_SetCaption (window_title, window_title); + + sdl_initialised = 1; + } + + /* Create RGB surface. */ + rgb_surface = SDL_CreateRGBSurfaceFrom ( + data, /* Pixel data */ + image_width, /* Width */ + image_height, /* Height */ + 24, /* Depth */ + image_width * 3, /* Scanline pitch */ + 0, 0, 0, 0); /* TODO: RGBA mask */ + + /* Blit surface to screen. */ + SDL_BlitSurface (rgb_surface, NULL, screen, NULL); + SDL_UpdateRect (screen, 0, 0, 0, 0); + SDL_FreeSurface (rgb_surface); + + /* Free zmq_msg we received */ + zmq_msg_close (&msg); + + /* Update statistics */ + received_fps += 1; + received_bps += msg_size; } /* Check if user asked to quit. */ + if (sdl_initialised) { while (SDL_PollEvent (&event)) - { - if (event.type == SDL_QUIT) - quit = 1; + { + if (event.type == SDL_QUIT) + quit = 1; + } } } |