#include "session.h"
#include "timer.h"
#include "notification-thread-commands.h"
+#include "utils.h"
+#include "thread.h"
#include <urcu.h>
#include <urcu/list.h>
struct rotation_thread_timer_queue *rotation_timer_queue;
/* Access to the notification thread cmd_queue */
struct notification_thread_handle *notification_thread_handle;
+ /* Thread-specific quit pipe. */
+ struct lttng_pipe *quit_pipe;
};
static
void rotation_thread_timer_queue_destroy(
struct rotation_thread_timer_queue *queue)
{
- struct rotation_thread_job *job, *tmp_job;
-
if (!queue) {
return;
}
lttng_pipe_destroy(queue->event_pipe);
pthread_mutex_lock(&queue->lock);
- /* Empty wait queue. */
- cds_list_for_each_entry_safe(job, tmp_job, &queue->list, head) {
- log_job_destruction(job);
- cds_list_del(&job->head);
- free(job);
- }
+ assert(cds_list_empty(&queue->list));
pthread_mutex_unlock(&queue->lock);
pthread_mutex_destroy(&queue->lock);
free(queue);
void rotation_thread_handle_destroy(
struct rotation_thread_handle *handle)
{
+ lttng_pipe_destroy(handle->quit_pipe);
free(handle);
}
handle->rotation_timer_queue = rotation_timer_queue;
handle->notification_thread_handle = notification_thread_handle;
+ handle->quit_pipe = lttng_pipe_open(FD_CLOEXEC);
+ if (!handle->quit_pipe) {
+ goto error;
+ }
end:
return handle;
+error:
+ rotation_thread_handle_destroy(handle);
+ return NULL;
}
/*
int ret;
/*
- * Create pollset with size 2:
- * - quit pipe,
+ * Create pollset with size 3:
+ * - rotation thread quit pipe,
* - rotation thread timer queue pipe,
+ * - notification channel sock,
*/
- ret = sessiond_set_thread_pollset(poll_set, 2);
- if (ret) {
+ ret = lttng_poll_create(poll_set, 5, LTTNG_CLOEXEC);
+ if (ret < 0) {
goto error;
}
+
+ ret = lttng_poll_add(poll_set,
+ lttng_pipe_get_readfd(handle->quit_pipe),
+ LPOLLIN | LPOLLERR);
+ if (ret < 0) {
+ ERR("[rotation-thread] Failed to add quit pipe read fd to poll set");
+ goto error;
+ }
+
ret = lttng_poll_add(poll_set,
lttng_pipe_get_readfd(handle->rotation_timer_queue->event_pipe),
LPOLLIN | LPOLLERR);
if (ret < 0) {
- ERR("[rotation-thread] Failed to add rotate_pending fd to pollset");
+ ERR("[rotation-thread] Failed to add rotate_pending fd to poll set");
goto error;
}
int ret;
struct rotation_thread_handle *handle = data;
struct rotation_thread thread;
+ const int queue_pipe_fd = lttng_pipe_get_readfd(
+ handle->rotation_timer_queue->event_pipe);
DBG("[rotation-thread] Started rotation thread");
goto error;
}
- /* Ready to handle client connections. */
- sessiond_notify_ready();
-
while (true) {
int fd_count, i;
}
} else {
/* Job queue or quit pipe activity. */
- if (fd == lttng_pipe_get_readfd(
- handle->rotation_timer_queue->event_pipe)) {
- char buf;
-
- ret = lttng_read(fd, &buf, 1);
- if (ret != 1) {
- ERR("[rotation-thread] Failed to read from wakeup pipe (fd = %i)", fd);
- ret = -1;
- goto error;
- }
- }
/*
* The job queue is serviced if there is
goto error;
}
- if (sessiond_check_thread_quit_pipe(fd, revents)) {
+ if (fd == queue_pipe_fd) {
+ char buf;
+
+ ret = lttng_read(fd, &buf, 1);
+ if (ret != 1) {
+ ERR("[rotation-thread] Failed to read from wakeup pipe (fd = %i)", fd);
+ ret = -1;
+ goto error;
+ }
+ } else {
DBG("[rotation-thread] Quit pipe activity");
goto exit;
}
end:
return NULL;
}
+
+static
+bool shutdown_rotation_thread(void *thread_data)
+{
+ struct rotation_thread_handle *handle = thread_data;
+ const int write_fd = lttng_pipe_get_writefd(handle->quit_pipe);
+
+ return notify_thread_pipe(write_fd) == 1;
+}
+
+bool launch_rotation_thread(struct rotation_thread_handle *handle)
+{
+ struct lttng_thread *thread;
+
+ thread = lttng_thread_create("Rotation",
+ thread_rotation,
+ shutdown_rotation_thread,
+ NULL,
+ handle);
+ if (!thread) {
+ goto error;
+ }
+ lttng_thread_put(thread);
+ return true;
+error:
+ return false;
+}