struct lttng_pipe *ust32_channel_rotate_pipe,
struct lttng_pipe *ust64_channel_rotate_pipe,
struct lttng_pipe *kernel_channel_rotate_pipe,
- int thread_quit_pipe)
+ int thread_quit_pipe, int rotate_timer_pipe)
{
struct rotation_thread_handle *handle;
handle->kernel_consumer = -1;
}
handle->thread_quit_pipe = thread_quit_pipe;
+ handle->rotate_timer_pipe = rotate_timer_pipe;
end:
return handle;
int ret;
/*
- * Create pollset with size 4:
+ * Create pollset with size 5:
* - sessiond quit pipe
* - consumerd (32-bit user space) channel rotate pipe,
* - consumerd (64-bit user space) channel rotate pipe,
* - consumerd (kernel) channel rotate pipe.
+ * - sessiond rotate pending pipe
*/
- ret = lttng_poll_create(poll_set, 4, LTTNG_CLOEXEC);
+ ret = lttng_poll_create(poll_set, 5, LTTNG_CLOEXEC);
if (ret < 0) {
goto end;
}
ERR("[rotation-thread] Failed to add thread_quit_pipe fd to pollset");
goto error;
}
+ ret = lttng_poll_add(poll_set, handle->rotate_timer_pipe,
+ LPOLLIN | LPOLLERR);
+ if (ret < 0) {
+ ERR("[rotation-thread] Failed to add rotate_pending fd to pollset");
+ goto error;
+ }
ret = lttng_poll_add(poll_set, handle->ust32_consumer,
LPOLLIN | LPOLLERR);
if (ret < 0) {
return ret;
}
+static
+int handle_rotate_timer_pipe(int fd, uint32_t revents,
+ struct rotation_thread_handle *handle,
+ struct rotation_thread_state *state)
+{
+ int ret = 0;
+ uint64_t session_id;
+ struct ltt_session *session;
+
+ if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
+ ret = lttng_poll_del(&state->events, fd);
+ if (ret) {
+ ERR("[rotation-thread] Failed to remove consumer "
+ "rotate pending pipe from poll set");
+ }
+ goto end;
+ }
+
+ do {
+ ret = read(fd, &session_id, sizeof(session_id));
+ } while (ret == -1 && errno == EINTR);
+ if (ret != sizeof(session_id)) {
+ ERR("[rotation-thread] Failed to read from pipe (fd = %i)",
+ fd);
+ ret = -1;
+ goto end;
+ }
+
+ session = session_find_by_id(session_id);
+ if (!session) {
+ ERR("[rotation-thread] Session %" PRIu64 " not found",
+ session_id);
+ ret = -1;
+ goto end;
+ }
+
+ DBG("[rotation-thread] Check rotate pending on session %" PRIu64,
+ session_id);
+ ret = relay_rotate_pending(session, session->rotate_count - 1);
+ if (ret < 0) {
+ ERR("[rotation-thread] Check relay rotate pending");
+ goto end;
+ }
+ if (ret == 0) {
+ DBG("[rotation-thread] Rotation completed on the relay for "
+ "session %" PRIu64, session_id);
+ /* TODO: delete timer */
+ } else if (ret == 1) {
+ DBG("[rotation-thread] Rotation still pending on the relay for "
+ "session %" PRIu64, session_id);
+ }
+ fprintf(stderr, "RET PENDING: %d\n", ret);
+
+ ret = 0;
+
+end:
+ return ret;
+}
+
void *thread_rotation(void *data)
{
int ret;
if (fd == handle->thread_quit_pipe) {
DBG("[rotation-thread] Quit pipe activity");
goto exit;
+ } else if (fd == handle->rotate_timer_pipe) {
+ ret = handle_rotate_timer_pipe(fd, revents,
+ handle, &state);
+ if (ret) {
+ ERR("[rotation-thread] Rotate pending");
+ goto error;
+ }
} else if (fd == handle->ust32_consumer ||
fd == handle->ust64_consumer ||
fd == handle->kernel_consumer) {