#include <grp.h>
#include <limits.h>
#include <pthread.h>
+#include <semaphore.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
static int set_permissions(void);
static int setup_lttng_msg(struct command_ctx *cmd_ctx, size_t size);
static int create_lttng_rundir(void);
+static int create_trace_dir(struct ltt_kernel_session *session);
static int set_kconsumerd_sockets(void);
static void cleanup(void);
static void sighandler(int sig);
static void clean_command_ctx(struct command_ctx *cmd_ctx);
+static void teardown_kernel_session(struct ltt_session *session);
static void *thread_manage_clients(void *data);
static void *thread_manage_apps(void *data);
static int client_sock;
static int apps_sock;
static int kconsumerd_err_sock;
+static int kconsumerd_cmd_sock;
static int kernel_tracer_fd;
+static pthread_t kconsumerd_thread;
+static sem_t kconsumerd_sem;
+
+static pthread_mutex_t kconsumerd_pid_mutex;
+static pid_t kconsumerd_pid;
/*
* thread_manage_kconsumerd
static void *thread_manage_kconsumerd(void *data)
{
int sock, ret;
+ enum lttcomm_return_code code;
DBG("[thread] Manage kconsumerd started");
goto error;
}
- while (1) {
- //ret = lttcomm_recv_unix_sock(sock, &lsm, sizeof(lsm));
- if (ret <= 0) {
- /* TODO: Consumerd died? */
- continue;
+ ret = lttcomm_recv_unix_sock(sock, &code, sizeof(enum lttcomm_return_code));
+ if (ret <= 0) {
+ goto error;
+ }
+
+ if (code == KCONSUMERD_COMMAND_SOCK_READY) {
+ kconsumerd_cmd_sock = lttcomm_connect_unix_sock(kconsumerd_cmd_unix_sock_path);
+ if (kconsumerd_cmd_sock < 0) {
+ perror("kconsumerd connect");
+ goto error;
}
+ /* Signal condition to tell that the kconsumerd is ready */
+ sem_post(&kconsumerd_sem);
+ DBG("Kconsumerd command socket ready");
+ } else {
+ DBG("[sessiond] Kconsumerd error when waiting for SOCK_READY : %s",
+ lttcomm_get_readable_code(-code));
+ goto error;
+ }
+
+ /* Wait for any kconsumerd error */
+ ret = lttcomm_recv_unix_sock(sock, &code, sizeof(enum lttcomm_return_code));
+ if (ret <= 0) {
+ ERR("[sessiond] Kconsumerd closed the command socket");
+ goto error;
}
+ ERR("Kconsumerd return code : %s", lttcomm_get_readable_code(-code));
+
error:
+ teardown_kernel_session((struct ltt_session *) data);
return NULL;
}
return ret;
}
+/*
+ * start_kconsumerd_thread
+ *
+ * Start the thread_manage_kconsumerd. This must be done after a kconsumerd
+ * exec or it will fails.
+ */
+static int start_kconsumerd_thread(struct ltt_session *session)
+{
+ int ret;
+
+ /* Setup semaphore */
+ sem_init(&kconsumerd_sem, 0, 0);
+
+ ret = pthread_create(&kconsumerd_thread, NULL, thread_manage_kconsumerd, (void *) session);
+ if (ret != 0) {
+ perror("pthread_create kconsumerd");
+ goto error;
+ }
+
+ sem_wait(&kconsumerd_sem);
+
+ return 0;
+
+error:
+ return ret;
+}
+
+/*
+ * kernel_start_consumer
+ *
+ * Start a kernel consumer daemon (kconsumerd).
+ *
+ * Return pid if successful else -1.
+ */
+pid_t kernel_start_consumer(void)
+{
+ int ret;
+ pid_t pid;
+
+ pid = fork();
+ if (pid == 0) {
+ /*
+ * Exec kconsumerd.
+ */
+ execlp("kconsumerd", "kconsumerd", "--daemonize", NULL);
+ if (errno != 0) {
+ perror("kernel start consumer exec");
+ }
+ exit(EXIT_FAILURE);
+ } else if (pid > 0) {
+ ret = pid;
+ goto error;
+ } else {
+ perror("kernel start consumer fork");
+ ret = -errno;
+ goto error;
+ }
+
+error:
+ return ret;
+}
+
+/*
+ * send_kconsumerd_fds
+ *
+ * Send all stream fds of the kernel session to the consumer.
+ */
+static int send_kconsumerd_fds(int sock, struct ltt_kernel_session *session)
+{
+ int ret, i = 0;
+ /* Plus one here for the metadata fd */
+ size_t nb_fd = session->stream_count_global + 1;
+ int fds[nb_fd];
+ struct ltt_kernel_stream *stream;
+ struct ltt_kernel_channel *chan;
+ struct lttcomm_kconsumerd_header lkh;
+ struct lttcomm_kconsumerd_msg buf[nb_fd];
+
+ /* Add metadata data */
+ fds[i] = session->metadata_stream_fd;
+ buf[i].fd = fds[i];
+ buf[i].state = ACTIVE_FD;
+ buf[i].max_sb_size = session->metadata->conf->subbuf_size;
+ strncpy(buf[i].path_name, session->metadata->pathname, PATH_MAX);
+
+ cds_list_for_each_entry(chan, &session->channel_list.head, list) {
+ cds_list_for_each_entry(stream, &chan->stream_list.head, list) {
+ i++;
+ fds[i] = stream->fd;
+ buf[i].fd = stream->fd;
+ buf[i].state = stream->state;
+ buf[i].max_sb_size = chan->channel->subbuf_size;
+ strncpy(buf[i].path_name, stream->pathname, PATH_MAX);
+ }
+ }
+
+ /* Setup header */
+ lkh.payload_size = nb_fd * sizeof(struct lttcomm_kconsumerd_msg);
+ lkh.cmd_type = LTTCOMM_ADD_STREAM;
+
+ DBG("Sending kconsumerd header");
+
+ ret = lttcomm_send_unix_sock(sock, &lkh, sizeof(struct lttcomm_kconsumerd_header));
+ if (ret < 0) {
+ perror("send kconsumerd header");
+ goto error;
+ }
+
+ DBG("Sending all fds to kconsumerd");
+
+ ret = lttcomm_send_fds_unix_sock(sock, buf, fds, nb_fd, lkh.payload_size);
+ if (ret < 0) {
+ perror("send kconsumerd fds");
+ goto error;
+ }
+
+ DBG("Kconsumerd fds sent");
+
+ return 0;
+
+error:
+ return ret;
+}
+
+/*
+ * free_kernel_session
+ *
+ * Free all data structure inside a kernel session and the session pointer.
+ */
+static void free_kernel_session(struct ltt_kernel_session *session)
+{
+ struct ltt_kernel_channel *chan;
+ struct ltt_kernel_stream *stream;
+ struct ltt_kernel_event *event;
+
+ /* Clean metadata */
+ close(session->metadata_stream_fd);
+ close(session->metadata->fd);
+ free(session->metadata->conf);
+ free(session->metadata);
+
+ cds_list_for_each_entry(chan, &session->channel_list.head, list) {
+ /* Clean all event(s) */
+ cds_list_for_each_entry(event, &chan->events_list.head, list) {
+ close(event->fd);
+ free(event->event);
+ free(event);
+ }
+
+ /* Clean streams */
+ cds_list_for_each_entry(stream, &chan->stream_list.head, list) {
+ close(stream->fd);
+ free(stream->pathname);
+ free(stream);
+ }
+ /* Clean channel */
+ close(chan->fd);
+ free(chan->channel);
+ free(chan->pathname);
+ free(chan);
+ }
+
+ close(session->fd);
+ free(session);
+
+ DBG("All kernel session data structures freed");
+}
+
+/*
+ * teardown_kernel_session
+ *
+ * Complete teardown of a kernel session. This free all data structure
+ * related to a kernel session and update counter.
+ */
+static void teardown_kernel_session(struct ltt_session *session)
+{
+ if (session->kernel_session != NULL) {
+ DBG("Tearing down kernel session");
+ free_kernel_session(session->kernel_session);
+ /* Extra precaution */
+ session->kernel_session = NULL;
+ /* Decrement session count */
+ session->kern_session_count--;
+ /* Set kconsumerd pid to 0 (inactive) */
+ session->kernel_consumer = 0;
+ }
+}
+
/*
* process_client_msg
*
switch (cmd_ctx->lsm->cmd_type) {
case KERNEL_CREATE_SESSION:
case KERNEL_CREATE_CHANNEL:
+ case KERNEL_CREATE_STREAM:
case KERNEL_DISABLE_EVENT:
case KERNEL_ENABLE_EVENT:
case KERNEL_OPEN_METADATA:
goto setup_error;
}
+ DBG("Checking if kconsumerd is alive");
+ pthread_mutex_lock(&kconsumerd_pid_mutex);
+ if (kconsumerd_pid == 0) {
+ ret = kernel_start_consumer();
+ if (ret < 0) {
+ ERR("Kernel start kconsumerd failed");
+ ret = LTTCOMM_KERN_CONSUMER_FAIL;
+ pthread_mutex_unlock(&kconsumerd_pid_mutex);
+ goto error;
+ }
+
+ /* Setting up the global kconsumerd_pid */
+ kconsumerd_pid = ret;
+ }
+ pthread_mutex_unlock(&kconsumerd_pid_mutex);
+
+ ret = start_kconsumerd_thread(cmd_ctx->session);
+ if (ret < 0) {
+ ERR("Fatal error : start_kconsumerd_thread()");
+ ret = LTTCOMM_FATAL;
+ goto error;
+ }
+
DBG("Creating kernel session");
- ret = kernel_create_session(cmd_ctx, kernel_tracer_fd);
+ ret = kernel_create_session(cmd_ctx->session, kernel_tracer_fd);
if (ret < 0) {
ret = LTTCOMM_KERN_SESS_FAIL;
goto error;
goto setup_error;
}
- DBG("Creating kernel session");
+ DBG("Creating kernel channel");
+
+ ret = kernel_create_channel(cmd_ctx->session->kernel_session);
- ret = kernel_create_channel(cmd_ctx);
if (ret < 0) {
ret = LTTCOMM_KERN_CHAN_FAIL;
goto error;
DBG("Enabling kernel event %s", cmd_ctx->lsm->u.event.event_name);
- ret = kernel_enable_event(cmd_ctx->session->kernel_session->channel, cmd_ctx->lsm->u.event.event_name);
+ ret = kernel_enable_event(cmd_ctx->session->kernel_session, cmd_ctx->lsm->u.event.event_name);
if (ret < 0) {
ret = LTTCOMM_KERN_ENABLE_FAIL;
goto error;
ret = LTTCOMM_OK;
break;
}
+ case KERNEL_CREATE_STREAM:
+ {
+ struct ltt_kernel_channel *chan;
+ /* Setup lttng message with no payload */
+ ret = setup_lttng_msg(cmd_ctx, 0);
+ if (ret < 0) {
+ goto setup_error;
+ }
+
+ DBG("Creating kernel stream");
+
+ ret = kernel_create_metadata_stream(cmd_ctx->session->kernel_session);
+ if (ret < 0) {
+ ERR("Kernel create metadata stream failed");
+ ret = LTTCOMM_KERN_STREAM_FAIL;
+ goto error;
+ }
+
+ /* For each channel */
+ cds_list_for_each_entry(chan, &cmd_ctx->session->kernel_session->channel_list.head, list) {
+ ret = kernel_create_channel_stream(chan);
+ if (ret < 0) {
+ ERR("Kernel create channel stream failed");
+ ret = LTTCOMM_KERN_STREAM_FAIL;
+ goto error;
+ }
+ /* Update the stream global counter */
+ cmd_ctx->session->kernel_session->stream_count_global += ret;
+ }
+
+ ret = LTTCOMM_OK;
+ break;
+ }
+ case KERNEL_START_TRACE:
+ {
+ /* Setup lttng message with no payload */
+ ret = setup_lttng_msg(cmd_ctx, 0);
+ if (ret < 0) {
+ goto setup_error;
+ }
+
+ DBG("Start kernel tracing");
+
+ ret = create_trace_dir(cmd_ctx->session->kernel_session);
+ if (ret < 0) {
+ if (ret == -EEXIST) {
+ ret = LTTCOMM_KERN_DIR_EXIST;
+ } else {
+ ret = LTTCOMM_KERN_DIR_FAIL;
+ goto error;
+ }
+ }
+
+ ret = kernel_start_session(cmd_ctx->session->kernel_session);
+ if (ret < 0) {
+ ERR("Kernel start session failed");
+ ret = LTTCOMM_KERN_START_FAIL;
+ goto error;
+ }
+
+ ret = send_kconsumerd_fds(kconsumerd_cmd_sock, cmd_ctx->session->kernel_session);
+ if (ret < 0) {
+ ERR("Send kconsumerd fds failed");
+ ret = LTTCOMM_KERN_CONSUMER_FAIL;
+ goto error;
+ }
+
+ ret = LTTCOMM_OK;
+ break;
+ }
+ case KERNEL_STOP_TRACE:
+ {
+ /* Setup lttng message with no payload */
+ ret = setup_lttng_msg(cmd_ctx, 0);
+ if (ret < 0) {
+ goto setup_error;
+ }
+
+ DBG("Stop kernel tracing");
+
+ ret = kernel_stop_session(cmd_ctx->session->kernel_session);
+ if (ret < 0) {
+ ERR("Kernel stop session failed");
+ ret = LTTCOMM_KERN_STOP_FAIL;
+ goto error;
+ }
+
+ /* Clean kernel session teardown */
+ teardown_kernel_session(cmd_ctx->session);
+
+ ret = LTTCOMM_OK;
+ break;
+ }
case LTTNG_CREATE_SESSION:
{
/* Setup lttng message with no payload */
return ret;
}
+/*
+ * create_trace_dir
+ *
+ * Create the trace output directory.
+ */
+static int create_trace_dir(struct ltt_kernel_session *session)
+{
+ int ret;
+ struct ltt_kernel_channel *chan;
+
+ /* Create all channel directories */
+ cds_list_for_each_entry(chan, &session->channel_list.head, list) {
+ ret = mkdir(chan->pathname, S_IRWXU | S_IRWXG );
+ if (ret < 0) {
+ perror("mkdir trace path");
+ ret = -errno;
+ goto error;
+ }
+ }
+
+ return 0;
+
+error:
+ return ret;
+}
+
/*
* create_lttng_rundir
*
WARN("No kernel tracer available");
kernel_tracer_fd = 0;
}
+
+ DBG("Kernel tracer fd %d", kernel_tracer_fd);
}
/*
{
int ret;
char *cmd;
+ struct ltt_session *sess;
+ struct ltt_session_list *session_list = get_session_list();
DBG("Cleaning up");
ERR("Unable to clean " LTTNG_RUNDIR);
}
- // TODO Clean kernel trace fds
+ /* Cleanup ALL session */
+ cds_list_for_each_entry(sess, &session_list->head, list) {
+ teardown_kernel_session(sess);
+ // TODO complete session cleanup (including UST)
+ }
+
close(kernel_tracer_fd);
}