summary |
shortlog |
log |
commit | commitdiff |
tree
raw |
patch |
inline | side by side (from parent 1:
eb9cb8b)
Remove the pthread_cancel and introduce the *thread_quit_pipe* that is
used to tell a thread that it should exit cleanly. This pipe is added to
the pollfd set of the thread and has to be close in order to trigger the
clean exit of the threads.
Signed-off-by: David Goulet <david.goulet@polymtl.ca>
static int kernel_tracer_fd;
static int kernel_poll_pipe[2];
static int kernel_tracer_fd;
static int kernel_poll_pipe[2];
+/*
+ * Quit pipe for all threads. This permits a single cancellation point
+ * for all threads when receiving an event on the pipe.
+ */
+static int thread_quit_pipe[2];
+
/* Pthread, Mutexes and Semaphores */
static pthread_t kconsumerd_thread;
static pthread_t apps_thread;
/* Pthread, Mutexes and Semaphores */
static pthread_t kconsumerd_thread;
static pthread_t apps_thread;
*/
static struct ltt_session_list *session_list_ptr;
*/
static struct ltt_session_list *session_list_ptr;
+/*
+ * Init quit pipe.
+ *
+ * Return -1 on error or 0 if all pipes are created.
+ */
+static int init_thread_quit_pipe(void)
+{
+ int ret;
+
+ ret = pipe2(thread_quit_pipe, O_CLOEXEC);
+ if (ret < 0) {
+ perror("thread quit pipe");
+ goto error;
+ }
+
+error:
+ return ret;
+}
+
/*
* teardown_kernel_session
*
/*
* teardown_kernel_session
*
- * cleanup
- *
- * Cleanup the daemon on exit
*/
static void cleanup()
{
*/
static void cleanup()
{
DBG("Cleaning up");
/* <fun> */
DBG("Cleaning up");
/* <fun> */
- MSG("\n%c[%d;%dm*** assert failed *** ==> %c[%dm", 27,1,31,27,0);
- MSG("%c[%d;%dmMatthew, BEET driven development works!%c[%dm",27,1,33,27,0);
+ MSG("\n%c[%d;%dm*** assert failed *** ==> %c[%dm%c[%d;%dm"
+ "Matthew, BEET driven development works!%c[%dm",
+ 27, 1, 31, 27, 0, 27, 1, 33, 27, 0);
/* </fun> */
/* Stopping all threads */
DBG("Terminating all threads");
/* </fun> */
/* Stopping all threads */
DBG("Terminating all threads");
- pthread_cancel(client_thread);
- pthread_cancel(apps_thread);
- pthread_cancel(kernel_thread);
- if (kconsumerd_pid != 0) {
- pthread_cancel(kconsumerd_thread);
- }
-
- DBG("Unlinking all unix socket");
- unlink(client_unix_sock_path);
- unlink(apps_unix_sock_path);
- unlink(kconsumerd_err_unix_sock_path);
+ close(thread_quit_pipe[0]);
+ close(thread_quit_pipe[1]);
DBG("Removing %s directory", LTTNG_RUNDIR);
ret = asprintf(&cmd, "rm -rf " LTTNG_RUNDIR);
DBG("Removing %s directory", LTTNG_RUNDIR);
ret = asprintf(&cmd, "rm -rf " LTTNG_RUNDIR);
}
DBG("Cleaning up all session");
}
DBG("Cleaning up all session");
- /* Cleanup ALL session */
- cds_list_for_each_entry(sess, &session_list_ptr->head, list) {
- teardown_kernel_session(sess);
- // TODO complete session cleanup (including UST)
- }
/* Destroy session list mutex */
/* Destroy session list mutex */
- pthread_mutex_destroy(&session_list_ptr->lock);
+ if (session_list_ptr != NULL) {
+ pthread_mutex_destroy(&session_list_ptr->lock);
+
+ /* Cleanup ALL session */
+ cds_list_for_each_entry(sess, &session_list_ptr->head, list) {
+ teardown_kernel_session(sess);
+ // TODO complete session cleanup (including UST)
+ }
+ }
+
+ pthread_mutex_destroy(&kconsumerd_pid_mutex);
DBG("Closing kernel fd");
close(kernel_tracer_fd);
DBG("Closing kernel fd");
close(kernel_tracer_fd);
- close(kernel_poll_pipe[0]);
- close(kernel_poll_pipe[1]);
static int update_kernel_pollfd(void)
{
int i = 0;
static int update_kernel_pollfd(void)
{
int i = 0;
- unsigned int nb_fd = 1;
+ /*
+ * The wakup pipe and the quit pipe are needed so the number of fds starts
+ * at 2 for those pipes.
+ */
+ unsigned int nb_fd = 2;
struct ltt_session *session;
struct ltt_kernel_channel *channel;
struct ltt_session *session;
struct ltt_kernel_channel *channel;
pthread_mutex_unlock(&session_list_ptr->lock);
/* Adding wake up pipe */
pthread_mutex_unlock(&session_list_ptr->lock);
/* Adding wake up pipe */
- kernel_pollfd[nb_fd - 1].fd = kernel_poll_pipe[0];
- kernel_pollfd[nb_fd - 1].events = POLLIN;
+ kernel_pollfd[nb_fd - 2].fd = kernel_poll_pipe[0];
+ kernel_pollfd[nb_fd - 2].events = POLLIN;
+
+ /* Adding the quit pipe */
+ kernel_pollfd[nb_fd - 1].fd = thread_quit_pipe[0];
+ /* Thread quit pipe has been closed. Killing thread. */
+ if (kernel_pollfd[nb_fd - 1].revents == POLLNVAL) {
+ goto error;
+ }
+
DBG("Kernel poll event triggered");
/*
* Check if the wake up pipe was triggered. If so, the kernel_pollfd
* must be updated.
*/
DBG("Kernel poll event triggered");
/*
* Check if the wake up pipe was triggered. If so, the kernel_pollfd
* must be updated.
*/
- switch (kernel_pollfd[nb_fd - 1].revents) {
+ switch (kernel_pollfd[nb_fd - 2].revents) {
case POLLIN:
ret = read(kernel_poll_pipe[0], &tmp, 1);
update_poll_flag = 1;
case POLLIN:
ret = read(kernel_poll_pipe[0], &tmp, 1);
update_poll_flag = 1;
if (kernel_pollfd) {
free(kernel_pollfd);
}
if (kernel_pollfd) {
free(kernel_pollfd);
}
+
+ close(kernel_poll_pipe[0]);
+ close(kernel_poll_pipe[1]);
*/
static void *thread_manage_kconsumerd(void *data)
{
*/
static void *thread_manage_kconsumerd(void *data)
{
enum lttcomm_return_code code;
enum lttcomm_return_code code;
+ struct pollfd pollfd[2];
DBG("[thread] Manage kconsumerd started");
DBG("[thread] Manage kconsumerd started");
+ /* First fd is always the quit pipe */
+ pollfd[0].fd = thread_quit_pipe[0];
+
+ /* Apps socket */
+ pollfd[1].fd = kconsumerd_err_sock;
+ pollfd[1].events = POLLIN;
+
+ /* Inifinite blocking call, waiting for transmission */
+ ret = poll(pollfd, 2, -1);
+ if (ret < 0) {
+ perror("poll kconsumerd thread");
+ goto error;
+ }
+
+ /* Thread quit pipe has been closed. Killing thread. */
+ if (pollfd[0].revents == POLLNVAL) {
+ goto error;
+ } else if (pollfd[1].revents == POLLERR) {
+ ERR("Kconsumerd err socket poll error");
+ goto error;
+ }
+
sock = lttcomm_accept_unix_sock(kconsumerd_err_sock);
if (sock < 0) {
goto error;
sock = lttcomm_accept_unix_sock(kconsumerd_err_sock);
if (sock < 0) {
goto error;
ERR("Kconsumerd return code : %s", lttcomm_get_readable_code(-code));
error:
ERR("Kconsumerd return code : %s", lttcomm_get_readable_code(-code));
error:
DBG("Kconsumerd thread dying");
DBG("Kconsumerd thread dying");
+ if (kconsumerd_err_sock) {
+ close(kconsumerd_err_sock);
+ }
+ if (kconsumerd_cmd_sock) {
+ close(kconsumerd_cmd_sock);
+ }
+ if (sock) {
+ close(sock);
+ }
+
+ unlink(kconsumerd_err_unix_sock_path);
+ unlink(kconsumerd_cmd_unix_sock_path);
+
+ kconsumerd_pid = 0;
*/
static void *thread_manage_apps(void *data)
{
*/
static void *thread_manage_apps(void *data)
{
+ int sock = 0, ret;
+ struct pollfd pollfd[2];
/* TODO: Something more elegant is needed but fine for now */
/* FIXME: change all types to either uint8_t, uint32_t, uint64_t
/* TODO: Something more elegant is needed but fine for now */
/* FIXME: change all types to either uint8_t, uint32_t, uint64_t
+ /* First fd is always the quit pipe */
+ pollfd[0].fd = thread_quit_pipe[0];
+
+ /* Apps socket */
+ pollfd[1].fd = apps_sock;
+ pollfd[1].events = POLLIN;
+
/* Notify all applications to register */
notify_apps(default_global_apps_pipe);
while (1) {
DBG("Accepting application registration");
/* Notify all applications to register */
notify_apps(default_global_apps_pipe);
while (1) {
DBG("Accepting application registration");
- /* Blocking call, waiting for transmission */
+
+ /* Inifinite blocking call, waiting for transmission */
+ ret = poll(pollfd, 2, -1);
+ if (ret < 0) {
+ perror("poll apps thread");
+ goto error;
+ }
+
+ /* Thread quit pipe has been closed. Killing thread. */
+ if (pollfd[0].revents == POLLNVAL) {
+ goto error;
+ } else if (pollfd[1].revents == POLLERR) {
+ ERR("Apps socket poll error");
+ goto error;
+ }
+
sock = lttcomm_accept_unix_sock(apps_sock);
if (sock < 0) {
goto error;
}
sock = lttcomm_accept_unix_sock(apps_sock);
if (sock < 0) {
goto error;
}
- /* Basic recv here to handle the very simple data
+ /*
+ * Basic recv here to handle the very simple data
* that the libust send to register (reg_msg).
*/
ret = recv(sock, ®_msg, sizeof(reg_msg), 0);
* that the libust send to register (reg_msg).
*/
ret = recv(sock, ®_msg, sizeof(reg_msg), 0);
+ DBG("Apps thread dying");
+ if (apps_sock) {
+ close(apps_sock);
+ }
+ if (sock) {
+ close(sock);
+ }
+ unlink(apps_unix_sock_path);
*/
static void *thread_manage_clients(void *data)
{
*/
static void *thread_manage_clients(void *data)
{
- int sock, ret;
- struct command_ctx *cmd_ctx;
+ int sock = 0, ret;
+ struct command_ctx *cmd_ctx = NULL;
+ struct pollfd pollfd[2];
DBG("[thread] Manage client started");
DBG("[thread] Manage client started");
+ /* First fd is always the quit pipe */
+ pollfd[0].fd = thread_quit_pipe[0];
+
+ /* Apps socket */
+ pollfd[1].fd = client_sock;
+ pollfd[1].events = POLLIN;
+
/* Notify parent pid that we are ready
* to accept command for client side.
*/
/* Notify parent pid that we are ready
* to accept command for client side.
*/
- /* Blocking call, waiting for transmission */
DBG("Accepting client command ...");
DBG("Accepting client command ...");
+
+ /* Inifinite blocking call, waiting for transmission */
+ ret = poll(pollfd, 2, -1);
+ if (ret < 0) {
+ perror("poll client thread");
+ goto error;
+ }
+
+ /* Thread quit pipe has been closed. Killing thread. */
+ if (pollfd[0].revents == POLLNVAL) {
+ goto error;
+ } else if (pollfd[1].revents == POLLERR) {
+ ERR("Client socket poll error");
+ goto error;
+ }
+
sock = lttcomm_accept_unix_sock(client_sock);
if (sock < 0) {
goto error;
sock = lttcomm_accept_unix_sock(client_sock);
if (sock < 0) {
goto error;
+ DBG("Client thread dying");
+ if (client_sock) {
+ close(client_sock);
+ }
+ if (sock) {
+ close(sock);
+ }
+
+ unlink(client_unix_sock_path);
+
+ clean_command_ctx(cmd_ctx);
void *status;
const char *home_path;
void *status;
const char *home_path;
+ /* Create thread quit pipe */
+ if (init_thread_quit_pipe() < 0) {
+ /* No goto error because nothing is initialized at this point */
+ exit(EXIT_FAILURE);
+ }
+
/* Parse arguments */
progname = argv[0];
if ((ret = parse_args(argc, argv) < 0)) {
/* Parse arguments */
progname = argv[0];
if ((ret = parse_args(argc, argv) < 0)) {
} else {
home_path = get_home_dir();
if (home_path == NULL) {
} else {
home_path = get_home_dir();
if (home_path == NULL) {
- ERR("Can't get HOME directory for sockets creation.\n \
- Please specify --socket PATH.");
+ /* TODO: Add --socket PATH option */
+ ERR("Can't get HOME directory for sockets creation.");
DBG("Client socket path %s", client_unix_sock_path);
DBG("Application socket path %s", apps_unix_sock_path);
DBG("Client socket path %s", client_unix_sock_path);
DBG("Application socket path %s", apps_unix_sock_path);
- /* See if daemon already exist. If any of the two
- * socket needed by the daemon are present, this test fails
+ /*
+ * See if daemon already exist. If any of the two socket needed by the
+ * daemon are present, this test fails. However, if the daemon is killed
+ * with a SIGKILL, those unix socket must be unlinked by hand.
*/
if ((ret = check_existing_daemon()) == 0) {
ERR("Already running daemon.\n");
*/
if ((ret = check_existing_daemon()) == 0) {
ERR("Already running daemon.\n");
- /* We do not goto error because we must not
+ /*
+ * We do not goto error because we must not
* cleanup() because a daemon is already running.
*/
exit(EXIT_FAILURE);
* cleanup() because a daemon is already running.
*/
exit(EXIT_FAILURE);
- /* Get session list pointer */
+ /*
+ * Get session list pointer. This pointer MUST NOT be free().
+ * This list is statically declared in session.c
+ */
session_list_ptr = get_session_list();
while (1) {
session_list_ptr = get_session_list();
while (1) {