#include "fd-limit.h"
#include "health-sessiond.h"
#include "testpoint.h"
-#include "ust-thread.h"
+#include "notify-apps.h"
#include "agent-thread.h"
#include "save.h"
-#include "load-session-thread.h"
#include "notification-thread.h"
#include "notification-thread-commands.h"
#include "rotation-thread.h"
#include "client.h"
#include "dispatch.h"
#include "register.h"
+#include "manage-apps.h"
+#include "manage-kernel.h"
static const char *help_msg =
#ifdef LTTNG_EMBED_HELP
* that a command is queued and ready to be processed.
*/
static int apps_cmd_pipe[2] = { -1, -1 };
-
-/* Pthread, Mutexes and Semaphores */
-static pthread_t apps_thread;
-static pthread_t apps_notify_thread;
-static pthread_t kernel_thread;
-static pthread_t agent_reg_thread;
-static pthread_t load_session_thread;
+static int apps_cmd_notify_pipe[2] = { -1, -1 };
/*
* UST registration command queue. This queue is tied with a futex and uses a N
static const char *module_proc_lttng = "/proc/lttng";
-/* Load session thread information to operate. */
-static struct load_session_thread_data *load_info;
-
/*
* Section name to look for in the daemon configuration file.
*/
* since we are now called.
*/
sessiond_close_quit_pipe();
+ utils_close_pipe(apps_cmd_pipe);
+ utils_close_pipe(apps_cmd_notify_pipe);
+ utils_close_pipe(kernel_poll_pipe);
ret = remove(config.pid_file_path.value);
if (ret < 0) {
close_consumer_sockets();
- if (load_info) {
- load_session_destroy_data(load_info);
- free(load_info);
- }
-
/*
* We do NOT rmdir rundir because there are other processes
* using it, for instance lttng-relayd, which can start in
run_as_destroy_worker();
}
-/*
- * Update the kernel poll set of all channel fd available over all tracing
- * session. Add the wakeup pipe at the end of the set.
- */
-static int update_kernel_poll(struct lttng_poll_event *events)
-{
- int ret;
- struct ltt_kernel_channel *channel;
- struct ltt_session *session;
- const struct ltt_session_list *session_list = session_get_list();
-
- DBG("Updating kernel poll set");
-
- session_lock_list();
- cds_list_for_each_entry(session, &session_list->head, list) {
- if (!session_get(session)) {
- continue;
- }
- session_lock(session);
- if (session->kernel_session == NULL) {
- session_unlock(session);
- session_put(session);
- continue;
- }
-
- cds_list_for_each_entry(channel,
- &session->kernel_session->channel_list.head, list) {
- /* Add channel fd to the kernel poll set */
- ret = lttng_poll_add(events, channel->fd, LPOLLIN | LPOLLRDNORM);
- if (ret < 0) {
- session_unlock(session);
- session_put(session);
- goto error;
- }
- DBG("Channel fd %d added to kernel set", channel->fd);
- }
- session_unlock(session);
- }
- session_unlock_list();
-
- return 0;
-
-error:
- session_unlock_list();
- return -1;
-}
-
-/*
- * Find the channel fd from 'fd' over all tracing session. When found, check
- * for new channel stream and send those stream fds to the kernel consumer.
- *
- * Useful for CPU hotplug feature.
- */
-static int update_kernel_stream(int fd)
-{
- int ret = 0;
- struct ltt_session *session;
- struct ltt_kernel_session *ksess;
- struct ltt_kernel_channel *channel;
- const struct ltt_session_list *session_list = session_get_list();
-
- DBG("Updating kernel streams for channel fd %d", fd);
-
- session_lock_list();
- cds_list_for_each_entry(session, &session_list->head, list) {
- if (!session_get(session)) {
- continue;
- }
- session_lock(session);
- if (session->kernel_session == NULL) {
- session_unlock(session);
- session_put(session);
- continue;
- }
- ksess = session->kernel_session;
-
- cds_list_for_each_entry(channel,
- &ksess->channel_list.head, list) {
- struct lttng_ht_iter iter;
- struct consumer_socket *socket;
-
- if (channel->fd != fd) {
- continue;
- }
- DBG("Channel found, updating kernel streams");
- ret = kernel_open_channel_stream(channel);
- if (ret < 0) {
- goto error;
- }
- /* Update the stream global counter */
- ksess->stream_count_global += ret;
-
- /*
- * Have we already sent fds to the consumer? If yes, it
- * means that tracing is started so it is safe to send
- * our updated stream fds.
- */
- if (ksess->consumer_fds_sent != 1
- || ksess->consumer == NULL) {
- ret = -1;
- goto error;
- }
-
- rcu_read_lock();
- cds_lfht_for_each_entry(ksess->consumer->socks->ht,
- &iter.iter, socket, node.node) {
- pthread_mutex_lock(socket->lock);
- ret = kernel_consumer_send_channel_streams(socket,
- channel, ksess,
- session->output_traces ? 1 : 0);
- pthread_mutex_unlock(socket->lock);
- if (ret < 0) {
- rcu_read_unlock();
- goto error;
- }
- }
- rcu_read_unlock();
- }
- session_unlock(session);
- session_put(session);
- }
- session_unlock_list();
- return ret;
-
-error:
- session_unlock(session);
- session_put(session);
- session_unlock_list();
- return ret;
-}
-
-/*
- * This thread manage event coming from the kernel.
- *
- * Features supported in this thread:
- * -) CPU Hotplug
- */
-static void *thread_manage_kernel(void *data)
-{
- int ret, i, pollfd, update_poll_flag = 1, err = -1;
- uint32_t revents, nb_fd;
- char tmp;
- struct lttng_poll_event events;
-
- DBG("[thread] Thread manage kernel started");
-
- health_register(health_sessiond, HEALTH_SESSIOND_TYPE_KERNEL);
-
- /*
- * This first step of the while is to clean this structure which could free
- * non NULL pointers so initialize it before the loop.
- */
- lttng_poll_init(&events);
-
- if (testpoint(sessiond_thread_manage_kernel)) {
- goto error_testpoint;
- }
-
- health_code_update();
-
- if (testpoint(sessiond_thread_manage_kernel_before_loop)) {
- goto error_testpoint;
- }
-
- while (1) {
- health_code_update();
-
- if (update_poll_flag == 1) {
- /* Clean events object. We are about to populate it again. */
- lttng_poll_clean(&events);
-
- ret = sessiond_set_thread_pollset(&events, 2);
- if (ret < 0) {
- goto error_poll_create;
- }
-
- ret = lttng_poll_add(&events, kernel_poll_pipe[0], LPOLLIN);
- if (ret < 0) {
- goto error;
- }
-
- /* This will add the available kernel channel if any. */
- ret = update_kernel_poll(&events);
- if (ret < 0) {
- goto error;
- }
- update_poll_flag = 0;
- }
-
- DBG("Thread kernel polling");
-
- /* Poll infinite value of time */
- restart:
- health_poll_entry();
- ret = lttng_poll_wait(&events, -1);
- DBG("Thread kernel return from poll on %d fds",
- LTTNG_POLL_GETNB(&events));
- health_poll_exit();
- if (ret < 0) {
- /*
- * Restart interrupted system call.
- */
- if (errno == EINTR) {
- goto restart;
- }
- goto error;
- } else if (ret == 0) {
- /* Should not happen since timeout is infinite */
- ERR("Return value of poll is 0 with an infinite timeout.\n"
- "This should not have happened! Continuing...");
- continue;
- }
-
- nb_fd = ret;
-
- for (i = 0; i < nb_fd; i++) {
- /* Fetch once the poll data */
- revents = LTTNG_POLL_GETEV(&events, i);
- pollfd = LTTNG_POLL_GETFD(&events, i);
-
- health_code_update();
-
- if (!revents) {
- /* No activity for this FD (poll implementation). */
- continue;
- }
-
- /* Thread quit pipe has been closed. Killing thread. */
- ret = sessiond_check_thread_quit_pipe(pollfd, revents);
- if (ret) {
- err = 0;
- goto exit;
- }
-
- /* Check for data on kernel pipe */
- if (revents & LPOLLIN) {
- if (pollfd == kernel_poll_pipe[0]) {
- (void) lttng_read(kernel_poll_pipe[0],
- &tmp, 1);
- /*
- * Ret value is useless here, if this pipe gets any actions an
- * update is required anyway.
- */
- update_poll_flag = 1;
- continue;
- } else {
- /*
- * New CPU detected by the kernel. Adding kernel stream to
- * kernel session and updating the kernel consumer
- */
- ret = update_kernel_stream(pollfd);
- if (ret < 0) {
- continue;
- }
- break;
- }
- } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
- update_poll_flag = 1;
- continue;
- } else {
- ERR("Unexpected poll events %u for sock %d", revents, pollfd);
- goto error;
- }
- }
- }
-
-exit:
-error:
- lttng_poll_clean(&events);
-error_poll_create:
-error_testpoint:
- utils_close_pipe(kernel_poll_pipe);
- kernel_poll_pipe[0] = kernel_poll_pipe[1] = -1;
- if (err) {
- health_error();
- ERR("Health error occurred in %s", __func__);
- WARN("Kernel thread died unexpectedly. "
- "Kernel tracing can continue but CPU hotplug is disabled.");
- }
- health_unregister(health_sessiond);
- DBG("Kernel thread dying");
- return NULL;
-}
-
-/*
- * Signal pthread condition of the consumer data that the thread.
- */
-static void signal_consumer_condition(struct consumer_data *data, int state)
-{
- pthread_mutex_lock(&data->cond_mutex);
-
- /*
- * The state is set before signaling. It can be any value, it's the waiter
- * job to correctly interpret this condition variable associated to the
- * consumer pthread_cond.
- *
- * A value of 0 means that the corresponding thread of the consumer data
- * was not started. 1 indicates that the thread has started and is ready
- * for action. A negative value means that there was an error during the
- * thread bootstrap.
- */
- data->consumer_thread_is_ready = state;
- (void) pthread_cond_signal(&data->cond);
-
- pthread_mutex_unlock(&data->cond_mutex);
-}
-
-/*
- * This thread manage the consumer error sent back to the session daemon.
- */
-void *thread_manage_consumer(void *data)
-{
- int sock = -1, i, ret, pollfd, err = -1, should_quit = 0;
- uint32_t revents, nb_fd;
- enum lttcomm_return_code code;
- struct lttng_poll_event events;
- struct consumer_data *consumer_data = data;
- struct consumer_socket *cmd_socket_wrapper = NULL;
-
- DBG("[thread] Manage consumer started");
-
- rcu_register_thread();
- rcu_thread_online();
-
- health_register(health_sessiond, HEALTH_SESSIOND_TYPE_CONSUMER);
-
- health_code_update();
-
- /*
- * Pass 3 as size here for the thread quit pipe, consumerd_err_sock and the
- * metadata_sock. Nothing more will be added to this poll set.
- */
- ret = sessiond_set_thread_pollset(&events, 3);
- if (ret < 0) {
- goto error_poll;
- }
-
- /*
- * The error socket here is already in a listening state which was done
- * just before spawning this thread to avoid a race between the consumer
- * daemon exec trying to connect and the listen() call.
- */
- ret = lttng_poll_add(&events, consumer_data->err_sock, LPOLLIN | LPOLLRDHUP);
- if (ret < 0) {
- goto error;
- }
-
- health_code_update();
-
- /* Infinite blocking call, waiting for transmission */
-restart:
- health_poll_entry();
-
- if (testpoint(sessiond_thread_manage_consumer)) {
- goto error;
- }
-
- ret = lttng_poll_wait(&events, -1);
- health_poll_exit();
- if (ret < 0) {
- /*
- * Restart interrupted system call.
- */
- if (errno == EINTR) {
- goto restart;
- }
- goto error;
- }
-
- nb_fd = ret;
-
- for (i = 0; i < nb_fd; i++) {
- /* Fetch once the poll data */
- revents = LTTNG_POLL_GETEV(&events, i);
- pollfd = LTTNG_POLL_GETFD(&events, i);
-
- health_code_update();
-
- if (!revents) {
- /* No activity for this FD (poll implementation). */
- continue;
- }
-
- /* Thread quit pipe has been closed. Killing thread. */
- ret = sessiond_check_thread_quit_pipe(pollfd, revents);
- if (ret) {
- err = 0;
- goto exit;
- }
-
- /* Event on the registration socket */
- if (pollfd == consumer_data->err_sock) {
- if (revents & LPOLLIN) {
- continue;
- } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
- ERR("consumer err socket poll error");
- goto error;
- } else {
- ERR("Unexpected poll events %u for sock %d", revents, pollfd);
- goto error;
- }
- }
- }
-
- sock = lttcomm_accept_unix_sock(consumer_data->err_sock);
- if (sock < 0) {
- goto error;
- }
-
- /*
- * Set the CLOEXEC flag. Return code is useless because either way, the
- * show must go on.
- */
- (void) utils_set_fd_cloexec(sock);
-
- health_code_update();
-
- DBG2("Receiving code from consumer err_sock");
-
- /* Getting status code from kconsumerd */
- ret = lttcomm_recv_unix_sock(sock, &code,
- sizeof(enum lttcomm_return_code));
- if (ret <= 0) {
- goto error;
- }
-
- health_code_update();
- if (code != LTTCOMM_CONSUMERD_COMMAND_SOCK_READY) {
- ERR("consumer error when waiting for SOCK_READY : %s",
- lttcomm_get_readable_code(-code));
- goto error;
- }
-
- /* Connect both command and metadata sockets. */
- consumer_data->cmd_sock =
- lttcomm_connect_unix_sock(
- consumer_data->cmd_unix_sock_path);
- consumer_data->metadata_fd =
- lttcomm_connect_unix_sock(
- consumer_data->cmd_unix_sock_path);
- if (consumer_data->cmd_sock < 0 || consumer_data->metadata_fd < 0) {
- PERROR("consumer connect cmd socket");
- /* On error, signal condition and quit. */
- signal_consumer_condition(consumer_data, -1);
- goto error;
- }
-
- consumer_data->metadata_sock.fd_ptr = &consumer_data->metadata_fd;
-
- /* Create metadata socket lock. */
- consumer_data->metadata_sock.lock = zmalloc(sizeof(pthread_mutex_t));
- if (consumer_data->metadata_sock.lock == NULL) {
- PERROR("zmalloc pthread mutex");
- goto error;
- }
- pthread_mutex_init(consumer_data->metadata_sock.lock, NULL);
-
- DBG("Consumer command socket ready (fd: %d", consumer_data->cmd_sock);
- DBG("Consumer metadata socket ready (fd: %d)",
- consumer_data->metadata_fd);
-
- /*
- * Remove the consumerd error sock since we've established a connection.
- */
- ret = lttng_poll_del(&events, consumer_data->err_sock);
- if (ret < 0) {
- goto error;
- }
-
- /* Add new accepted error socket. */
- ret = lttng_poll_add(&events, sock, LPOLLIN | LPOLLRDHUP);
- if (ret < 0) {
- goto error;
- }
-
- /* Add metadata socket that is successfully connected. */
- ret = lttng_poll_add(&events, consumer_data->metadata_fd,
- LPOLLIN | LPOLLRDHUP);
- if (ret < 0) {
- goto error;
- }
-
- health_code_update();
-
- /*
- * Transfer the write-end of the channel monitoring and rotate pipe
- * to the consumer by issuing a SET_CHANNEL_MONITOR_PIPE command.
- */
- cmd_socket_wrapper = consumer_allocate_socket(&consumer_data->cmd_sock);
- if (!cmd_socket_wrapper) {
- goto error;
- }
- cmd_socket_wrapper->lock = &consumer_data->lock;
-
- ret = consumer_send_channel_monitor_pipe(cmd_socket_wrapper,
- consumer_data->channel_monitor_pipe);
- if (ret) {
- goto error;
- }
-
- /* Discard the socket wrapper as it is no longer needed. */
- consumer_destroy_socket(cmd_socket_wrapper);
- cmd_socket_wrapper = NULL;
-
- /* The thread is completely initialized, signal that it is ready. */
- signal_consumer_condition(consumer_data, 1);
-
- /* Infinite blocking call, waiting for transmission */
-restart_poll:
- while (1) {
- health_code_update();
-
- /* Exit the thread because the thread quit pipe has been triggered. */
- if (should_quit) {
- /* Not a health error. */
- err = 0;
- goto exit;
- }
-
- health_poll_entry();
- ret = lttng_poll_wait(&events, -1);
- health_poll_exit();
- if (ret < 0) {
- /*
- * Restart interrupted system call.
- */
- if (errno == EINTR) {
- goto restart_poll;
- }
- goto error;
- }
-
- nb_fd = ret;
-
- for (i = 0; i < nb_fd; i++) {
- /* Fetch once the poll data */
- revents = LTTNG_POLL_GETEV(&events, i);
- pollfd = LTTNG_POLL_GETFD(&events, i);
-
- health_code_update();
-
- if (!revents) {
- /* No activity for this FD (poll implementation). */
- continue;
- }
-
- /*
- * Thread quit pipe has been triggered, flag that we should stop
- * but continue the current loop to handle potential data from
- * consumer.
- */
- should_quit = sessiond_check_thread_quit_pipe(pollfd, revents);
-
- if (pollfd == sock) {
- /* Event on the consumerd socket */
- if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)
- && !(revents & LPOLLIN)) {
- ERR("consumer err socket second poll error");
- goto error;
- }
- health_code_update();
- /* Wait for any kconsumerd error */
- ret = lttcomm_recv_unix_sock(sock, &code,
- sizeof(enum lttcomm_return_code));
- if (ret <= 0) {
- ERR("consumer closed the command socket");
- goto error;
- }
-
- ERR("consumer return code : %s",
- lttcomm_get_readable_code(-code));
-
- goto exit;
- } else if (pollfd == consumer_data->metadata_fd) {
- if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)
- && !(revents & LPOLLIN)) {
- ERR("consumer err metadata socket second poll error");
- goto error;
- }
- /* UST metadata requests */
- ret = ust_consumer_metadata_request(
- &consumer_data->metadata_sock);
- if (ret < 0) {
- ERR("Handling metadata request");
- goto error;
- }
- }
- /* No need for an else branch all FDs are tested prior. */
- }
- health_code_update();
- }
-
-exit:
-error:
- /*
- * We lock here because we are about to close the sockets and some other
- * thread might be using them so get exclusive access which will abort all
- * other consumer command by other threads.
- */
- pthread_mutex_lock(&consumer_data->lock);
-
- /* Immediately set the consumerd state to stopped */
- if (consumer_data->type == LTTNG_CONSUMER_KERNEL) {
- uatomic_set(&kernel_consumerd_state, CONSUMER_ERROR);
- } else if (consumer_data->type == LTTNG_CONSUMER64_UST ||
- consumer_data->type == LTTNG_CONSUMER32_UST) {
- uatomic_set(&ust_consumerd_state, CONSUMER_ERROR);
- } else {
- /* Code flow error... */
- assert(0);
- }
-
- if (consumer_data->err_sock >= 0) {
- ret = close(consumer_data->err_sock);
- if (ret) {
- PERROR("close");
- }
- consumer_data->err_sock = -1;
- }
- if (consumer_data->cmd_sock >= 0) {
- ret = close(consumer_data->cmd_sock);
- if (ret) {
- PERROR("close");
- }
- consumer_data->cmd_sock = -1;
- }
- if (consumer_data->metadata_sock.fd_ptr &&
- *consumer_data->metadata_sock.fd_ptr >= 0) {
- ret = close(*consumer_data->metadata_sock.fd_ptr);
- if (ret) {
- PERROR("close");
- }
- }
- if (sock >= 0) {
- ret = close(sock);
- if (ret) {
- PERROR("close");
- }
- }
-
- unlink(consumer_data->err_unix_sock_path);
- unlink(consumer_data->cmd_unix_sock_path);
- pthread_mutex_unlock(&consumer_data->lock);
-
- /* Cleanup metadata socket mutex. */
- if (consumer_data->metadata_sock.lock) {
- pthread_mutex_destroy(consumer_data->metadata_sock.lock);
- free(consumer_data->metadata_sock.lock);
- }
- lttng_poll_clean(&events);
-
- if (cmd_socket_wrapper) {
- consumer_destroy_socket(cmd_socket_wrapper);
- }
-error_poll:
- if (err) {
- health_error();
- ERR("Health error occurred in %s", __func__);
- }
- health_unregister(health_sessiond);
- DBG("consumer thread cleanup completed");
-
- rcu_thread_offline();
- rcu_unregister_thread();
-
- return NULL;
-}
-
-/*
- * This thread receives application command sockets (FDs) on the
- * apps_cmd_pipe and waits (polls) on them until they are closed
- * or an error occurs.
- *
- * At that point, it flushes the data (tracing and metadata) associated
- * with this application and tears down ust app sessions and other
- * associated data structures through ust_app_unregister().
- *
- * Note that this thread never sends commands to the applications
- * through the command sockets; it merely listens for hang-ups
- * and errors on those sockets and cleans-up as they occur.
- */
-static void *thread_manage_apps(void *data)
-{
- int i, ret, pollfd, err = -1;
- ssize_t size_ret;
- uint32_t revents, nb_fd;
- struct lttng_poll_event events;
-
- DBG("[thread] Manage application started");
-
- rcu_register_thread();
- rcu_thread_online();
-
- health_register(health_sessiond, HEALTH_SESSIOND_TYPE_APP_MANAGE);
-
- if (testpoint(sessiond_thread_manage_apps)) {
- goto error_testpoint;
- }
-
- health_code_update();
-
- ret = sessiond_set_thread_pollset(&events, 2);
- if (ret < 0) {
- goto error_poll_create;
- }
-
- ret = lttng_poll_add(&events, apps_cmd_pipe[0], LPOLLIN | LPOLLRDHUP);
- if (ret < 0) {
- goto error;
- }
-
- if (testpoint(sessiond_thread_manage_apps_before_loop)) {
- goto error;
- }
-
- health_code_update();
-
- while (1) {
- DBG("Apps thread polling");
-
- /* Inifinite blocking call, waiting for transmission */
- restart:
- health_poll_entry();
- ret = lttng_poll_wait(&events, -1);
- DBG("Apps thread return from poll on %d fds",
- LTTNG_POLL_GETNB(&events));
- health_poll_exit();
- if (ret < 0) {
- /*
- * Restart interrupted system call.
- */
- if (errno == EINTR) {
- goto restart;
- }
- goto error;
- }
-
- nb_fd = ret;
-
- for (i = 0; i < nb_fd; i++) {
- /* Fetch once the poll data */
- revents = LTTNG_POLL_GETEV(&events, i);
- pollfd = LTTNG_POLL_GETFD(&events, i);
-
- health_code_update();
-
- if (!revents) {
- /* No activity for this FD (poll implementation). */
- continue;
- }
-
- /* Thread quit pipe has been closed. Killing thread. */
- ret = sessiond_check_thread_quit_pipe(pollfd, revents);
- if (ret) {
- err = 0;
- goto exit;
- }
-
- /* Inspect the apps cmd pipe */
- if (pollfd == apps_cmd_pipe[0]) {
- if (revents & LPOLLIN) {
- int sock;
-
- /* Empty pipe */
- size_ret = lttng_read(apps_cmd_pipe[0], &sock, sizeof(sock));
- if (size_ret < sizeof(sock)) {
- PERROR("read apps cmd pipe");
- goto error;
- }
-
- health_code_update();
-
- /*
- * Since this is a command socket (write then read),
- * we only monitor the error events of the socket.
- */
- ret = lttng_poll_add(&events, sock,
- LPOLLERR | LPOLLHUP | LPOLLRDHUP);
- if (ret < 0) {
- goto error;
- }
-
- DBG("Apps with sock %d added to poll set", sock);
- } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
- ERR("Apps command pipe error");
- goto error;
- } else {
- ERR("Unknown poll events %u for sock %d", revents, pollfd);
- goto error;
- }
- } else {
- /*
- * At this point, we know that a registered application made
- * the event at poll_wait.
- */
- if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
- /* Removing from the poll set */
- ret = lttng_poll_del(&events, pollfd);
- if (ret < 0) {
- goto error;
- }
-
- /* Socket closed on remote end. */
- ust_app_unregister(pollfd);
- } else {
- ERR("Unexpected poll events %u for sock %d", revents, pollfd);
- goto error;
- }
- }
-
- health_code_update();
- }
- }
-
-exit:
-error:
- lttng_poll_clean(&events);
-error_poll_create:
-error_testpoint:
- utils_close_pipe(apps_cmd_pipe);
- apps_cmd_pipe[0] = apps_cmd_pipe[1] = -1;
-
- /*
- * We don't clean the UST app hash table here since already registered
- * applications can still be controlled so let them be until the session
- * daemon dies or the applications stop.
- */
-
- if (err) {
- health_error();
- ERR("Health error occurred in %s", __func__);
- }
- health_unregister(health_sessiond);
- DBG("Application communication apps thread cleanup complete");
- rcu_thread_offline();
- rcu_unregister_thread();
- return NULL;
-}
-
/*
* Setup necessary data for kernel tracer action.
*/
if (session->destroyed) {
goto unlock_session;
}
+ (void) cmd_stop_trace(session);
(void) cmd_destroy_session(session,
notification_thread_handle);
unlock_session:
int main(int argc, char **argv)
{
int ret = 0, retval = 0;
- void *status;
const char *env_app_timeout;
struct lttng_pipe *ust32_channel_monitor_pipe = NULL,
*ust64_channel_monitor_pipe = NULL,
/* Queue of rotation jobs populated by the sessiond-timer. */
struct rotation_thread_timer_queue *rotation_timer_queue = NULL;
struct lttng_thread *client_thread = NULL;
+ struct lttng_thread *notification_thread = NULL;
+ struct lttng_thread *register_apps_thread = NULL;
init_kernel_workarounds();
if (!health_sessiond) {
PERROR("health_app_create error");
retval = -1;
- goto exit_health_sessiond_cleanup;
+ goto stop_threads;
}
/* Create thread to clean up RCU hash tables */
ht_cleanup_thread = launch_ht_cleanup_thread();
if (!ht_cleanup_thread) {
retval = -1;
- goto exit_ht_cleanup;
+ goto stop_threads;
}
/* Create thread quit pipe */
if (sessiond_init_thread_quit_pipe()) {
retval = -1;
- goto exit_init_data;
+ goto stop_threads;
}
/* Check if daemon is UID = 0 */
if (!kernel_channel_monitor_pipe) {
ERR("Failed to create kernel consumer channel monitor pipe");
retval = -1;
- goto exit_init_data;
+ goto stop_threads;
}
kconsumer_data.channel_monitor_pipe =
lttng_pipe_release_writefd(
kernel_channel_monitor_pipe);
if (kconsumer_data.channel_monitor_pipe < 0) {
retval = -1;
- goto exit_init_data;
+ goto stop_threads;
}
}
if (!ust32_channel_monitor_pipe) {
ERR("Failed to create 32-bit user space consumer channel monitor pipe");
retval = -1;
- goto exit_init_data;
+ goto stop_threads;
}
ustconsumer32_data.channel_monitor_pipe = lttng_pipe_release_writefd(
ust32_channel_monitor_pipe);
if (ustconsumer32_data.channel_monitor_pipe < 0) {
retval = -1;
- goto exit_init_data;
+ goto stop_threads;
}
/*
rotation_timer_queue = rotation_thread_timer_queue_create();
if (!rotation_timer_queue) {
retval = -1;
- goto exit_init_data;
+ goto stop_threads;
}
timer_thread_parameters.rotation_thread_job_queue =
rotation_timer_queue;
if (!ust64_channel_monitor_pipe) {
ERR("Failed to create 64-bit user space consumer channel monitor pipe");
retval = -1;
- goto exit_init_data;
+ goto stop_threads;
}
ustconsumer64_data.channel_monitor_pipe = lttng_pipe_release_writefd(
ust64_channel_monitor_pipe);
if (ustconsumer64_data.channel_monitor_pipe < 0) {
retval = -1;
- goto exit_init_data;
+ goto stop_threads;
}
/*
if (ust_app_ht_alloc()) {
ERR("Failed to allocate UST app hash table");
retval = -1;
- goto exit_init_data;
+ goto stop_threads;
}
/*
if (agent_app_ht_alloc()) {
ERR("Failed to allocate Agent app hash table");
retval = -1;
- goto exit_init_data;
+ goto stop_threads;
}
/*
if (is_root) {
if (set_consumer_sockets(&kconsumer_data)) {
retval = -1;
- goto exit_init_data;
+ goto stop_threads;
}
/* Setup kernel tracer */
if (set_consumer_sockets(&ustconsumer64_data)) {
retval = -1;
- goto exit_init_data;
+ goto stop_threads;
}
if (set_consumer_sockets(&ustconsumer32_data)) {
retval = -1;
- goto exit_init_data;
+ goto stop_threads;
}
/* Set credentials to socket */
if (is_root && set_permissions(config.rundir.value)) {
retval = -1;
- goto exit_init_data;
+ goto stop_threads;
}
/* Get parent pid if -S, --sig-parent is specified. */
if (is_root && !config.no_kernel) {
if (utils_create_pipe_cloexec(kernel_poll_pipe)) {
retval = -1;
- goto exit_init_data;
+ goto stop_threads;
}
}
/* Setup the thread apps communication pipe. */
if (utils_create_pipe_cloexec(apps_cmd_pipe)) {
retval = -1;
- goto exit_init_data;
+ goto stop_threads;
}
/* Setup the thread apps notify communication pipe. */
if (utils_create_pipe_cloexec(apps_cmd_notify_pipe)) {
retval = -1;
- goto exit_init_data;
+ goto stop_threads;
}
/* Initialize global buffer per UID and PID registry. */
if (ret) {
ERR("Error in write_pidfile");
retval = -1;
- goto exit_init_data;
+ goto stop_threads;
}
/* Initialize communication library */
/* Initialize TCP timeout values */
lttcomm_inet_init();
- if (load_session_init_data(&load_info) < 0) {
- retval = -1;
- goto exit_init_data;
- }
- load_info->path = config.load_session_path.value;
-
/* Create health-check thread. */
if (!launch_health_management_thread()) {
retval = -1;
- goto exit_health;
+ goto stop_threads;
}
/* notification_thread_data acquires the pipes' read side. */
if (!notification_thread_handle) {
retval = -1;
ERR("Failed to create notification thread shared data");
- goto exit_notification;
+ goto stop_threads;
}
/* Create notification thread. */
- if (!launch_notification_thread(notification_thread_handle)) {
+ notification_thread = launch_notification_thread(
+ notification_thread_handle);
+ if (!notification_thread) {
retval = -1;
- goto exit_notification;
+ goto stop_threads;
}
/* Create timer thread. */
if (!launch_timer_thread(&timer_thread_parameters)) {
retval = -1;
- goto exit_notification;
+ goto stop_threads;
}
/* rotation_thread_data acquires the pipes' read side. */
retval = -1;
ERR("Failed to create rotation thread shared data");
stop_threads();
- goto exit_rotation;
+ goto stop_threads;
}
/* Create rotation thread. */
if (!launch_rotation_thread(rotation_thread_handle)) {
retval = -1;
- goto exit_rotation;
+ goto stop_threads;
}
/* Create thread to manage the client socket */
client_thread = launch_client_thread();
if (!client_thread) {
retval = -1;
- goto exit_client;
+ goto stop_threads;
}
if (!launch_ust_dispatch_thread(&ust_cmd_queue, apps_cmd_pipe[1],
apps_cmd_notify_pipe[1])) {
retval = -1;
- goto exit_dispatch;
+ goto stop_threads;
}
/* Create thread to manage application registration. */
- if (!launch_application_registration_thread(&ust_cmd_queue)) {
+ register_apps_thread = launch_application_registration_thread(
+ &ust_cmd_queue);
+ if (!register_apps_thread) {
retval = -1;
- goto exit_reg_apps;
+ goto stop_threads;
}
/* Create thread to manage application socket */
- ret = pthread_create(&apps_thread, default_pthread_attr(),
- thread_manage_apps, (void *) NULL);
- if (ret) {
- errno = ret;
- PERROR("pthread_create apps");
+ if (!launch_application_management_thread(apps_cmd_pipe[0])) {
retval = -1;
- stop_threads();
- goto exit_apps;
+ goto stop_threads;
}
/* Create thread to manage application notify socket */
- ret = pthread_create(&apps_notify_thread, default_pthread_attr(),
- ust_thread_manage_notify, (void *) NULL);
- if (ret) {
- errno = ret;
- PERROR("pthread_create notify");
+ if (!launch_application_notification_thread(apps_cmd_notify_pipe[0])) {
retval = -1;
- stop_threads();
- goto exit_apps_notify;
+ goto stop_threads;
}
- /* Create agent registration thread. */
- ret = pthread_create(&agent_reg_thread, default_pthread_attr(),
- agent_thread_manage_registration, (void *) NULL);
- if (ret) {
- errno = ret;
- PERROR("pthread_create agent");
+ /* Create agent management thread. */
+ if (!launch_agent_management_thread()) {
retval = -1;
- stop_threads();
- goto exit_agent_reg;
+ goto stop_threads;
}
/* Don't start this thread if kernel tracing is not requested nor root */
if (is_root && !config.no_kernel) {
/* Create kernel thread to manage kernel event */
- ret = pthread_create(&kernel_thread, default_pthread_attr(),
- thread_manage_kernel, (void *) NULL);
- if (ret) {
- errno = ret;
- PERROR("pthread_create kernel");
+ if (!launch_kernel_management_thread(kernel_poll_pipe[0])) {
retval = -1;
- stop_threads();
- goto exit_kernel;
+ goto stop_threads;
}
}
- /* Create session loading thread. */
- ret = pthread_create(&load_session_thread, default_pthread_attr(),
- thread_load_session, load_info);
+ /* Load sessions. */
+ ret = config_load_session(config.load_session_path.value,
+ NULL, 1, 1, NULL);
if (ret) {
- errno = ret;
- PERROR("pthread_create load_session_thread");
+ ERR("Session load failed: %s", error_get_str(ret));
retval = -1;
- stop_threads();
- goto exit_load_session;
+ goto stop_threads;
}
+ /* Initialization completed. */
+ sessiond_signal_parents();
+
/*
* This is where we start awaiting program completion (e.g. through
* signal that asks threads to teardown).
*/
- ret = pthread_join(load_session_thread, &status);
- if (ret) {
- errno = ret;
- PERROR("pthread_join load_session_thread");
- retval = -1;
- }
-
/* Initiate teardown once activity occurs on the quit pipe. */
sessiond_wait_for_quit_pipe(-1U);
+stop_threads:
/*
* Ensure that the client thread is no longer accepting new commands,
* which could cause new sessions to be created.
*/
- if (!lttng_thread_shutdown(client_thread)) {
- ERR("Failed to shutdown the client thread, continuing teardown");
+ if (client_thread) {
+ lttng_thread_shutdown(client_thread);
lttng_thread_put(client_thread);
- client_thread = NULL;
}
destroy_all_sessions_and_wait();
-exit_load_session:
-
- if (is_root && !config.no_kernel) {
- ret = pthread_join(kernel_thread, &status);
- if (ret) {
- errno = ret;
- PERROR("pthread_join");
- retval = -1;
- }
- }
-exit_kernel:
- ret = pthread_join(agent_reg_thread, &status);
- if (ret) {
- errno = ret;
- PERROR("pthread_join agent");
- retval = -1;
+ if (register_apps_thread) {
+ lttng_thread_shutdown(register_apps_thread);
+ lttng_thread_put(register_apps_thread);
}
-exit_agent_reg:
-
- ret = pthread_join(apps_notify_thread, &status);
- if (ret) {
- errno = ret;
- PERROR("pthread_join apps notify");
- retval = -1;
- }
-exit_apps_notify:
-
- ret = pthread_join(apps_thread, &status);
- if (ret) {
- errno = ret;
- PERROR("pthread_join apps");
- retval = -1;
- }
-exit_apps:
-exit_reg_apps:
-exit_dispatch:
-exit_client:
-exit_rotation:
-exit_notification:
lttng_thread_list_shutdown_orphans();
-exit_health:
-exit_init_data:
- if (client_thread) {
- lttng_thread_put(client_thread);
- }
/*
* Wait for all pending call_rcu work to complete before tearing
rcu_thread_online();
sessiond_cleanup();
+ if (notification_thread) {
+ lttng_thread_shutdown(notification_thread);
+ lttng_thread_put(notification_thread);
+ }
+
/*
* Ensure all prior call_rcu are done. call_rcu callbacks may push
* hash tables to the ht_cleanup thread. Therefore, we ensure that
lttng_pipe_destroy(ust32_channel_monitor_pipe);
lttng_pipe_destroy(ust64_channel_monitor_pipe);
lttng_pipe_destroy(kernel_channel_monitor_pipe);
-exit_ht_cleanup:
health_app_destroy(health_sessiond);
-exit_health_sessiond_cleanup:
exit_create_run_as_worker_cleanup:
-
exit_options:
sessiond_cleanup_lock_file();
sessiond_cleanup_options();