Fix handling of multiple FDs
authorJulien Desfossez <julien.desfossez@polymtl.ca>
Tue, 24 May 2011 15:31:14 +0000 (17:31 +0200)
committerDavid Goulet <david.goulet@polymtl.ca>
Tue, 24 May 2011 15:32:40 +0000 (11:32 -0400)
This patch fixes the (normal) case where we handle more than one reading
fd. Previous versions were only tested with one FD, as of now we can
consume multiple fd (metadata and data for example).

Signed-off-by: Julien Desfossez <julien.desfossez@polymtl.ca>
kconsumerd/kconsumerd.c

index 4c9c089aa9e5268da6575ca777b05f4d1ad5ec15..66f3877d020cddae531fd123fc7df4ab31757e18 100644 (file)
@@ -79,6 +79,7 @@ static char error_sock_path[PATH_MAX]; /* Global error path */
  */
 static void del_fd(struct ltt_kconsumerd_fd *lcf)
 {
+       DBG("Removing %d", lcf->consumerd_fd);
        pthread_mutex_lock(&kconsumerd_lock_fds);
        cds_list_del(&lcf->list);
        if (fds_count > 0) {
@@ -103,7 +104,6 @@ static void cleanup()
 {
        struct ltt_kconsumerd_fd *iter;
 
-
        /* remove the socket file */
        unlink(command_sock_path);
 
@@ -118,7 +118,8 @@ static void cleanup()
        }
 }
 
-/* send_error
+/*
+ * send_error
  *
  * send return code to ltt-sessiond
  */
@@ -330,7 +331,7 @@ static int read_subbuffer(struct ltt_kconsumerd_fd *kconsumerd_fd)
        long ret = 0;
        int infd = kconsumerd_fd->consumerd_fd;
 
-       DBG("In read_subbuffer");
+       DBG("In read_subbuffer (infd : %d)", infd);
        /* Get the next subbuffer */
        err = kernctl_get_next_subbuf(infd);
        if (err != 0) {
@@ -518,23 +519,21 @@ static void *thread_receive_fds(void *data)
                goto error;
        }
        while (1) {
-
                /* We first get the number of fd we are about to receive */
                ret = lttcomm_recv_unix_sock(sock, &tmp,
                                sizeof(struct lttcomm_kconsumerd_header));
-               if (ret < 0) {
+               if (ret <= 0) {
                        ERR("Receiving the lttcomm_kconsumerd_header, exiting");
                        goto error;
                }
                ret = consumerd_recv_fd(sock, tmp.payload_size, tmp.cmd_type);
-               if (ret < 0) {
+               if (ret <= 0) {
                        ERR("Receiving the FD, exiting");
                        goto error;
                }
        }
 
 error:
-       cleanup();
        return NULL;
 }
 
@@ -608,7 +607,7 @@ static void *thread_poll_fds(void *data)
        int num_rdy, num_hup, high_prio, ret, i;
        struct pollfd *pollfd = NULL;
        /* local view of the fds */
-       struct ltt_kconsumerd_fd *local_kconsumerd_fd = NULL;
+       struct ltt_kconsumerd_fd **local_kconsumerd_fd = NULL;
        /* local view of fds_count */
        int nb_fd = 0;
 
@@ -618,6 +617,8 @@ static void *thread_poll_fds(void *data)
                goto end;
        }
 
+       local_kconsumerd_fd = malloc(sizeof(struct ltt_kconsumerd_fd));
+
        while (1) {
                high_prio = 0;
                num_hup = 0;
@@ -627,7 +628,7 @@ static void *thread_poll_fds(void *data)
                 * local array as well
                 */
                if (update_fd_array) {
-                       ret = update_poll_array(&pollfd, &local_kconsumerd_fd);
+                       ret = update_poll_array(&pollfd, local_kconsumerd_fd);
                        if (ret < 0) {
                                ERR("Error in allocating pollfd or local_outfds");
                                send_error(KCONSUMERD_POLL_ERROR);
@@ -666,7 +667,7 @@ static void *thread_poll_fds(void *data)
                                case POLLPRI:
                                        DBG("Urgent read on fd %d", pollfd[i].fd);
                                        high_prio = 1;
-                                       ret = read_subbuffer(&local_kconsumerd_fd[i]);
+                                       ret = read_subbuffer(local_kconsumerd_fd[i]);
                                        /* it's ok to have an unavailable sub-buffer (FIXME : is it ?) */
                                        if (ret == EAGAIN) {
                                                ret = 0;
@@ -679,7 +680,7 @@ static void *thread_poll_fds(void *data)
                if (nb_fd > 0 && num_hup == nb_fd) {
                        DBG("every buffer FD has hung up\n");
                        send_error(KCONSUMERD_POLL_HUP);
-                       continue;
+                       goto end;
                }
 
                /* Take care of low priority channels. */
@@ -688,7 +689,7 @@ static void *thread_poll_fds(void *data)
                                switch(pollfd[i].revents) {
                                        case POLLIN:
                                                DBG("Normal read on fd %d", pollfd[i].fd);
-                                               ret = read_subbuffer(&local_kconsumerd_fd[i]);
+                                               ret = read_subbuffer(local_kconsumerd_fd[i]);
                                                /* it's ok to have an unavailable subbuffer (FIXME : is it ?) */
                                                if (ret == EAGAIN) {
                                                        ret = 0;
This page took 0.02914 seconds and 5 git commands to generate.