Callbacks on receive and update FD
authorJulien Desfossez <julien.desfossez@polymtl.ca>
Tue, 16 Aug 2011 23:25:04 +0000 (19:25 -0400)
committerMathieu Desnoyers <mathieu.desnoyers@efficios.com>
Tue, 16 Aug 2011 23:25:04 +0000 (19:25 -0400)
The user of the lib can now take control over a new FD or the update
operation of an existing FD.
Opening the output tracefile is now the responsiblity of the user
and not the library itself.

[ Edit by Mathieu Desnoyers: comment and teardown cleanups ]

Signed-off-by: Julien Desfossez <julien.desfossez@polymtl.ca>
Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
include/lttng/lttng-kconsumerd.h
liblttngkconsumerd/lttngkconsumerd.c
ltt-kconsumerd/ltt-kconsumerd.c

index 7e195ab7ed3de9d93f533e25cdc625c564ea60db..e09bdc3bda26c7f7a8f09da34503e2dd71e99d13 100644 (file)
@@ -79,6 +79,31 @@ struct lttng_kconsumerd_fd {
 struct lttng_kconsumerd_local_data {
        /* function to call when data is available on a buffer */
        int (*on_buffer_ready)(struct lttng_kconsumerd_fd *kconsumerd_fd);
+       /*
+        * function to call when we receive a new fd, it receives a
+        * newly allocated kconsumerd_fd, depending on the return code
+        * of this function, the new FD will be handled by the
+        * application or the library.
+        *
+        * Returns:
+        *    > 0 (success, FD is kept by application)
+        *   == 0 (success, FD is left to library)
+        *    < 0 (error)
+        */
+       int (*on_recv_fd)(struct lttng_kconsumerd_fd *kconsumerd_fd);
+       /*
+        * function to call when a FD is getting updated by the session
+        * daemon, this function receives the FD as seen by the session
+        * daemon (sessiond_fd) and the new state, depending on the
+        * return code of this function the update of state for the FD
+        * is handled by the application or the library.
+        *
+        * Returns:
+        *    > 0 (success, FD is kept by application)
+        *   == 0 (success, FD is left to library)
+        *    < 0 (error)
+        */
+       int (*on_update_fd)(int sessiond_fd, uint32_t state);
        /* socket to communicate errors with sessiond */
        int kconsumerd_error_socket;
        /* socket to exchange commands with sessiond */
@@ -98,15 +123,15 @@ struct lttng_kconsumerd_local_data {
  * - create the should_quit pipe (for signal handler)
  * - create the thread pipe (for splice)
  *
- * Takes a function pointer as argument, this function is called when data is
- * available on a buffer. This function is responsible to do the
- * kernctl_get_next_subbuf, read the data with mmap or splice depending on the
- * buffer configuration and then kernctl_put_next_subbuf at the end.
+ * Takes the function pointers to the on_buffer_ready, on_recv_fd, and
+ * on_update_fd callbacks.
  *
  * Returns a pointer to the new context or NULL on error.
  */
 extern struct lttng_kconsumerd_local_data *lttng_kconsumerd_create(
-               int (*buffer_ready)(struct lttng_kconsumerd_fd *kconsumerd_fd));
+               int (*buffer_ready)(struct lttng_kconsumerd_fd *kconsumerd_fd),
+               int (*recv_fd)(struct lttng_kconsumerd_fd *kconsumerd_fd),
+               int (*update_fd)(int sessiond_fd, uint32_t state));
 
 /*
  * Close all fds associated with the instance and free the context.
index 69ef9a0e8b65173e9d6544bea875e6a9b6e9753c..5c9f613b11eddf48e4450e0cfd304b00c54d27e6 100644 (file)
@@ -28,6 +28,7 @@
 #include <sys/types.h>
 #include <unistd.h>
 #include <urcu/list.h>
+#include <assert.h>
 
 #include <lttng/lttng-kconsumerd.h>
 
@@ -125,22 +126,21 @@ static void kconsumerd_del_fd(struct lttng_kconsumerd_fd *lcf)
 }
 
 /*
- * Add a fd to the global list protected by a mutex.
+ * Create a struct lttcomm_kconsumerd_msg from the
+ * information received on the receiving socket
  */
-static int kconsumerd_add_fd(struct lttcomm_kconsumerd_msg *buf,
+struct lttng_kconsumerd_fd *kconsumerd_allocate_fd(
+               struct lttcomm_kconsumerd_msg *buf,
                int consumerd_fd)
 {
        struct lttng_kconsumerd_fd *tmp_fd;
-       int ret = 0;
 
-       pthread_mutex_lock(&kconsumerd_data.lock);
-       /* Check if already exist */
-       ret = kconsumerd_find_session_fd(buf->fd);
-       if (ret == 1) {
+       tmp_fd = malloc(sizeof(struct lttng_kconsumerd_fd));
+       if (tmp_fd == NULL) {
+               perror("malloc struct lttng_kconsumerd_fd");
                goto end;
        }
 
-       tmp_fd = malloc(sizeof(struct lttng_kconsumerd_fd));
        tmp_fd->sessiond_fd = buf->fd;
        tmp_fd->consumerd_fd = consumerd_fd;
        tmp_fd->state = buf->state;
@@ -152,42 +152,31 @@ static int kconsumerd_add_fd(struct lttcomm_kconsumerd_msg *buf,
        tmp_fd->output = buf->output;
        strncpy(tmp_fd->path_name, buf->path_name, PATH_MAX);
        tmp_fd->path_name[PATH_MAX - 1] = '\0';
+       DBG("Allocated %s (sessiond_fd %d, consumerd_fd %d, out_fd %d)",
+                       tmp_fd->path_name, tmp_fd->sessiond_fd,
+                       tmp_fd->consumerd_fd, tmp_fd->out_fd);
 
-       /* Opening the tracefile in write mode */
-       if (tmp_fd->path_name != NULL) {
-               ret = open(tmp_fd->path_name,
-                               O_WRONLY|O_CREAT|O_TRUNC, S_IRWXU|S_IRWXG|S_IRWXO);
-               if (ret < 0) {
-                       ERR("Opening %s", tmp_fd->path_name);
-                       perror("open");
-                       goto end;
-               }
-               tmp_fd->out_fd = ret;
-               DBG("Adding %s (%d, %d, %d)", tmp_fd->path_name,
-                               tmp_fd->sessiond_fd, tmp_fd->consumerd_fd, tmp_fd->out_fd);
-       }
+end:
+       return tmp_fd;
+}
 
-       if (tmp_fd->output == LTTNG_EVENT_MMAP) {
-               /* get the len of the mmap region */
-               ret = kernctl_get_mmap_len(tmp_fd->consumerd_fd, &tmp_fd->mmap_len);
-               if (ret != 0) {
-                       ret = errno;
-                       perror("kernctl_get_mmap_len");
-                       goto end;
-               }
+/*
+ * Add a fd to the global list protected by a mutex.
+ */
+static int kconsumerd_add_fd(struct lttng_kconsumerd_fd *tmp_fd)
+{
+       int ret;
 
-               tmp_fd->mmap_base = mmap(NULL, tmp_fd->mmap_len,
-                               PROT_READ, MAP_PRIVATE, tmp_fd->consumerd_fd, 0);
-               if (tmp_fd->mmap_base == MAP_FAILED) {
-                       perror("Error mmaping");
-                       ret = -1;
-                       goto end;
-               }
+       pthread_mutex_lock(&kconsumerd_data.lock);
+       /* Check if already exist */
+       ret = kconsumerd_find_session_fd(tmp_fd->sessiond_fd);
+       if (ret == 1) {
+               goto end;
        }
-
        cds_list_add(&tmp_fd->list, &kconsumerd_data.fd_list.head);
        kconsumerd_data.fds_count++;
        kconsumerd_data.need_update = 1;
+
 end:
        pthread_mutex_unlock(&kconsumerd_data.lock);
        return ret;
@@ -263,6 +252,7 @@ static int kconsumerd_consumerd_recv_fd(
        int nb_fd;
        char recv_fd[CMSG_SPACE(sizeof(int))];
        struct lttcomm_kconsumerd_msg lkm;
+       struct lttng_kconsumerd_fd *new_fd;
 
        /* the number of fds we are about to receive */
        nb_fd = size / sizeof(struct lttcomm_kconsumerd_msg);
@@ -313,14 +303,34 @@ static int kconsumerd_consumerd_recv_fd(
                                        DBG("kconsumerd_add_fd %s (%d)", lkm.path_name,
                                                        ((int *) CMSG_DATA(cmsg))[0]);
 
-                                       ret = kconsumerd_add_fd(&lkm, ((int *) CMSG_DATA(cmsg))[0]);
-                                       if (ret < 0) {
+                                       new_fd = kconsumerd_allocate_fd(&lkm, ((int *) CMSG_DATA(cmsg))[0]);
+                                       if (new_fd == NULL) {
                                                lttng_kconsumerd_send_error(ctx, KCONSUMERD_OUTFD_ERROR);
                                                goto end;
                                        }
+
+                                       if (ctx->on_recv_fd != NULL) {
+                                               ret = ctx->on_recv_fd(new_fd);
+                                               if (ret == 0) {
+                                                       kconsumerd_add_fd(new_fd);
+                                               } else if (ret < 0) {
+                                                       goto end;
+                                               }
+                                       } else {
+                                               kconsumerd_add_fd(new_fd);
+                                       }
                                        break;
                                case UPDATE_STREAM:
-                                       kconsumerd_change_fd_state(lkm.fd, lkm.state);
+                                       if (ctx->on_update_fd != NULL) {
+                                               ret = ctx->on_update_fd(lkm.fd, lkm.state);
+                                               if (ret == 0) {
+                                                       kconsumerd_change_fd_state(lkm.fd, lkm.state);
+                                               } else if (ret < 0) {
+                                                       goto end;
+                                               }
+                                       } else {
+                                               kconsumerd_change_fd_state(lkm.fd, lkm.state);
+                                       }
                                        break;
                                default:
                                        break;
@@ -754,42 +764,63 @@ end:
  * Returns a pointer to the new context or NULL on error.
  */
 struct lttng_kconsumerd_local_data *lttng_kconsumerd_create(
-               int (*buffer_ready)(struct lttng_kconsumerd_fd *kconsumerd_fd))
+               int (*buffer_ready)(struct lttng_kconsumerd_fd *kconsumerd_fd),
+               int (*recv_fd)(struct lttng_kconsumerd_fd *kconsumerd_fd),
+               int (*update_fd)(int sessiond_fd, uint32_t state))
 {
-       int ret;
+       int ret, i;
        struct lttng_kconsumerd_local_data *ctx;
 
        ctx = malloc(sizeof(struct lttng_kconsumerd_local_data));
        if (ctx == NULL) {
                perror("allocating context");
-               goto end;
+               goto error;
        }
 
+       /* assign the callbacks */
        ctx->on_buffer_ready = buffer_ready;
+       ctx->on_recv_fd = recv_fd;
+       ctx->on_update_fd = update_fd;
 
        ret = pipe(ctx->kconsumerd_poll_pipe);
        if (ret < 0) {
                perror("Error creating poll pipe");
-               ctx = NULL;
-               goto end;
+               goto error_poll_pipe;
        }
 
        ret = pipe(ctx->kconsumerd_should_quit);
        if (ret < 0) {
                perror("Error creating recv pipe");
-               ctx = NULL;
-               goto end;
+               goto error_quit_pipe;
        }
 
        ret = pipe(ctx->kconsumerd_thread_pipe);
        if (ret < 0) {
                perror("Error creating thread pipe");
-               ctx = NULL;
-               goto end;
+               goto error_thread_pipe;
        }
 
-end:
        return ctx;
+
+
+error_thread_pipe:
+       for (i = 0; i < 2; i++) {
+               int err;
+
+               err = close(ctx->kconsumerd_should_quit[i]);
+               assert(!err);
+       }
+error_quit_pipe:
+       for (i = 0; i < 2; i++) {
+               int err;
+
+               err = close(ctx->kconsumerd_poll_pipe[i]);
+               assert(!err);
+       }
+error_poll_pipe:
+       free(ctx);
+error:
+       return NULL;
 }
 
 /*
index ca939656b7c888d84173d460aa4582d6bbad2e22..7eab42e522d0f45bcb126e101f90d4d543a604e4 100644 (file)
@@ -36,6 +36,7 @@
 #include <poll.h>
 #include <unistd.h>
 #include <sys/mman.h>
+#include <assert.h>
 
 #include <lttng/lttng-kconsumerd.h>
 
@@ -277,6 +278,54 @@ end:
        return ret;
 }
 
+static int on_recv_fd(struct lttng_kconsumerd_fd *kconsumerd_fd)
+{
+       int ret;
+
+       /* Opening the tracefile in write mode */
+       if (kconsumerd_fd->path_name != NULL) {
+               ret = open(kconsumerd_fd->path_name,
+                               O_WRONLY|O_CREAT|O_TRUNC, S_IRWXU|S_IRWXG|S_IRWXO);
+               if (ret < 0) {
+                       ERR("Opening %s", kconsumerd_fd->path_name);
+                       perror("open");
+                       goto error;
+               }
+               kconsumerd_fd->out_fd = ret;
+       }
+
+       if (kconsumerd_fd->output == LTTNG_EVENT_MMAP) {
+               /* get the len of the mmap region */
+               ret = kernctl_get_mmap_len(kconsumerd_fd->consumerd_fd, &kconsumerd_fd->mmap_len);
+               if (ret != 0) {
+                       ret = errno;
+                       perror("kernctl_get_mmap_len");
+                       goto error_close_fd;
+               }
+
+               kconsumerd_fd->mmap_base = mmap(NULL, kconsumerd_fd->mmap_len,
+                               PROT_READ, MAP_PRIVATE, kconsumerd_fd->consumerd_fd, 0);
+               if (kconsumerd_fd->mmap_base == MAP_FAILED) {
+                       perror("Error mmaping");
+                       ret = -1;
+                       goto error_close_fd;
+               }
+       }
+
+       /* we return 0 to let the library handle the FD internally */
+       return 0;
+
+error_close_fd:
+       {
+               int err;
+
+               err = close(kconsumerd_fd->out_fd);
+               assert(!err);
+       }
+error:
+       return ret;
+}
+
 /*
  * main
  */
@@ -303,8 +352,8 @@ int main(int argc, char **argv)
                snprintf(command_sock_path, PATH_MAX,
                                KCONSUMERD_CMD_SOCK_PATH);
        }
-       /* create the pipe to wake to receiving thread when needed */
-       ctx = lttng_kconsumerd_create(read_subbuffer);
+       /* create the consumer instance with and assign the callbacks */
+       ctx = lttng_kconsumerd_create(read_subbuffer, on_recv_fd, NULL);
        if (ctx == NULL) {
                goto error;
        }
This page took 0.032038 seconds and 5 git commands to generate.