#include "health-sessiond.h"
#include "rotate.h"
#include "cmd.h"
+#include "sessiond-timer.h"
#include <urcu.h>
#include <urcu/list.h>
struct lttng_pipe *ust32_channel_rotate_pipe,
struct lttng_pipe *ust64_channel_rotate_pipe,
struct lttng_pipe *kernel_channel_rotate_pipe,
- int thread_quit_pipe)
+ int thread_quit_pipe, int rotate_timer_pipe)
{
struct rotation_thread_handle *handle;
handle->kernel_consumer = -1;
}
handle->thread_quit_pipe = thread_quit_pipe;
+ handle->rotate_timer_pipe = rotate_timer_pipe;
end:
return handle;
int ret;
/*
- * Create pollset with size 4:
+ * Create pollset with size 5:
* - sessiond quit pipe
* - consumerd (32-bit user space) channel rotate pipe,
* - consumerd (64-bit user space) channel rotate pipe,
* - consumerd (kernel) channel rotate pipe.
+ * - sessiond rotate pending pipe
*/
- ret = lttng_poll_create(poll_set, 4, LTTNG_CLOEXEC);
+ ret = lttng_poll_create(poll_set, 5, LTTNG_CLOEXEC);
if (ret < 0) {
goto end;
}
ERR("[rotation-thread] Failed to add thread_quit_pipe fd to pollset");
goto error;
}
+ ret = lttng_poll_add(poll_set, handle->rotate_timer_pipe,
+ LPOLLIN | LPOLLERR);
+ if (ret < 0) {
+ ERR("[rotation-thread] Failed to add rotate_pending fd to pollset");
+ goto error;
+ }
ret = lttng_poll_add(poll_set, handle->ust32_consumer,
LPOLLIN | LPOLLERR);
if (ret < 0) {
if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
ret = lttng_poll_del(&state->events, fd);
if (ret) {
- ERR("[rotation-thread] Failed to remove consumer rotation pipe from poll set");
+ ERR("[rotation-thread] Failed to remove consumer "
+ "rotation pipe from poll set");
}
goto end;
}
time_t now = time(NULL);
if (now == (time_t) -1) {
+ channel_info->session->rotate_status = LTTNG_ROTATE_ERROR;
ret = LTTNG_ERR_ROTATE_NOT_AVAILABLE;
goto end;
}
ERR("Failed to rename completed rotation chunk");
goto end;
}
+ channel_info->session->rotate_pending = false;
+ if (channel_info->session->rotate_pending_relay) {
+ ret = sessiond_timer_rotate_pending_start(
+ channel_info->session,
+ DEFAULT_ROTATE_PENDING_RELAY_TIMER);
+ if (ret) {
+ ERR("Enabling rotate pending timer");
+ ret = -1;
+ goto end;
+ }
+ }
}
channel_rotation_info_destroy(channel_info);
return ret;
}
+static
+int rotate_pending_relay_timer(struct ltt_session *session)
+{
+ int ret;
+
+ DBG("[rotation-thread] Check rotate pending on session %" PRIu64,
+ session->id);
+ ret = relay_rotate_pending(session, session->rotate_count - 1);
+ if (ret < 0) {
+ ERR("[rotation-thread] Check relay rotate pending");
+ goto end;
+ }
+ if (ret == 0) {
+ DBG("[rotation-thread] Rotation completed on the relay for "
+ "session %" PRIu64, session->id);
+ /*
+ * Stop the timer and clear the queue, the timers are currently
+ * ignored because of the rotate_pending_relay_check_in_progress
+ * flag.
+ */
+ sessiond_timer_rotate_pending_stop(session);
+ /*
+ * Now we can clear the pending flag in the session. New
+ * rotations can start now.
+ */
+ session->rotate_pending_relay = false;
+ } else if (ret == 1) {
+ DBG("[rotation-thread] Rotation still pending on the relay for "
+ "session %" PRIu64, session->id);
+ }
+ /*
+ * Allow the timer thread to send other notifications when needed.
+ */
+ session->rotate_pending_relay_check_in_progress = false;
+ fprintf(stderr, "RET PENDING: %d\n", ret);
+
+ ret = 0;
+
+end:
+ return ret;
+}
+
+static
+int rotate_timer(struct ltt_session *session)
+{
+ int ret;
+
+ DBG("[rotation-thread] Rotate timer on session %" PRIu64, session->id);
+
+ /*
+ * If the session is stopped, we need to cancel this timer.
+ */
+ session_lock(session);
+ if (!session->active && session->rotate_timer_enabled) {
+ sessiond_rotate_timer_stop(session);
+ }
+
+ ret = cmd_rotate_session(session, NULL);
+ session_unlock(session);
+ fprintf(stderr, "RET ROTATE TIMER: %d\n", ret);
+ if (ret == -LTTNG_ERR_ROTATE_PENDING) {
+ ret = 0;
+ goto end;
+ } else if (ret != LTTNG_OK) {
+ ret = -1;
+ goto end;
+ }
+
+
+ ret = 0;
+
+end:
+ return ret;
+}
+
+static
+int handle_rotate_timer_pipe(int fd, uint32_t revents,
+ struct rotation_thread_handle *handle,
+ struct rotation_thread_state *state)
+{
+ int ret = 0;
+ struct ltt_session *session;
+ struct sessiond_rotation_timer timer_data;
+
+ if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
+ ret = lttng_poll_del(&state->events, fd);
+ if (ret) {
+ ERR("[rotation-thread] Failed to remove consumer "
+ "rotate pending pipe from poll set");
+ }
+ goto end;
+ }
+
+ memset(&timer_data, 0, sizeof(struct sessiond_rotation_timer));
+
+ do {
+ ret = read(fd, &timer_data, sizeof(timer_data));
+ } while (ret == -1 && errno == EINTR);
+ if (ret != sizeof(timer_data)) {
+ ERR("[rotation-thread] Failed to read from pipe (fd = %i)",
+ fd);
+ ret = -1;
+ goto end;
+ }
+
+ rcu_read_lock();
+ session_lock_list();
+ session = session_find_by_id(timer_data.session_id);
+ if (!session) {
+ ERR("[rotation-thread] Session %" PRIu64 " not found",
+ timer_data.session_id);
+ ret = -1;
+ goto end_unlock;
+ }
+
+ if (timer_data.signal == LTTNG_SESSIOND_SIG_ROTATE_PENDING) {
+ ret = rotate_pending_relay_timer(session);
+ } else if (timer_data.signal == LTTNG_SESSIOND_SIG_ROTATE_TIMER) {
+ ret = rotate_timer(session);
+ } else {
+ ERR("Unknown signal in rotate timer");
+ ret = -1;
+ }
+
+end_unlock:
+ session_unlock_list();
+ rcu_read_unlock();
+end:
+ return ret;
+}
+
void *thread_rotation(void *data)
{
int ret;
if (fd == handle->thread_quit_pipe) {
DBG("[rotation-thread] Quit pipe activity");
goto exit;
+ } else if (fd == handle->rotate_timer_pipe) {
+ ret = handle_rotate_timer_pipe(fd, revents,
+ handle, &state);
+ if (ret) {
+ ERR("[rotation-thread] Rotate timer");
+ goto error;
+ }
} else if (fd == handle->ust32_consumer ||
fd == handle->ust64_consumer ||
fd == handle->kernel_consumer) {