X-Git-Url: http://git.efficios.com/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-sessiond%2Fmain.c;h=550aa1b83f68d65f01e936d1e81fb5308b0b2b4e;hp=bfc8a389bd62d0a32446bf7850e28929c7f1e593;hb=55d097957f5bb8138959ad2202a40d85d49f029e;hpb=5c786dedd0156b93984f89ba47ec841277ed7dae diff --git a/src/bin/lttng-sessiond/main.c b/src/bin/lttng-sessiond/main.c index bfc8a389b..550aa1b83 100644 --- a/src/bin/lttng-sessiond/main.c +++ b/src/bin/lttng-sessiond/main.c @@ -60,7 +60,7 @@ #include "ust-consumer.h" #include "utils.h" #include "fd-limit.h" -#include "health.h" +#include "health-sessiond.h" #include "testpoint.h" #include "ust-thread.h" @@ -90,7 +90,6 @@ static struct consumer_data kconsumer_data = { .cmd_unix_sock_path = DEFAULT_KCONSUMERD_CMD_SOCK_PATH, .err_sock = -1, .cmd_sock = -1, - .metadata_sock.fd = -1, .pid_mutex = PTHREAD_MUTEX_INITIALIZER, .lock = PTHREAD_MUTEX_INITIALIZER, .cond = PTHREAD_COND_INITIALIZER, @@ -102,7 +101,6 @@ static struct consumer_data ustconsumer64_data = { .cmd_unix_sock_path = DEFAULT_USTCONSUMERD64_CMD_SOCK_PATH, .err_sock = -1, .cmd_sock = -1, - .metadata_sock.fd = -1, .pid_mutex = PTHREAD_MUTEX_INITIALIZER, .lock = PTHREAD_MUTEX_INITIALIZER, .cond = PTHREAD_COND_INITIALIZER, @@ -114,7 +112,6 @@ static struct consumer_data ustconsumer32_data = { .cmd_unix_sock_path = DEFAULT_USTCONSUMERD32_CMD_SOCK_PATH, .err_sock = -1, .cmd_sock = -1, - .metadata_sock.fd = -1, .pid_mutex = PTHREAD_MUTEX_INITIALIZER, .lock = PTHREAD_MUTEX_INITIALIZER, .cond = PTHREAD_COND_INITIALIZER, @@ -236,6 +233,9 @@ static int app_socket_timeout; /* Set in main() with the current page size. */ long page_size; +/* Application health monitoring */ +struct health_app *health_sessiond; + static void setup_consumerd_path(void) { @@ -446,7 +446,10 @@ static void cleanup(void) DBG("Cleaning up"); - /* First thing first, stop all threads */ + /* + * Close the thread quit pipe. It has already done its job, + * since we are now called. + */ utils_close_pipe(thread_quit_pipe); /* @@ -690,9 +693,6 @@ static int update_kernel_stream(struct consumer_data *consumer_data, int fd) rcu_read_lock(); cds_lfht_for_each_entry(ksess->consumer->socks->ht, &iter.iter, socket, node.node) { - /* Code flow error */ - assert(socket->fd >= 0); - pthread_mutex_lock(socket->lock); ret = kernel_consumer_send_channel_stream(socket, channel, ksess, @@ -727,6 +727,12 @@ static void update_ust_app(int app_sock) { struct ltt_session *sess, *stmp; + /* Consumer is in an ERROR state. Stop any application update. */ + if (uatomic_read(&ust_consumerd_state) == CONSUMER_ERROR) { + /* Stop the update process since the consumer is dead. */ + return; + } + /* For all tracing session(s) */ cds_list_for_each_entry_safe(sess, stmp, &session_list_ptr->head, list) { session_lock(sess); @@ -752,7 +758,7 @@ static void *thread_manage_kernel(void *data) DBG("[thread] Thread manage kernel started"); - health_register(HEALTH_TYPE_KERNEL); + health_register(health_sessiond, HEALTH_TYPE_KERNEL); /* * This first step of the while is to clean this structure which could free @@ -877,7 +883,7 @@ error_testpoint: WARN("Kernel thread died unexpectedly. " "Kernel tracing can continue but CPU hotplug is disabled."); } - health_unregister(); + health_unregister(health_sessiond); DBG("Kernel thread dying"); return NULL; } @@ -918,7 +924,7 @@ static void *thread_manage_consumer(void *data) DBG("[thread] Manage consumer started"); - health_register(HEALTH_TYPE_CONSUMER); + health_register(health_sessiond, HEALTH_TYPE_CONSUMER); health_code_update(); @@ -1016,15 +1022,16 @@ restart: /* Connect both socket, command and metadata. */ consumer_data->cmd_sock = lttcomm_connect_unix_sock(consumer_data->cmd_unix_sock_path); - consumer_data->metadata_sock.fd = + consumer_data->metadata_fd = lttcomm_connect_unix_sock(consumer_data->cmd_unix_sock_path); - if (consumer_data->cmd_sock < 0 || - consumer_data->metadata_sock.fd < 0) { + if (consumer_data->cmd_sock < 0 + || consumer_data->metadata_fd < 0) { PERROR("consumer connect cmd socket"); /* On error, signal condition and quit. */ signal_consumer_condition(consumer_data, -1); goto error; } + consumer_data->metadata_sock.fd_ptr = &consumer_data->metadata_fd; /* Create metadata socket lock. */ consumer_data->metadata_sock.lock = zmalloc(sizeof(pthread_mutex_t)); if (consumer_data->metadata_sock.lock == NULL) { @@ -1037,7 +1044,7 @@ restart: signal_consumer_condition(consumer_data, 1); DBG("Consumer command socket ready (fd: %d", consumer_data->cmd_sock); DBG("Consumer metadata socket ready (fd: %d)", - consumer_data->metadata_sock.fd); + consumer_data->metadata_fd); } else { ERR("consumer error when waiting for SOCK_READY : %s", lttcomm_get_readable_code(-code)); @@ -1057,7 +1064,7 @@ restart: } /* Add metadata socket that is successfully connected. */ - ret = lttng_poll_add(&events, consumer_data->metadata_sock.fd, + ret = lttng_poll_add(&events, consumer_data->metadata_fd, LPOLLIN | LPOLLRDHUP); if (ret < 0) { goto error; @@ -1116,7 +1123,7 @@ restart_poll: lttcomm_get_readable_code(-code)); goto exit; - } else if (pollfd == consumer_data->metadata_sock.fd) { + } else if (pollfd == consumer_data->metadata_fd) { /* UST metadata requests */ ret = ust_consumer_metadata_request( &consumer_data->metadata_sock); @@ -1135,6 +1142,13 @@ restart_poll: exit: error: + /* + * We lock here because we are about to close the sockets and some other + * thread might be using them so get exclusive access which will abort all + * other consumer command by other threads. + */ + pthread_mutex_lock(&consumer_data->lock); + /* Immediately set the consumerd state to stopped */ if (consumer_data->type == LTTNG_CONSUMER_KERNEL) { uatomic_set(&kernel_consumerd_state, CONSUMER_ERROR); @@ -1160,15 +1174,12 @@ error: } consumer_data->cmd_sock = -1; } - if (consumer_data->metadata_sock.fd >= 0) { - ret = close(consumer_data->metadata_sock.fd); + if (*consumer_data->metadata_sock.fd_ptr >= 0) { + ret = close(*consumer_data->metadata_sock.fd_ptr); if (ret) { PERROR("close"); } } - /* Cleanup metadata socket mutex. */ - pthread_mutex_destroy(consumer_data->metadata_sock.lock); - free(consumer_data->metadata_sock.lock); if (sock >= 0) { ret = close(sock); @@ -1180,6 +1191,11 @@ error: unlink(consumer_data->err_unix_sock_path); unlink(consumer_data->cmd_unix_sock_path); consumer_data->pid = 0; + pthread_mutex_unlock(&consumer_data->lock); + + /* Cleanup metadata socket mutex. */ + pthread_mutex_destroy(consumer_data->metadata_sock.lock); + free(consumer_data->metadata_sock.lock); lttng_poll_clean(&events); error_poll: @@ -1187,7 +1203,7 @@ error_poll: health_error(); ERR("Health error occurred in %s", __func__); } - health_unregister(); + health_unregister(health_sessiond); DBG("consumer thread cleanup completed"); return NULL; @@ -1207,7 +1223,7 @@ static void *thread_manage_apps(void *data) rcu_register_thread(); rcu_thread_online(); - health_register(HEALTH_TYPE_APP_MANAGE); + health_register(health_sessiond, HEALTH_TYPE_APP_MANAGE); if (testpoint(thread_manage_apps)) { goto error_testpoint; @@ -1295,11 +1311,17 @@ static void *thread_manage_apps(void *data) goto error; } - /* Set socket timeout for both receiving and ending */ + /* + * Set socket timeout for both receiving and ending. + * app_socket_timeout is in seconds, whereas + * lttcomm_setsockopt_rcv_timeout and + * lttcomm_setsockopt_snd_timeout expect msec as + * parameter. + */ (void) lttcomm_setsockopt_rcv_timeout(sock, - app_socket_timeout); + app_socket_timeout * 1000); (void) lttcomm_setsockopt_snd_timeout(sock, - app_socket_timeout); + app_socket_timeout * 1000); DBG("Apps with sock %d added to poll set", sock); @@ -1347,7 +1369,7 @@ error_testpoint: health_error(); ERR("Health error occurred in %s", __func__); } - health_unregister(); + health_unregister(health_sessiond); DBG("Application communication apps thread cleanup complete"); rcu_thread_offline(); rcu_unregister_thread(); @@ -1358,6 +1380,9 @@ error_testpoint: * Send a socket to a thread This is called from the dispatch UST registration * thread once all sockets are set for the application. * + * The sock value can be invalid, we don't really care, the thread will handle + * it and make the necessary cleanup if so. + * * On success, return 0 else a negative value being the errno message of the * write(). */ @@ -1365,9 +1390,14 @@ static int send_socket_to_thread(int fd, int sock) { int ret; - /* Sockets MUST be set or else this should not have been called. */ - assert(fd >= 0); - assert(sock >= 0); + /* + * It's possible that the FD is set as invalid with -1 concurrently just + * before calling this function being a shutdown state of the thread. + */ + if (fd < 0) { + ret = -EBADF; + goto error; + } do { ret = write(fd, &sock, sizeof(sock)); @@ -1485,7 +1515,7 @@ static void *thread_dispatch_ust_registration(void *data) .count = 0, }; - health_register(HEALTH_TYPE_APP_REG_DISPATCH); + health_register(health_sessiond, HEALTH_TYPE_APP_REG_DISPATCH); health_code_update(); @@ -1630,7 +1660,12 @@ static void *thread_dispatch_ust_registration(void *data) if (ret < 0) { rcu_read_unlock(); session_unlock_list(); - /* No notify thread, stop the UST tracing. */ + /* + * No notify thread, stop the UST tracing. However, this is + * not an internal error of the this thread thus setting + * the health error code to a normal exit. + */ + err = 0; goto error; } @@ -1655,7 +1690,12 @@ static void *thread_dispatch_ust_registration(void *data) if (ret < 0) { rcu_read_unlock(); session_unlock_list(); - /* No apps. thread, stop the UST tracing. */ + /* + * No apps. thread, stop the UST tracing. However, this is + * not an internal error of the this thread thus setting + * the health error code to a normal exit. + */ + err = 0; goto error; } @@ -1686,7 +1726,7 @@ error: health_error(); ERR("Health error occurred in %s", __func__); } - health_unregister(); + health_unregister(health_sessiond); return NULL; } @@ -1706,7 +1746,7 @@ static void *thread_registration_apps(void *data) DBG("[thread] Manage application registration started"); - health_register(HEALTH_TYPE_APP_REG); + health_register(health_sessiond, HEALTH_TYPE_APP_REG); if (testpoint(thread_registration_apps)) { goto error_testpoint; @@ -1886,7 +1926,7 @@ error_listen: error_create_poll: error_testpoint: DBG("UST Registration thread cleanup complete"); - health_unregister(); + health_unregister(health_sessiond); return NULL; } @@ -2263,7 +2303,7 @@ static int check_consumer_health(void) { int ret; - ret = health_check_state(HEALTH_TYPE_CONSUMER); + ret = health_check_state(health_sessiond, HEALTH_TYPE_CONSUMER); DBG3("Health consumer check %d", ret); @@ -2431,6 +2471,7 @@ static int create_ust_session(struct ltt_session *session, lus->gid = session->gid; lus->output_traces = session->output_traces; lus->snapshot_mode = session->snapshot_mode; + lus->live_timer_interval = session->live_timer; session->ust_session = lus; /* Copy session output to the newly created UST session */ @@ -2545,6 +2586,7 @@ static int process_client_msg(struct command_ctx *cmd_ctx, int sock, switch (cmd_ctx->lsm->cmd_type) { case LTTNG_CREATE_SESSION: case LTTNG_CREATE_SESSION_SNAPSHOT: + case LTTNG_CREATE_SESSION_LIVE: case LTTNG_DESTROY_SESSION: case LTTNG_LIST_SESSIONS: case LTTNG_LIST_DOMAINS: @@ -2608,6 +2650,7 @@ static int process_client_msg(struct command_ctx *cmd_ctx, int sock, switch (cmd_ctx->lsm->cmd_type) { case LTTNG_CREATE_SESSION: case LTTNG_CREATE_SESSION_SNAPSHOT: + case LTTNG_CREATE_SESSION_LIVE: case LTTNG_CALIBRATE: case LTTNG_LIST_SESSIONS: case LTTNG_LIST_TRACEPOINTS: @@ -2701,6 +2744,10 @@ static int process_client_msg(struct command_ctx *cmd_ctx, int sock, break; case LTTNG_DOMAIN_UST: { + if (!ust_app_supported()) { + ret = LTTNG_ERR_NO_UST; + goto error; + } /* Consumer is in an ERROR state. Report back to client */ if (uatomic_read(&ust_consumerd_state) == CONSUMER_ERROR) { ret = LTTNG_ERR_NO_USTCONSUMERD; @@ -3048,7 +3095,7 @@ skip_domain: } ret = cmd_create_session_uri(cmd_ctx->lsm->session.name, uris, nb_uri, - &cmd_ctx->creds); + &cmd_ctx->creds, 0); free(uris); @@ -3338,6 +3385,45 @@ skip_domain: free(uris); break; } + case LTTNG_CREATE_SESSION_LIVE: + { + size_t nb_uri, len; + struct lttng_uri *uris = NULL; + + nb_uri = cmd_ctx->lsm->u.uri.size; + len = nb_uri * sizeof(struct lttng_uri); + + if (nb_uri > 0) { + uris = zmalloc(len); + if (uris == NULL) { + ret = LTTNG_ERR_FATAL; + goto error; + } + + /* Receive variable len data */ + DBG("Waiting for %zu URIs from client ...", nb_uri); + ret = lttcomm_recv_unix_sock(sock, uris, len); + if (ret <= 0) { + DBG("No URIs received from client... continuing"); + *sock_error = 1; + ret = LTTNG_ERR_SESSION_FAIL; + free(uris); + goto error; + } + + if (nb_uri == 1 && uris[0].dtype != LTTNG_DST_PATH) { + DBG("Creating session with ONE network URI is a bad call"); + ret = LTTNG_ERR_SESSION_FAIL; + free(uris); + goto error; + } + } + + ret = cmd_create_session_uri(cmd_ctx->lsm->session.name, uris, + nb_uri, &cmd_ctx->creds, cmd_ctx->lsm->u.session_live.timer_interval); + free(uris); + break; + } default: ret = LTTNG_ERR_UND; break; @@ -3481,39 +3567,39 @@ restart: switch (msg.component) { case LTTNG_HEALTH_CMD: - reply.ret_code = health_check_state(HEALTH_TYPE_CMD); + reply.ret_code = health_check_state(health_sessiond, HEALTH_TYPE_CMD); break; case LTTNG_HEALTH_APP_MANAGE: - reply.ret_code = health_check_state(HEALTH_TYPE_APP_MANAGE); + reply.ret_code = health_check_state(health_sessiond, HEALTH_TYPE_APP_MANAGE); break; case LTTNG_HEALTH_APP_REG: - reply.ret_code = health_check_state(HEALTH_TYPE_APP_REG); + reply.ret_code = health_check_state(health_sessiond, HEALTH_TYPE_APP_REG); break; case LTTNG_HEALTH_KERNEL: - reply.ret_code = health_check_state(HEALTH_TYPE_KERNEL); + reply.ret_code = health_check_state(health_sessiond, HEALTH_TYPE_KERNEL); break; case LTTNG_HEALTH_CONSUMER: reply.ret_code = check_consumer_health(); break; case LTTNG_HEALTH_HT_CLEANUP: - reply.ret_code = health_check_state(HEALTH_TYPE_HT_CLEANUP); + reply.ret_code = health_check_state(health_sessiond, HEALTH_TYPE_HT_CLEANUP); break; case LTTNG_HEALTH_APP_MANAGE_NOTIFY: - reply.ret_code = health_check_state(HEALTH_TYPE_APP_MANAGE_NOTIFY); + reply.ret_code = health_check_state(health_sessiond, HEALTH_TYPE_APP_MANAGE_NOTIFY); break; case LTTNG_HEALTH_APP_REG_DISPATCH: - reply.ret_code = health_check_state(HEALTH_TYPE_APP_REG_DISPATCH); + reply.ret_code = health_check_state(health_sessiond, HEALTH_TYPE_APP_REG_DISPATCH); break; case LTTNG_HEALTH_ALL: reply.ret_code = - health_check_state(HEALTH_TYPE_APP_MANAGE) && - health_check_state(HEALTH_TYPE_APP_REG) && - health_check_state(HEALTH_TYPE_CMD) && - health_check_state(HEALTH_TYPE_KERNEL) && + health_check_state(health_sessiond, HEALTH_TYPE_APP_MANAGE) && + health_check_state(health_sessiond, HEALTH_TYPE_APP_REG) && + health_check_state(health_sessiond, HEALTH_TYPE_CMD) && + health_check_state(health_sessiond, HEALTH_TYPE_KERNEL) && check_consumer_health() && - health_check_state(HEALTH_TYPE_HT_CLEANUP) && - health_check_state(HEALTH_TYPE_APP_MANAGE_NOTIFY) && - health_check_state(HEALTH_TYPE_APP_REG_DISPATCH); + health_check_state(health_sessiond, HEALTH_TYPE_HT_CLEANUP) && + health_check_state(health_sessiond, HEALTH_TYPE_APP_MANAGE_NOTIFY) && + health_check_state(health_sessiond, HEALTH_TYPE_APP_REG_DISPATCH); break; default: reply.ret_code = LTTNG_ERR_UND; @@ -3581,7 +3667,7 @@ static void *thread_manage_clients(void *data) rcu_register_thread(); - health_register(HEALTH_TYPE_CMD); + health_register(health_sessiond, HEALTH_TYPE_CMD); if (testpoint(thread_manage_clients)) { goto error_testpoint; @@ -3805,7 +3891,7 @@ error_testpoint: ERR("Health error occurred in %s", __func__); } - health_unregister(); + health_unregister(health_sessiond); DBG("Client thread dying"); @@ -4066,12 +4152,12 @@ static int set_permissions(char *rundir) ret = allowed_group(); if (ret < 0) { WARN("No tracing group detected"); - ret = 0; - goto end; + /* Setting gid to 0 if no tracing group is found */ + gid = 0; + } else { + gid = ret; } - gid = ret; - /* Set lttng run dir */ ret = chown(rundir, 0, gid); if (ret < 0) { @@ -4079,7 +4165,7 @@ static int set_permissions(char *rundir) PERROR("chown"); } - /* Ensure tracing group can search the run dir */ + /* Ensure all applications and tracing group can search the run dir */ ret = chmod(rundir, S_IRWXU | S_IXGRP | S_IXOTH); if (ret < 0) { ERR("Unable to set permissions on %s", rundir); @@ -4116,7 +4202,6 @@ static int set_permissions(char *rundir) DBG("All permissions are set"); -end: return ret; } @@ -4628,6 +4713,21 @@ int main(int argc, char **argv) write_pidfile(); + /* Initialize communication library */ + lttcomm_init(); + /* This is to get the TCP timeout value. */ + lttcomm_inet_init(); + + /* + * Initialize the health check subsystem. This call should set the + * appropriate time values. + */ + health_sessiond = health_app_create(HEALTH_NUM_TYPE); + if (!health_sessiond) { + PERROR("health_app_create error"); + goto exit_health_sessiond_cleanup; + } + /* Create thread to manage the client socket */ ret = pthread_create(&ht_cleanup_thread, NULL, thread_ht_cleanup, (void *) NULL); @@ -4681,7 +4781,7 @@ int main(int argc, char **argv) ust_thread_manage_notify, (void *) NULL); if (ret != 0) { PERROR("pthread_create apps"); - goto exit_apps; + goto exit_apps_notify; } /* Don't start this thread if kernel tracing is not requested nor root */ @@ -4702,12 +4802,20 @@ int main(int argc, char **argv) } exit_kernel: + ret = pthread_join(apps_notify_thread, &status); + if (ret != 0) { + PERROR("pthread_join apps notify"); + goto error; /* join error, exit without cleanup */ + } + +exit_apps_notify: ret = pthread_join(apps_thread, &status); if (ret != 0) { - PERROR("pthread_join"); + PERROR("pthread_join apps"); goto error; /* join error, exit without cleanup */ } + exit_apps: ret = pthread_join(reg_apps_thread, &status); if (ret != 0) { @@ -4761,6 +4869,8 @@ exit_health: goto error; /* join error, exit without cleanup */ } exit_ht_cleanup: + health_app_destroy(health_sessiond); +exit_health_sessiond_cleanup: exit: /* * cleanup() is called when no other thread is running.