X-Git-Url: http://git.efficios.com/?a=blobdiff_plain;f=src%2Fbin%2Flttng-sessiond%2Fmain.c;h=c3f8ab3bdbc7dfb8aff2c5eca9b146276c6ac22d;hb=36134aa184c8ed8f67a197d66d1310425abbf1ce;hp=f7bb53ef7ead038e479964b6e11bb2259eeb1cd7;hpb=d88aee689d5bd0067f362a323cb69c37717df59f;p=lttng-tools.git diff --git a/src/bin/lttng-sessiond/main.c b/src/bin/lttng-sessiond/main.c index f7bb53ef7..c3f8ab3bd 100644 --- a/src/bin/lttng-sessiond/main.c +++ b/src/bin/lttng-sessiond/main.c @@ -25,6 +25,7 @@ #include #include #include +#include #include #include #include @@ -45,6 +46,7 @@ #include #include "lttng-sessiond.h" +#include "buffer-registry.h" #include "channel.h" #include "cmd.h" #include "consumer.h" @@ -88,6 +90,7 @@ static struct consumer_data kconsumer_data = { .cmd_unix_sock_path = DEFAULT_KCONSUMERD_CMD_SOCK_PATH, .err_sock = -1, .cmd_sock = -1, + .metadata_sock.fd = -1, .pid_mutex = PTHREAD_MUTEX_INITIALIZER, .lock = PTHREAD_MUTEX_INITIALIZER, .cond = PTHREAD_COND_INITIALIZER, @@ -99,6 +102,7 @@ static struct consumer_data ustconsumer64_data = { .cmd_unix_sock_path = DEFAULT_USTCONSUMERD64_CMD_SOCK_PATH, .err_sock = -1, .cmd_sock = -1, + .metadata_sock.fd = -1, .pid_mutex = PTHREAD_MUTEX_INITIALIZER, .lock = PTHREAD_MUTEX_INITIALIZER, .cond = PTHREAD_COND_INITIALIZER, @@ -110,6 +114,7 @@ static struct consumer_data ustconsumer32_data = { .cmd_unix_sock_path = DEFAULT_USTCONSUMERD32_CMD_SOCK_PATH, .err_sock = -1, .cmd_sock = -1, + .metadata_sock.fd = -1, .pid_mutex = PTHREAD_MUTEX_INITIALIZER, .lock = PTHREAD_MUTEX_INITIALIZER, .cond = PTHREAD_COND_INITIALIZER, @@ -227,6 +232,9 @@ static enum consumerd_state kernel_consumerd_state; */ static int app_socket_timeout; +/* Set in main() with the current page size. */ +long page_size; + static void setup_consumerd_path(void) { @@ -435,6 +443,7 @@ static void cleanup(void) DBG("Closing all UST sockets"); ust_app_clean_list(); + buffer_reg_destroy_registries(); if (is_root && !opt_no_kernel) { DBG2("Closing kernel fd"); @@ -863,10 +872,10 @@ static void *thread_manage_consumer(void *data) health_code_update(); /* - * Pass 2 as size here for the thread quit pipe and kconsumerd_err_sock. - * Nothing more will be added to this poll set. + * Pass 3 as size here for the thread quit pipe, consumerd_err_sock and the + * metadata_sock. Nothing more will be added to this poll set. */ - ret = sessiond_set_thread_pollset(&events, 2); + ret = sessiond_set_thread_pollset(&events, 3); if (ret < 0) { goto error_poll; } @@ -883,7 +892,7 @@ static void *thread_manage_consumer(void *data) health_code_update(); - /* Inifinite blocking call, waiting for transmission */ + /* Infinite blocking call, waiting for transmission */ restart: health_poll_entry(); @@ -953,87 +962,126 @@ restart: health_code_update(); if (code == LTTCOMM_CONSUMERD_COMMAND_SOCK_READY) { + /* Connect both socket, command and metadata. */ consumer_data->cmd_sock = lttcomm_connect_unix_sock(consumer_data->cmd_unix_sock_path); - if (consumer_data->cmd_sock < 0) { + consumer_data->metadata_sock.fd = + lttcomm_connect_unix_sock(consumer_data->cmd_unix_sock_path); + if (consumer_data->cmd_sock < 0 || + consumer_data->metadata_sock.fd < 0) { + PERROR("consumer connect cmd socket"); /* On error, signal condition and quit. */ signal_consumer_condition(consumer_data, -1); - PERROR("consumer connect"); goto error; } + /* Create metadata socket lock. */ + consumer_data->metadata_sock.lock = zmalloc(sizeof(pthread_mutex_t)); + if (consumer_data->metadata_sock.lock == NULL) { + PERROR("zmalloc pthread mutex"); + ret = -1; + goto error; + } + pthread_mutex_init(consumer_data->metadata_sock.lock, NULL); + signal_consumer_condition(consumer_data, 1); - DBG("Consumer command socket ready"); + DBG("Consumer command socket ready (fd: %d", consumer_data->cmd_sock); + DBG("Consumer metadata socket ready (fd: %d)", + consumer_data->metadata_sock.fd); } else { ERR("consumer error when waiting for SOCK_READY : %s", lttcomm_get_readable_code(-code)); goto error; } - /* Remove the kconsumerd error sock since we've established a connexion */ + /* Remove the consumerd error sock since we've established a connexion */ ret = lttng_poll_del(&events, consumer_data->err_sock); if (ret < 0) { goto error; } + /* Add new accepted error socket. */ ret = lttng_poll_add(&events, sock, LPOLLIN | LPOLLRDHUP); if (ret < 0) { goto error; } + /* Add metadata socket that is successfully connected. */ + ret = lttng_poll_add(&events, consumer_data->metadata_sock.fd, + LPOLLIN | LPOLLRDHUP); + if (ret < 0) { + goto error; + } + health_code_update(); - /* Inifinite blocking call, waiting for transmission */ + /* Infinite blocking call, waiting for transmission */ restart_poll: - health_poll_entry(); - ret = lttng_poll_wait(&events, -1); - health_poll_exit(); - if (ret < 0) { - /* - * Restart interrupted system call. - */ - if (errno == EINTR) { - goto restart_poll; + while (1) { + health_poll_entry(); + ret = lttng_poll_wait(&events, -1); + health_poll_exit(); + if (ret < 0) { + /* + * Restart interrupted system call. + */ + if (errno == EINTR) { + goto restart_poll; + } + goto error; } - goto error; - } - nb_fd = ret; + nb_fd = ret; - for (i = 0; i < nb_fd; i++) { - /* Fetch once the poll data */ - revents = LTTNG_POLL_GETEV(&events, i); - pollfd = LTTNG_POLL_GETFD(&events, i); + for (i = 0; i < nb_fd; i++) { + /* Fetch once the poll data */ + revents = LTTNG_POLL_GETEV(&events, i); + pollfd = LTTNG_POLL_GETFD(&events, i); - health_code_update(); + health_code_update(); - /* Thread quit pipe has been closed. Killing thread. */ - ret = sessiond_check_thread_quit_pipe(pollfd, revents); - if (ret) { - err = 0; - goto exit; - } + /* Thread quit pipe has been closed. Killing thread. */ + ret = sessiond_check_thread_quit_pipe(pollfd, revents); + if (ret) { + err = 0; + goto exit; + } - /* Event on the kconsumerd socket */ - if (pollfd == sock) { - if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { - ERR("consumer err socket second poll error"); + if (pollfd == sock) { + /* Event on the consumerd socket */ + if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { + ERR("consumer err socket second poll error"); + goto error; + } + health_code_update(); + /* Wait for any kconsumerd error */ + ret = lttcomm_recv_unix_sock(sock, &code, + sizeof(enum lttcomm_return_code)); + if (ret <= 0) { + ERR("consumer closed the command socket"); + goto error; + } + + ERR("consumer return code : %s", + lttcomm_get_readable_code(-code)); + + goto exit; + } else if (pollfd == consumer_data->metadata_sock.fd) { + /* UST metadata requests */ + ret = ust_consumer_metadata_request( + &consumer_data->metadata_sock); + if (ret < 0) { + ERR("Handling metadata request"); + goto error; + } + break; + } else { + ERR("Unknown pollfd"); goto error; } } + health_code_update(); } - health_code_update(); - - /* Wait for any kconsumerd error */ - ret = lttcomm_recv_unix_sock(sock, &code, - sizeof(enum lttcomm_return_code)); - if (ret <= 0) { - ERR("consumer closed the command socket"); - goto error; - } - - ERR("consumer return code : %s", lttcomm_get_readable_code(-code)); - exit: error: /* Immediately set the consumerd state to stopped */ @@ -1059,6 +1107,16 @@ error: PERROR("close"); } } + if (consumer_data->metadata_sock.fd >= 0) { + ret = close(consumer_data->metadata_sock.fd); + if (ret) { + PERROR("close"); + } + } + /* Cleanup metadata socket mutex. */ + pthread_mutex_destroy(consumer_data->metadata_sock.lock); + free(consumer_data->metadata_sock.lock); + if (sock >= 0) { ret = close(sock); if (ret) { @@ -1299,6 +1357,7 @@ static void *thread_dispatch_ust_registration(void *data) do { struct ust_app *app = NULL; + ust_cmd = NULL; /* Dequeue command for registration */ node = cds_wfq_dequeue_blocking(&ust_cmd_queue.queue); @@ -1321,6 +1380,7 @@ static void *thread_dispatch_ust_registration(void *data) wait_node = zmalloc(sizeof(*wait_node)); if (!wait_node) { PERROR("zmalloc wait_node dispatch"); + free(ust_cmd); goto error; } CDS_INIT_LIST_HEAD(&wait_node->head); @@ -1335,6 +1395,7 @@ static void *thread_dispatch_ust_registration(void *data) } lttng_fd_put(1, LTTNG_FD_APPS); free(wait_node); + free(ust_cmd); continue; } /* @@ -1343,6 +1404,7 @@ static void *thread_dispatch_ust_registration(void *data) */ cds_list_add(&wait_node->head, &wait_queue); + free(ust_cmd); /* * We have to continue here since we don't have the notify * socket and the application MUST be added to the hash table @@ -1365,6 +1427,7 @@ static void *thread_dispatch_ust_registration(void *data) break; } } + free(ust_cmd); } if (app) { @@ -1433,7 +1496,6 @@ static void *thread_dispatch_ust_registration(void *data) } lttng_fd_put(1, LTTNG_FD_APPS); } - free(ust_cmd); } while (node != NULL); /* Futex wait on queue. Blocking call on futex() */ @@ -1879,6 +1941,7 @@ static pid_t spawn_consumerd(struct consumer_data *consumer_data) ret = putenv(tmpnew); if (ret) { ret = -errno; + free(tmpnew); goto error; } } @@ -1923,6 +1986,7 @@ static pid_t spawn_consumerd(struct consumer_data *consumer_data) ret = putenv(tmpnew); if (ret) { ret = -errno; + free(tmpnew); goto error; } } @@ -2003,7 +2067,7 @@ end: return 0; error: - /* Cleanup already created socket on error. */ + /* Cleanup already created sockets on error. */ if (consumer_data->err_sock >= 0) { int err; @@ -2098,6 +2162,8 @@ error: * Copy consumer output from the tracing session to the domain session. The * function also applies the right modification on a per domain basis for the * trace files destination directory. + * + * Should *NOT* be called with RCU read-side lock held. */ static int copy_session_consumer(int domain, struct ltt_session *session) { @@ -2155,6 +2221,8 @@ error: /* * Create an UST session and add it to the session ust list. + * + * Should *NOT* be called with RCU read-side lock held. */ static int create_ust_session(struct ltt_session *session, struct lttng_domain *domain) @@ -2177,7 +2245,7 @@ static int create_ust_session(struct ltt_session *session, DBG("Creating UST session"); - lus = trace_ust_create_session(session->path, session->id); + lus = trace_ust_create_session(session->id); if (lus == NULL) { ret = LTTNG_ERR_UST_SESS_FAIL; goto error; @@ -2280,6 +2348,8 @@ static unsigned int lttng_sessions_count(uid_t uid, gid_t gid) * Return any error encountered or 0 for success. * * "sock" is only used for special-case var. len data. + * + * Should *NOT* be called with RCU read-side lock held. */ static int process_client_msg(struct command_ctx *cmd_ctx, int sock, int *sock_error) @@ -2610,13 +2680,13 @@ skip_domain: } case LTTNG_ENABLE_CHANNEL: { - ret = cmd_enable_channel(cmd_ctx->session, cmd_ctx->lsm->domain.type, + ret = cmd_enable_channel(cmd_ctx->session, &cmd_ctx->lsm->domain, &cmd_ctx->lsm->u.channel.chan, kernel_poll_pipe[1]); break; } case LTTNG_ENABLE_EVENT: { - ret = cmd_enable_event(cmd_ctx->session, cmd_ctx->lsm->domain.type, + ret = cmd_enable_event(cmd_ctx->session, &cmd_ctx->lsm->domain, cmd_ctx->lsm->u.enable.channel_name, &cmd_ctx->lsm->u.enable.event, NULL, kernel_poll_pipe[1]); break; @@ -2625,7 +2695,7 @@ skip_domain: { DBG("Enabling all events"); - ret = cmd_enable_event_all(cmd_ctx->session, cmd_ctx->lsm->domain.type, + ret = cmd_enable_event_all(cmd_ctx->session, &cmd_ctx->lsm->domain, cmd_ctx->lsm->u.enable.channel_name, cmd_ctx->lsm->u.enable.event.type, NULL, kernel_poll_pipe[1]); break; @@ -2825,6 +2895,7 @@ skip_domain: ret = setup_lttng_msg(cmd_ctx, nb_dom * sizeof(struct lttng_domain)); if (ret < 0) { + free(domains); goto setup_error; } @@ -2852,6 +2923,7 @@ skip_domain: ret = setup_lttng_msg(cmd_ctx, nb_chan * sizeof(struct lttng_channel)); if (ret < 0) { + free(channels); goto setup_error; } @@ -2879,6 +2951,7 @@ skip_domain: ret = setup_lttng_msg(cmd_ctx, nb_event * sizeof(struct lttng_event)); if (ret < 0) { + free(events); goto setup_error; } @@ -2974,7 +3047,7 @@ skip_domain: goto error; } - ret = cmd_enable_event(cmd_ctx->session, cmd_ctx->lsm->domain.type, + ret = cmd_enable_event(cmd_ctx->session, &cmd_ctx->lsm->domain, cmd_ctx->lsm->u.enable.channel_name, &cmd_ctx->lsm->u.enable.event, bytecode, kernel_poll_pipe[1]); break; @@ -3376,13 +3449,11 @@ static void *thread_manage_clients(void *data) ret = process_client_msg(cmd_ctx, sock, &sock_error); rcu_thread_offline(); if (ret < 0) { - if (sock_error) { - ret = close(sock); - if (ret) { - PERROR("close"); - } - sock = -1; + ret = close(sock); + if (ret) { + PERROR("close"); } + sock = -1; /* * TODO: Inform client somehow of the fatal error. At * this point, ret < 0 means that a zmalloc failed @@ -3971,6 +4042,13 @@ int main(int argc, char **argv) setup_consumerd_path(); + page_size = sysconf(_SC_PAGESIZE); + if (page_size < 0) { + PERROR("sysconf _SC_PAGESIZE"); + page_size = LONG_MAX; + WARN("Fallback page size to %ld", page_size); + } + /* Parse arguments */ progname = argv[0]; if ((ret = parse_args(argc, argv)) < 0) { @@ -4217,6 +4295,10 @@ int main(int argc, char **argv) goto exit; } + /* Initialize global buffer per UID and PID registry. */ + buffer_reg_init_uid_registry(); + buffer_reg_init_pid_registry(); + /* Init UST command queue. */ cds_wfq_init(&ust_cmd_queue.queue);