X-Git-Url: http://git.efficios.com/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-sessiond%2Fmain.c;h=15bb7255a882dab71abe546e9d8e95bc631f92ea;hp=f7bb53ef7ead038e479964b6e11bb2259eeb1cd7;hb=331744e34f56a5aec69b05d356d6901e67926acc;hpb=5d2e1e66a968d9e555f9b8b00d0589ebfaf3de32 diff --git a/src/bin/lttng-sessiond/main.c b/src/bin/lttng-sessiond/main.c index f7bb53ef7..15bb7255a 100644 --- a/src/bin/lttng-sessiond/main.c +++ b/src/bin/lttng-sessiond/main.c @@ -25,6 +25,7 @@ #include #include #include +#include #include #include #include @@ -45,6 +46,7 @@ #include #include "lttng-sessiond.h" +#include "buffer-registry.h" #include "channel.h" #include "cmd.h" #include "consumer.h" @@ -88,6 +90,7 @@ static struct consumer_data kconsumer_data = { .cmd_unix_sock_path = DEFAULT_KCONSUMERD_CMD_SOCK_PATH, .err_sock = -1, .cmd_sock = -1, + .metadata_sock.fd = -1, .pid_mutex = PTHREAD_MUTEX_INITIALIZER, .lock = PTHREAD_MUTEX_INITIALIZER, .cond = PTHREAD_COND_INITIALIZER, @@ -99,6 +102,7 @@ static struct consumer_data ustconsumer64_data = { .cmd_unix_sock_path = DEFAULT_USTCONSUMERD64_CMD_SOCK_PATH, .err_sock = -1, .cmd_sock = -1, + .metadata_sock.fd = -1, .pid_mutex = PTHREAD_MUTEX_INITIALIZER, .lock = PTHREAD_MUTEX_INITIALIZER, .cond = PTHREAD_COND_INITIALIZER, @@ -110,6 +114,7 @@ static struct consumer_data ustconsumer32_data = { .cmd_unix_sock_path = DEFAULT_USTCONSUMERD32_CMD_SOCK_PATH, .err_sock = -1, .cmd_sock = -1, + .metadata_sock.fd = -1, .pid_mutex = PTHREAD_MUTEX_INITIALIZER, .lock = PTHREAD_MUTEX_INITIALIZER, .cond = PTHREAD_COND_INITIALIZER, @@ -435,6 +440,7 @@ static void cleanup(void) DBG("Closing all UST sockets"); ust_app_clean_list(); + buffer_reg_destroy_registries(); if (is_root && !opt_no_kernel) { DBG2("Closing kernel fd"); @@ -863,10 +869,10 @@ static void *thread_manage_consumer(void *data) health_code_update(); /* - * Pass 2 as size here for the thread quit pipe and kconsumerd_err_sock. - * Nothing more will be added to this poll set. + * Pass 3 as size here for the thread quit pipe, consumerd_err_sock and the + * metadata_sock. Nothing more will be added to this poll set. */ - ret = sessiond_set_thread_pollset(&events, 2); + ret = sessiond_set_thread_pollset(&events, 3); if (ret < 0) { goto error_poll; } @@ -883,7 +889,7 @@ static void *thread_manage_consumer(void *data) health_code_update(); - /* Inifinite blocking call, waiting for transmission */ + /* Infinite blocking call, waiting for transmission */ restart: health_poll_entry(); @@ -953,87 +959,126 @@ restart: health_code_update(); if (code == LTTCOMM_CONSUMERD_COMMAND_SOCK_READY) { + /* Connect both socket, command and metadata. */ consumer_data->cmd_sock = lttcomm_connect_unix_sock(consumer_data->cmd_unix_sock_path); - if (consumer_data->cmd_sock < 0) { + consumer_data->metadata_sock.fd = + lttcomm_connect_unix_sock(consumer_data->cmd_unix_sock_path); + if (consumer_data->cmd_sock < 0 || + consumer_data->metadata_sock.fd < 0) { + PERROR("consumer connect cmd socket"); /* On error, signal condition and quit. */ signal_consumer_condition(consumer_data, -1); - PERROR("consumer connect"); goto error; } + /* Create metadata socket lock. */ + consumer_data->metadata_sock.lock = zmalloc(sizeof(pthread_mutex_t)); + if (consumer_data->metadata_sock.lock == NULL) { + PERROR("zmalloc pthread mutex"); + ret = -1; + goto error; + } + pthread_mutex_init(consumer_data->metadata_sock.lock, NULL); + signal_consumer_condition(consumer_data, 1); - DBG("Consumer command socket ready"); + DBG("Consumer command socket ready (fd: %d", consumer_data->cmd_sock); + DBG("Consumer metadata socket ready (fd: %d)", + consumer_data->metadata_sock.fd); } else { ERR("consumer error when waiting for SOCK_READY : %s", lttcomm_get_readable_code(-code)); goto error; } - /* Remove the kconsumerd error sock since we've established a connexion */ + /* Remove the consumerd error sock since we've established a connexion */ ret = lttng_poll_del(&events, consumer_data->err_sock); if (ret < 0) { goto error; } + /* Add new accepted error socket. */ ret = lttng_poll_add(&events, sock, LPOLLIN | LPOLLRDHUP); if (ret < 0) { goto error; } + /* Add metadata socket that is successfully connected. */ + ret = lttng_poll_add(&events, consumer_data->metadata_sock.fd, + LPOLLIN | LPOLLRDHUP); + if (ret < 0) { + goto error; + } + health_code_update(); - /* Inifinite blocking call, waiting for transmission */ + /* Infinite blocking call, waiting for transmission */ restart_poll: - health_poll_entry(); - ret = lttng_poll_wait(&events, -1); - health_poll_exit(); - if (ret < 0) { - /* - * Restart interrupted system call. - */ - if (errno == EINTR) { - goto restart_poll; + while (1) { + health_poll_entry(); + ret = lttng_poll_wait(&events, -1); + health_poll_exit(); + if (ret < 0) { + /* + * Restart interrupted system call. + */ + if (errno == EINTR) { + goto restart_poll; + } + goto error; } - goto error; - } - nb_fd = ret; + nb_fd = ret; - for (i = 0; i < nb_fd; i++) { - /* Fetch once the poll data */ - revents = LTTNG_POLL_GETEV(&events, i); - pollfd = LTTNG_POLL_GETFD(&events, i); + for (i = 0; i < nb_fd; i++) { + /* Fetch once the poll data */ + revents = LTTNG_POLL_GETEV(&events, i); + pollfd = LTTNG_POLL_GETFD(&events, i); - health_code_update(); + health_code_update(); - /* Thread quit pipe has been closed. Killing thread. */ - ret = sessiond_check_thread_quit_pipe(pollfd, revents); - if (ret) { - err = 0; - goto exit; - } + /* Thread quit pipe has been closed. Killing thread. */ + ret = sessiond_check_thread_quit_pipe(pollfd, revents); + if (ret) { + err = 0; + goto exit; + } - /* Event on the kconsumerd socket */ - if (pollfd == sock) { - if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { - ERR("consumer err socket second poll error"); + if (pollfd == sock) { + /* Event on the consumerd socket */ + if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { + ERR("consumer err socket second poll error"); + goto error; + } + health_code_update(); + /* Wait for any kconsumerd error */ + ret = lttcomm_recv_unix_sock(sock, &code, + sizeof(enum lttcomm_return_code)); + if (ret <= 0) { + ERR("consumer closed the command socket"); + goto error; + } + + ERR("consumer return code : %s", + lttcomm_get_readable_code(-code)); + + goto exit; + } else if (pollfd == consumer_data->metadata_sock.fd) { + /* UST metadata requests */ + ret = ust_consumer_metadata_request( + &consumer_data->metadata_sock); + if (ret < 0) { + ERR("Handling metadata request"); + goto error; + } + break; + } else { + ERR("Unknown pollfd"); goto error; } } + health_code_update(); } - health_code_update(); - - /* Wait for any kconsumerd error */ - ret = lttcomm_recv_unix_sock(sock, &code, - sizeof(enum lttcomm_return_code)); - if (ret <= 0) { - ERR("consumer closed the command socket"); - goto error; - } - - ERR("consumer return code : %s", lttcomm_get_readable_code(-code)); - exit: error: /* Immediately set the consumerd state to stopped */ @@ -1059,6 +1104,16 @@ error: PERROR("close"); } } + if (consumer_data->metadata_sock.fd >= 0) { + ret = close(consumer_data->metadata_sock.fd); + if (ret) { + PERROR("close"); + } + } + /* Cleanup metadata socket mutex. */ + pthread_mutex_destroy(consumer_data->metadata_sock.lock); + free(consumer_data->metadata_sock.lock); + if (sock >= 0) { ret = close(sock); if (ret) { @@ -1299,6 +1354,7 @@ static void *thread_dispatch_ust_registration(void *data) do { struct ust_app *app = NULL; + ust_cmd = NULL; /* Dequeue command for registration */ node = cds_wfq_dequeue_blocking(&ust_cmd_queue.queue); @@ -1321,6 +1377,7 @@ static void *thread_dispatch_ust_registration(void *data) wait_node = zmalloc(sizeof(*wait_node)); if (!wait_node) { PERROR("zmalloc wait_node dispatch"); + free(ust_cmd); goto error; } CDS_INIT_LIST_HEAD(&wait_node->head); @@ -1335,6 +1392,7 @@ static void *thread_dispatch_ust_registration(void *data) } lttng_fd_put(1, LTTNG_FD_APPS); free(wait_node); + free(ust_cmd); continue; } /* @@ -1343,6 +1401,7 @@ static void *thread_dispatch_ust_registration(void *data) */ cds_list_add(&wait_node->head, &wait_queue); + free(ust_cmd); /* * We have to continue here since we don't have the notify * socket and the application MUST be added to the hash table @@ -1365,6 +1424,7 @@ static void *thread_dispatch_ust_registration(void *data) break; } } + free(ust_cmd); } if (app) { @@ -1433,7 +1493,6 @@ static void *thread_dispatch_ust_registration(void *data) } lttng_fd_put(1, LTTNG_FD_APPS); } - free(ust_cmd); } while (node != NULL); /* Futex wait on queue. Blocking call on futex() */ @@ -1879,6 +1938,7 @@ static pid_t spawn_consumerd(struct consumer_data *consumer_data) ret = putenv(tmpnew); if (ret) { ret = -errno; + free(tmpnew); goto error; } } @@ -1923,6 +1983,7 @@ static pid_t spawn_consumerd(struct consumer_data *consumer_data) ret = putenv(tmpnew); if (ret) { ret = -errno; + free(tmpnew); goto error; } } @@ -2003,7 +2064,7 @@ end: return 0; error: - /* Cleanup already created socket on error. */ + /* Cleanup already created sockets on error. */ if (consumer_data->err_sock >= 0) { int err; @@ -2610,13 +2671,13 @@ skip_domain: } case LTTNG_ENABLE_CHANNEL: { - ret = cmd_enable_channel(cmd_ctx->session, cmd_ctx->lsm->domain.type, + ret = cmd_enable_channel(cmd_ctx->session, &cmd_ctx->lsm->domain, &cmd_ctx->lsm->u.channel.chan, kernel_poll_pipe[1]); break; } case LTTNG_ENABLE_EVENT: { - ret = cmd_enable_event(cmd_ctx->session, cmd_ctx->lsm->domain.type, + ret = cmd_enable_event(cmd_ctx->session, &cmd_ctx->lsm->domain, cmd_ctx->lsm->u.enable.channel_name, &cmd_ctx->lsm->u.enable.event, NULL, kernel_poll_pipe[1]); break; @@ -2625,7 +2686,7 @@ skip_domain: { DBG("Enabling all events"); - ret = cmd_enable_event_all(cmd_ctx->session, cmd_ctx->lsm->domain.type, + ret = cmd_enable_event_all(cmd_ctx->session, &cmd_ctx->lsm->domain, cmd_ctx->lsm->u.enable.channel_name, cmd_ctx->lsm->u.enable.event.type, NULL, kernel_poll_pipe[1]); break; @@ -2974,7 +3035,7 @@ skip_domain: goto error; } - ret = cmd_enable_event(cmd_ctx->session, cmd_ctx->lsm->domain.type, + ret = cmd_enable_event(cmd_ctx->session, &cmd_ctx->lsm->domain, cmd_ctx->lsm->u.enable.channel_name, &cmd_ctx->lsm->u.enable.event, bytecode, kernel_poll_pipe[1]); break; @@ -4217,6 +4278,10 @@ int main(int argc, char **argv) goto exit; } + /* Initialize global buffer per UID and PID registry. */ + buffer_reg_init_uid_registry(); + buffer_reg_init_pid_registry(); + /* Init UST command queue. */ cds_wfq_init(&ust_cmd_queue.queue);