Fix: rotation may never complete in per-PID buffering mode
authorJérémie Galarneau <jeremie.galarneau@efficios.com>
Fri, 21 Sep 2018 22:16:05 +0000 (18:16 -0400)
committerJérémie Galarneau <jeremie.galarneau@efficios.com>
Tue, 2 Oct 2018 17:56:25 +0000 (13:56 -0400)
Issue
-----

The current scheme to ensure that a rotation is completed
consists in the following, from the session daemon's perspective:

Iterate on all channels:
  - Ask the consumerd to sample the current "write" positions
  - Increment a count of channels being rotated

Wait for the consumer daemon to notify the session daemon every time
a channel's streams's "read" position have all reached the sampled
"write" position.

The idea behind this is making sure that all the data that was
produced before a rotation was triggered has been consumed (i.e.
been written to a local FS or streamed to the relay daemon) before
marking the rotation as completed.

However, this assumes that the session daemon is always aware of
all channels/streams that exist at the moment at which the rotation is
initiated. This is only true for the kernel domain.

In per-PID buffer mode, it is possible for an application, and its
buffers, to be torn down at any moment. Thus the following scenario
can happen:

- The application fills its buffers, causing the consumerd to fall
  behind
- The application exits, leaving its full buffers behind to be
  extracted by the consumer daemon
- The session daemon removes anything to do with the application from
  its internal structures, including its channels
- A rotation is initiated
- The positions of the application's buffers are never sampled as the
  session daemon does not see the channels when iterating on the
  session's channels

Multiple bad things can happen from there.

First, the rotation can be marked as "completed" while the consumerd
is still exctracting the dead application's buffers, causing readers
to consume an incomplete/corrupted trace.

Second, if the session is being streamed to a relay daemon, it is
possible for the 'rename' command to be issued before the contents
of the buffers has been written causing indexes to fail to be
flushed (as the relay daemon attempts to write them to a now-defunct
location).

Solution
--------

Eliminate the pipe between the session daemon and consumer daemon that
is used to signify that a rotation is completed as the information is
unreliable.

The rotation thread now periodically asks the consumer daemon to check
for channels that have a pending rotation for a given session_id or
that belong to the ongoing rotation archive id.

Hence, for every stream:
  - If the archive id during which it was created is '>' than that of
    the ongoing rotation, we don't need to consider it
  - If the current position is '>=' than the sampled rotation position,
    we can consider its rotation 'done'
  - If it belongs to the pending rotation archive id and doesn't have
    a "target" position, it was unknown to the session daemon and the
    application associated with it is dead. We must wait for the
    stream to be flushed and torn down before assuming that the
    rotation was completed.

Drawbacks
---------

This polling approach is somewhat inefficient and can cause rotations
to take longer to complete than necessary, especially in high-latency
networking conditions.

Signed-off-by: Jérémie Galarneau <jeremie.galarneau@efficios.com>
23 files changed:
configure.ac
src/bin/lttng-sessiond/cmd.c
src/bin/lttng-sessiond/consumer.c
src/bin/lttng-sessiond/consumer.h
src/bin/lttng-sessiond/kernel.c
src/bin/lttng-sessiond/main.c
src/bin/lttng-sessiond/rotate.c
src/bin/lttng-sessiond/rotate.h
src/bin/lttng-sessiond/rotation-thread.c
src/bin/lttng-sessiond/rotation-thread.h
src/bin/lttng-sessiond/session.c
src/bin/lttng-sessiond/session.h
src/bin/lttng-sessiond/sessiond-timer.c
src/bin/lttng-sessiond/sessiond-timer.h
src/bin/lttng-sessiond/ust-app.c
src/bin/lttng-sessiond/ust-app.h
src/common/consumer/consumer.c
src/common/consumer/consumer.h
src/common/defaults.h
src/common/kernel-consumer/kernel-consumer.c
src/common/sessiond-comm/sessiond-comm.h
src/common/ust-consumer/ust-consumer.c
tests/unit/test_ust_data.c

index c78ba3edb59dd285b7588f3bda37ca19ebda2fe3..69a9d776acfce3772829aae9e2ab7890d18b5a46 100644 (file)
@@ -382,7 +382,7 @@ _AC_DEFINE_QUOTED_AND_SUBST([DEFAULT_NETWORK_CONTROL_BIND_ADDRESS], [0.0.0.0])
 _AC_DEFINE_QUOTED_AND_SUBST([DEFAULT_NETWORK_DATA_BIND_ADDRESS], [0.0.0.0])
 _AC_DEFINE_QUOTED_AND_SUBST([DEFAULT_NETWORK_VIEWER_BIND_ADDRESS], [localhost])
 _AC_DEFINE_AND_SUBST([DEFAULT_NETWORK_RELAYD_CTRL_MAX_PAYLOAD_SIZE], [134217728])
-_AC_DEFINE_AND_SUBST([DEFAULT_ROTATE_PENDING_RELAY_TIMER], [200000])
+_AC_DEFINE_AND_SUBST([DEFAULT_ROTATE_PENDING_TIMER], [500000])
 
 # Command short descriptions
 _AC_DEFINE_QUOTED_AND_SUBST([CMD_DESCR_ADD_CONTEXT], [Add context fields to a channel])
index e88ad0d0de4b3439f88d417ad58e82f58b9c99f3..0d860236e201551a8389969359e85b1caf133eaa 100644 (file)
@@ -2701,7 +2701,7 @@ int cmd_start_trace(struct ltt_session *session)
        session->rotated_after_last_stop = false;
 
        if (session->rotate_timer_period) {
-               ret = sessiond_rotate_timer_start(session,
+               ret = timer_session_rotation_schedule_timer_start(session,
                                session->rotate_timer_period);
                if (ret < 0) {
                        ERR("Failed to enable rotate timer");
@@ -2735,9 +2735,9 @@ int rename_active_chunk(struct ltt_session *session)
                goto end;
        }
 
-       ret = rename_complete_chunk(session, time(NULL));
+       ret = rename_completed_chunk(session, time(NULL));
        if (ret < 0) {
-               ERR("Failed to rename current rotate path");
+               ERR("Failed to rename current rotation's path");
                goto end;
        }
 
@@ -2782,15 +2782,23 @@ int cmd_stop_trace(struct ltt_session *session)
                goto error;
        }
 
-       if (session->rotate_relay_pending_timer_enabled) {
-               sessiond_timer_rotate_pending_stop(session);
+       if (session->rotation_pending_check_timer_enabled) {
+               if (timer_session_rotation_pending_check_stop(session)) {
+                       ERR("Failed to stop the \"rotation pending check\" timer of session %s",
+                                       session->name);
+               }
        }
 
-       if (session->rotate_timer_enabled) {
-               sessiond_rotate_timer_stop(session);
+       if (session->rotation_schedule_timer_enabled) {
+               if (timer_session_rotation_schedule_timer_stop(
+                               session)) {
+                       ERR("Failed to stop the \"rotation schedule\" timer of session %s",
+                                       session->name);
+               }
        }
 
-       if (session->current_archive_id > 0 && !session->rotate_pending) {
+       if (session->current_archive_id > 0 &&
+                       session->rotation_state != LTTNG_ROTATION_STATE_ONGOING) {
                ret = rename_active_chunk(session);
                if (ret) {
                        /*
@@ -3089,12 +3097,19 @@ int cmd_destroy_session(struct ltt_session *session, int wpipe,
 
        DBG("Begin destroy session %s (id %" PRIu64 ")", session->name, session->id);
 
-       if (session->rotate_relay_pending_timer_enabled) {
-               sessiond_timer_rotate_pending_stop(session);
+       if (session->rotation_pending_check_timer_enabled) {
+               if (timer_session_rotation_pending_check_stop(session)) {
+                       ERR("Failed to stop the \"rotation pending check\" timer of session %s",
+                                       session->name);
+               }
        }
 
-       if (session->rotate_timer_enabled) {
-               sessiond_rotate_timer_stop(session);
+       if (session->rotation_schedule_timer_enabled) {
+               if (timer_session_rotation_schedule_timer_stop(
+                               session)) {
+                       ERR("Failed to stop the \"rotation schedule\" timer of session %s",
+                                       session->name);
+               }
        }
 
        if (session->rotate_size) {
@@ -3564,10 +3579,8 @@ int cmd_data_pending(struct ltt_session *session)
                }
        }
 
-       /*
-        * A rotation is still pending, we have to wait.
-        */
-       if (session->rotate_pending) {
+       /* A rotation is still pending, we have to wait. */
+       if (session->rotation_state == LTTNG_ROTATION_STATE_ONGOING) {
                DBG("Rotate still pending for session %s", session->name);
                ret = 1;
                goto error;
@@ -4561,7 +4574,6 @@ int cmd_rotate_session(struct ltt_session *session,
        struct tm *timeinfo;
        char datetime[21];
        time_t now;
-       bool ust_active = false;
 
        assert(session);
 
@@ -4586,9 +4598,10 @@ int cmd_rotate_session(struct ltt_session *session,
                goto end;
        }
 
-       if (session->rotate_pending || session->rotate_pending_relay) {
+       if (session->rotation_state == LTTNG_ROTATION_STATE_ONGOING) {
                ret = -LTTNG_ERR_ROTATION_PENDING;
-               DBG("Rotate already in progress");
+               DBG("Refusing to launch a rotation; a rotation is already in progress for session %s",
+                               session->name);
                goto end;
        }
 
@@ -4607,15 +4620,11 @@ int cmd_rotate_session(struct ltt_session *session,
        if (session->current_archive_id == 0) {
                const char *base_path = NULL;
 
+               assert(session->kernel_session || session->ust_session);
                /* Either one of the two sessions is enough to get the root path. */
-               if (session->kernel_session) {
-                       base_path = session_get_base_path(session);
-               } else if (session->ust_session) {
-                       base_path = session_get_base_path(session);
-               } else {
-                       assert(0);
-               }
+               base_path = session_get_base_path(session);
                assert(base_path);
+
                ret = lttng_strncpy(session->rotation_chunk.current_rotate_path,
                                base_path,
                                sizeof(session->rotation_chunk.current_rotate_path));
@@ -4640,21 +4649,29 @@ int cmd_rotate_session(struct ltt_session *session,
        }
        DBG("Current rotate path %s", session->rotation_chunk.current_rotate_path);
 
+       /*
+        * Channels created after this point will belong to the next
+        * archive id.
+        */
        session->current_archive_id++;
-       session->rotate_pending = true;
+       /*
+        * A rotation has a local step even if the destination is a relay
+        * daemon; the buffers must be consumed by the consumer daemon.
+        */
+       session->rotation_pending_local = true;
+       session->rotation_pending_relay =
+               session_get_consumer_destination_type(session) == CONSUMER_DST_NET;
        session->rotation_state = LTTNG_ROTATION_STATE_ONGOING;
        ret = notification_thread_command_session_rotation_ongoing(
                        notification_thread_handle,
                        session->name, session->uid, session->gid,
-                       session->current_archive_id);
+                       session->current_archive_id - 1);
        if (ret != LTTNG_OK) {
                ERR("Failed to notify notification thread that a session rotation is ongoing for session %s",
                                session->name);
        }
 
-       /*
-        * Create the path name for the next chunk.
-        */
+       /* Create the path name for the next chunk. */
        now = time(NULL);
        if (now == (time_t) -1) {
                ret = -LTTNG_ERR_ROTATION_NOT_AVAILABLE;
@@ -4752,41 +4769,16 @@ int cmd_rotate_session(struct ltt_session *session,
                        ret = -LTTNG_ERR_CREATE_DIR_FAIL;
                        goto end;
                }
-               ret = ust_app_rotate_session(session, &ust_active);
+               ret = ust_app_rotate_session(session);
                if (ret != LTTNG_OK) {
                        goto end;
                }
-               /*
-                * Handle the case where we did not start a rotation on any channel.
-                * The consumer will never wake up the rotation thread to perform the
-                * rename, so we have to do it here while we hold the session and
-                * session_list locks.
-                */
-               if (!session->kernel_session && !ust_active) {
-                       struct lttng_trace_archive_location *location;
-
-                       session->rotate_pending = false;
-                       session->rotation_state = LTTNG_ROTATION_STATE_COMPLETED;
-                       ret = rename_complete_chunk(session, now);
-                       if (ret < 0) {
-                               ERR("Failed to rename completed rotation chunk");
-                               goto end;
-                       }
+       }
 
-                       /* Ownership of location is transferred. */
-                       location = session_get_trace_archive_location(session);
-                       ret = notification_thread_command_session_rotation_completed(
-                                       notification_thread_handle,
-                                       session->name,
-                                       session->uid,
-                                       session->gid,
-                                       session->current_archive_id,
-                                       location);
-                       if (ret != LTTNG_OK) {
-                               ERR("Failed to notify notification thread that rotation is complete for session %s",
-                                               session->name);
-                       }
-               }
+       ret = timer_session_rotation_pending_check_start(session,
+                       DEFAULT_ROTATE_PENDING_TIMER);
+       if (ret) {
+               goto end;
        }
 
        if (!session->active) {
@@ -4797,8 +4789,8 @@ int cmd_rotate_session(struct ltt_session *session,
                rotate_return->rotation_id = session->current_archive_id;
        }
 
-       DBG("Cmd rotate session %s, current_archive_id %" PRIu64 " sent",
-                       session->name, session->current_archive_id);
+       DBG("Cmd rotate session %s, archive_id %" PRIu64 " sent",
+                       session->name, session->current_archive_id - 1);
        ret = LTTNG_OK;
 
 end:
@@ -4999,14 +4991,16 @@ int cmd_rotation_set_schedule(struct ltt_session *session,
                         * Only start the timer if the session is active,
                         * otherwise it will be started when the session starts.
                         */
-                       ret = sessiond_rotate_timer_start(session, new_value);
+                       ret = timer_session_rotation_schedule_timer_start(
+                                       session, new_value);
                        if (ret) {
                                ERR("Failed to enable session rotation timer in ROTATION_SET_SCHEDULE command");
                                ret = LTTNG_ERR_UNK;
                                goto end;
                        }
                } else {
-                       ret = sessiond_rotate_timer_stop(session);
+                       ret = timer_session_rotation_schedule_timer_stop(
+                                       session);
                        if (ret) {
                                ERR("Failed to disable session rotation timer in ROTATION_SET_SCHEDULE command");
                                ret = LTTNG_ERR_UNK;
index bc973ddf80e78e90d535036ac1de3427266ac8d3..06fa5eeb4b4e54112a95c5998b1b870b46575a37 100644 (file)
@@ -1,5 +1,6 @@
 /*
  * Copyright (C) 2012 - David Goulet <dgoulet@efficios.com>
+ *               2018 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
  *
  * This program is free software; you can redistribute it and/or modify it
  * under the terms of the GNU General Public License, version 2 only, as
@@ -1081,10 +1082,6 @@ int consumer_send_pipe(struct consumer_socket *consumer_sock,
                pipe_name = "channel monitor";
                command_name = "SET_CHANNEL_MONITOR_PIPE";
                break;
-       case LTTNG_CONSUMER_SET_CHANNEL_ROTATE_PIPE:
-               pipe_name = "channel rotate";
-               command_name = "SET_CHANNEL_ROTATE_PIPE";
-               break;
        default:
                ERR("Unexpected command received in %s (cmd = %d)", __func__,
                                (int) cmd);
@@ -1124,13 +1121,6 @@ int consumer_send_channel_monitor_pipe(struct consumer_socket *consumer_sock,
                        LTTNG_CONSUMER_SET_CHANNEL_MONITOR_PIPE, pipe);
 }
 
-int consumer_send_channel_rotate_pipe(struct consumer_socket *consumer_sock,
-               int pipe)
-{
-       return consumer_send_pipe(consumer_sock,
-                       LTTNG_CONSUMER_SET_CHANNEL_ROTATE_PIPE, pipe);
-}
-
 /*
  * Set consumer subdirectory using the session name and a generated datetime if
  * needed. This is appended to the current subdirectory.
@@ -1649,8 +1639,7 @@ end:
 int consumer_rotate_channel(struct consumer_socket *socket, uint64_t key,
                uid_t uid, gid_t gid, struct consumer_output *output,
                char *domain_path, bool is_metadata_channel,
-               uint64_t new_chunk_id,
-               bool *rotate_pending_relay)
+               uint64_t new_chunk_id)
 {
        int ret;
        struct lttcomm_consumer_msg msg;
@@ -1677,7 +1666,6 @@ int consumer_rotate_channel(struct consumer_socket *socket, uint64_t key,
                        ret = -1;
                        goto error;
                }
-               *rotate_pending_relay = true;
        } else {
                msg.u.rotate_channel.relayd_id = (uint64_t) -1ULL;
                ret = snprintf(msg.u.rotate_channel.pathname,
@@ -1760,14 +1748,56 @@ error:
 }
 
 /*
- * Ask the relay if a rotation is still pending. Must be called with the socket
- * lock held.
+ * Ask the consumer if a rotation is locally pending. Must be called with the
+ * socket lock held.
+ *
+ * Return 1 if the rotation is still pending, 0 if finished, a negative value
+ * on error.
+ */
+int consumer_check_rotation_pending_local(struct consumer_socket *socket,
+               uint64_t session_id, uint64_t chunk_id)
+{
+       int ret;
+       struct lttcomm_consumer_msg msg;
+       uint32_t pending = 0;
+
+       assert(socket);
+
+       DBG("Asking consumer to locally check for pending rotation for session %" PRIu64 ", chunk id %" PRIu64,
+                       session_id, chunk_id);
+
+       memset(&msg, 0, sizeof(msg));
+       msg.cmd_type = LTTNG_CONSUMER_CHECK_ROTATION_PENDING_LOCAL;
+       msg.u.check_rotation_pending_local.session_id = session_id;
+       msg.u.check_rotation_pending_local.chunk_id = chunk_id;
+
+       health_code_update();
+       ret = consumer_send_msg(socket, &msg);
+       if (ret < 0) {
+               goto error;
+       }
+
+       ret = consumer_socket_recv(socket, &pending, sizeof(pending));
+       if (ret < 0) {
+               goto error;
+       }
+
+       ret = pending;
+
+error:
+       health_code_update();
+       return ret;
+}
+
+/*
+ * Ask the consumer if a rotation is pending on the relayd. Must be called with
+ * the socket lock held.
  *
  * Return 1 if the rotation is still pending, 0 if finished, a negative value
  * on error.
  */
-int consumer_rotate_pending_relay(struct consumer_socket *socket,
-               struct consumer_output *output, uint64_t session_id,
+int consumer_check_rotation_pending_relay(struct consumer_socket *socket,
+               const struct consumer_output *output, uint64_t session_id,
                uint64_t chunk_id)
 {
        int ret;
@@ -1776,15 +1806,15 @@ int consumer_rotate_pending_relay(struct consumer_socket *socket,
 
        assert(socket);
 
-       DBG("Consumer rotate pending on relay for session %" PRIu64 ", chunk id %" PRIu64,
+       DBG("Asking consumer to check for pending rotation on relay for session %" PRIu64 ", chunk id %" PRIu64,
                        session_id, chunk_id);
        assert(output->type == CONSUMER_DST_NET);
 
        memset(&msg, 0, sizeof(msg));
-       msg.cmd_type = LTTNG_CONSUMER_ROTATE_PENDING_RELAY;
-       msg.u.rotate_pending_relay.session_id = session_id;
-       msg.u.rotate_pending_relay.relayd_id = output->net_seq_index;
-       msg.u.rotate_pending_relay.chunk_id = chunk_id;
+       msg.cmd_type = LTTNG_CONSUMER_CHECK_ROTATION_PENDING_RELAY;
+       msg.u.check_rotation_pending_relay.session_id = session_id;
+       msg.u.check_rotation_pending_relay.relayd_id = output->net_seq_index;
+       msg.u.check_rotation_pending_relay.chunk_id = chunk_id;
 
        health_code_update();
        ret = consumer_send_msg(socket, &msg);
index cdc48b7f3131df6600af05bf3c31cd70a5046266..e9c7e31e31990eafde155956fb6a0f726cf95506 100644 (file)
@@ -98,11 +98,6 @@ struct consumer_data {
         * consumer.
         */
        int channel_monitor_pipe;
-       /*
-        * Write-end of the channel rotation pipe to be passed to the
-        * consumer.
-        */
-       int channel_rotate_pipe;
        /*
         * The metadata socket object is handled differently and only created
         * locally in this object thus it's the only reference available in the
@@ -237,8 +232,6 @@ int consumer_send_relayd_socket(struct consumer_socket *consumer_sock,
                char *session_name, char *hostname, int session_live_timer);
 int consumer_send_channel_monitor_pipe(struct consumer_socket *consumer_sock,
                int pipe);
-int consumer_send_channel_rotate_pipe(struct consumer_socket *consumer_sock,
-               int pipe);
 int consumer_send_destroy_relayd(struct consumer_socket *sock,
                struct consumer_output *consumer);
 int consumer_recv_status_reply(struct consumer_socket *sock);
@@ -324,15 +317,17 @@ int consumer_snapshot_channel(struct consumer_socket *socket, uint64_t key,
                uid_t uid, gid_t gid, const char *session_path, int wait,
                uint64_t nb_packets_per_stream, uint64_t trace_archive_id);
 
+/* Rotation commands. */
 int consumer_rotate_channel(struct consumer_socket *socket, uint64_t key,
                uid_t uid, gid_t gid, struct consumer_output *output,
-               char *domain_path, bool is_metadata_channel, uint64_t new_chunk_id,
-               bool *rotate_pending_relay);
+               char *domain_path, bool is_metadata_channel, uint64_t new_chunk_id);
 int consumer_rotate_rename(struct consumer_socket *socket, uint64_t session_id,
                const struct consumer_output *output, const char *old_path,
                const char *new_path, uid_t uid, gid_t gid);
-int consumer_rotate_pending_relay(struct consumer_socket *socket,
-               struct consumer_output *output, uint64_t session_id,
+int consumer_check_rotation_pending_local(struct consumer_socket *socket,
+               uint64_t session_id, uint64_t chunk_id);
+int consumer_check_rotation_pending_relay(struct consumer_socket *socket,
+               const struct consumer_output *output, uint64_t session_id,
                uint64_t chunk_id);
 int consumer_mkdir(struct consumer_socket *socket, uint64_t session_id,
                const struct consumer_output *output, const char *path,
index e7f1d54a02bde7ecb72ec2e16e122982aeae319c..be263943f0211a9cffdb563f0a22ddfda9e9653c 100644 (file)
@@ -1426,35 +1426,15 @@ int kernel_rotate_session(struct ltt_session *session)
                        socket, node.node) {
                struct ltt_kernel_channel *chan;
 
-               /*
-                * Account the metadata channel first to make sure the
-                * number of channels waiting for a rotation cannot
-                * reach 0 before we complete the iteration over all
-                * the channels.
-                */
-               ret = rotate_add_channel_pending(ksess->metadata->key,
-                               LTTNG_DOMAIN_KERNEL, session);
-               if (ret < 0) {
-                       ret = LTTNG_ERR_KERN_CONSUMER_FAIL;
-                       goto error;
-               }
-
                /* For each channel, ask the consumer to rotate it. */
                cds_list_for_each_entry(chan, &ksess->channel_list.head, list) {
-                       ret = rotate_add_channel_pending(chan->key,
-                                       LTTNG_DOMAIN_KERNEL, session);
-                       if (ret < 0) {
-                               ret = LTTNG_ERR_KERN_CONSUMER_FAIL;
-                               goto error;
-                       }
-
-                       DBG("Rotate channel %" PRIu64 ", session %s", chan->key, session->name);
+                       DBG("Rotate kernel channel %" PRIu64 ", session %s",
+                                       chan->key, session->name);
                        ret = consumer_rotate_channel(socket, chan->key,
                                        ksess->uid, ksess->gid, ksess->consumer,
                                        ksess->consumer->subdir,
                                        /* is_metadata_channel */ false,
-                                       session->current_archive_id,
-                                       &session->rotate_pending_relay);
+                                       session->current_archive_id);
                        if (ret < 0) {
                                ret = LTTNG_ERR_KERN_CONSUMER_FAIL;
                                goto error;
@@ -1468,8 +1448,7 @@ int kernel_rotate_session(struct ltt_session *session)
                                ksess->uid, ksess->gid, ksess->consumer,
                                ksess->consumer->subdir,
                                /* is_metadata_channel */ true,
-                               session->current_archive_id,
-                               &session->rotate_pending_relay);
+                               session->current_archive_id);
                if (ret < 0) {
                        ret = LTTNG_ERR_KERN_CONSUMER_FAIL;
                        goto error;
index d8ea2c1a5b2640af24864c6d12d6c91c8dd78001..bd111029c510c2407a27b48d3d23922b741f9332 100644 (file)
@@ -111,7 +111,6 @@ static struct consumer_data kconsumer_data = {
        .err_sock = -1,
        .cmd_sock = -1,
        .channel_monitor_pipe = -1,
-       .channel_rotate_pipe = -1,
        .pid_mutex = PTHREAD_MUTEX_INITIALIZER,
        .lock = PTHREAD_MUTEX_INITIALIZER,
        .cond = PTHREAD_COND_INITIALIZER,
@@ -122,7 +121,6 @@ static struct consumer_data ustconsumer64_data = {
        .err_sock = -1,
        .cmd_sock = -1,
        .channel_monitor_pipe = -1,
-       .channel_rotate_pipe = -1,
        .pid_mutex = PTHREAD_MUTEX_INITIALIZER,
        .lock = PTHREAD_MUTEX_INITIALIZER,
        .cond = PTHREAD_COND_INITIALIZER,
@@ -133,7 +131,6 @@ static struct consumer_data ustconsumer32_data = {
        .err_sock = -1,
        .cmd_sock = -1,
        .channel_monitor_pipe = -1,
-       .channel_rotate_pipe = -1,
        .pid_mutex = PTHREAD_MUTEX_INITIALIZER,
        .lock = PTHREAD_MUTEX_INITIALIZER,
        .cond = PTHREAD_COND_INITIALIZER,
@@ -531,24 +528,6 @@ static void close_consumer_sockets(void)
                        PERROR("UST consumerd64 channel monitor pipe close");
                }
        }
-       if (kconsumer_data.channel_rotate_pipe >= 0) {
-               ret = close(kconsumer_data.channel_rotate_pipe);
-               if (ret < 0) {
-                       PERROR("kernel consumer channel rotate pipe close");
-               }
-       }
-       if (ustconsumer32_data.channel_rotate_pipe >= 0) {
-               ret = close(ustconsumer32_data.channel_rotate_pipe);
-               if (ret < 0) {
-                       PERROR("UST consumerd32 channel rotate pipe close");
-               }
-       }
-       if (ustconsumer64_data.channel_rotate_pipe >= 0) {
-               ret = close(ustconsumer64_data.channel_rotate_pipe);
-               if (ret < 0) {
-                       PERROR("UST consumerd64 channel rotate pipe close");
-               }
-       }
 }
 
 /*
@@ -1325,8 +1304,7 @@ restart:
 
        /*
         * Transfer the write-end of the channel monitoring and rotate pipe
-        * to the consumer by issuing a SET_CHANNEL_MONITOR_PIPE and
-        * SET_CHANNEL_ROTATE_PIPE commands.
+        * to the consumer by issuing a SET_CHANNEL_MONITOR_PIPE command.
         */
        cmd_socket_wrapper = consumer_allocate_socket(&consumer_data->cmd_sock);
        if (!cmd_socket_wrapper) {
@@ -1340,12 +1318,6 @@ restart:
                goto error;
        }
 
-       ret = consumer_send_channel_rotate_pipe(cmd_socket_wrapper,
-                       consumer_data->channel_rotate_pipe);
-       if (ret) {
-               goto error;
-       }
-
        /* Discard the socket wrapper as it is no longer needed. */
        consumer_destroy_socket(cmd_socket_wrapper);
        cmd_socket_wrapper = NULL;
@@ -3366,7 +3338,7 @@ static int process_client_msg(struct command_ctx *cmd_ctx, int sock,
                        }
 
                        /*
-                        * Setup socket for consumer 64 bit. No need for atomic access
+                        * Setup socket for consumer 32 bit. No need for atomic access
                         * since it was set above and can ONLY be set in this thread.
                         */
                        ret = consumer_create_socket(&ustconsumer32_data,
@@ -5842,48 +5814,6 @@ end:
        return ret;
 }
 
-static
-struct rotation_thread_timer_queue *create_rotate_timer_queue(void)
-{
-       struct rotation_thread_timer_queue *queue = NULL;
-
-       queue = zmalloc(sizeof(struct rotation_thread_timer_queue));
-       if (!queue) {
-               PERROR("Failed to allocate timer rotate queue");
-               goto end;
-       }
-
-       queue->event_pipe = lttng_pipe_open(FD_CLOEXEC | O_NONBLOCK);
-       CDS_INIT_LIST_HEAD(&queue->list);
-       pthread_mutex_init(&queue->lock, NULL);
-
-end:
-       return queue;
-}
-
-static
-void destroy_rotate_timer_queue(struct rotation_thread_timer_queue *queue)
-{
-       struct sessiond_rotation_timer *node, *tmp_node;
-
-       if (!queue) {
-               return;
-       }
-
-       lttng_pipe_destroy(queue->event_pipe);
-
-       pthread_mutex_lock(&queue->lock);
-       /* Empty wait queue. */
-       cds_list_for_each_entry_safe(node, tmp_node, &queue->list, head) {
-               cds_list_del(&node->head);
-               free(node);
-       }
-       pthread_mutex_unlock(&queue->lock);
-
-       pthread_mutex_destroy(&queue->lock);
-       free(queue);
-}
-
 /*
  * main
  */
@@ -5898,9 +5828,6 @@ int main(int argc, char **argv)
        bool notification_thread_launched = false;
        bool rotation_thread_launched = false;
        bool timer_thread_launched = false;
-       struct lttng_pipe *ust32_channel_rotate_pipe = NULL,
-                       *ust64_channel_rotate_pipe = NULL,
-                       *kernel_channel_rotate_pipe = NULL;
        struct timer_thread_parameters timer_thread_ctx;
        /* Queue of rotation jobs populated by the sessiond-timer. */
        struct rotation_thread_timer_queue *rotation_timer_queue = NULL;
@@ -5915,7 +5842,7 @@ int main(int argc, char **argv)
                goto exit_set_signal_handler;
        }
 
-       if (sessiond_timer_signal_init()) {
+       if (timer_signal_init()) {
                retval = -1;
                goto exit_set_signal_handler;
        }
@@ -6069,19 +5996,6 @@ int main(int argc, char **argv)
                        retval = -1;
                        goto exit_init_data;
                }
-               kernel_channel_rotate_pipe = lttng_pipe_open(0);
-               if (!kernel_channel_rotate_pipe) {
-                       ERR("Failed to create kernel consumer channel rotate pipe");
-                       retval = -1;
-                       goto exit_init_data;
-               }
-               kconsumer_data.channel_rotate_pipe =
-                               lttng_pipe_release_writefd(
-                                       kernel_channel_rotate_pipe);
-               if (kconsumer_data.channel_rotate_pipe < 0) {
-                       retval = -1;
-                       goto exit_init_data;
-               }
        }
 
        /* Set consumer initial state */
@@ -6100,30 +6014,18 @@ int main(int argc, char **argv)
                retval = -1;
                goto exit_init_data;
        }
-       ust32_channel_rotate_pipe = lttng_pipe_open(0);
-       if (!ust32_channel_rotate_pipe) {
-               ERR("Failed to create 32-bit user space consumer channel rotate pipe");
-               retval = -1;
-               goto exit_init_data;
-       }
-       ustconsumer32_data.channel_rotate_pipe = lttng_pipe_release_writefd(
-                       ust32_channel_rotate_pipe);
-       if (ustconsumer32_data.channel_rotate_pipe < 0) {
-               retval = -1;
-               goto exit_init_data;
-       }
 
        /*
-        * The rotation_timer_queue structure is shared between the sessiond timer
-        * thread and the rotation thread. The main() keeps the ownership and
-        * destroys it when both threads have quit.
+        * The rotation_thread_timer_queue structure is shared between the
+        * sessiond timer thread and the rotation thread. The main thread keeps
+        * its ownership and destroys it when both threads have been joined.
         */
-       rotation_timer_queue = create_rotate_timer_queue();
+       rotation_timer_queue = rotation_thread_timer_queue_create();
        if (!rotation_timer_queue) {
                retval = -1;
                goto exit_init_data;
        }
-       timer_thread_ctx.rotation_timer_queue = rotation_timer_queue;
+       timer_thread_ctx.rotation_thread_job_queue = rotation_timer_queue;
 
        ust64_channel_monitor_pipe = lttng_pipe_open(0);
        if (!ust64_channel_monitor_pipe) {
@@ -6137,18 +6039,6 @@ int main(int argc, char **argv)
                retval = -1;
                goto exit_init_data;
        }
-       ust64_channel_rotate_pipe = lttng_pipe_open(0);
-       if (!ust64_channel_rotate_pipe) {
-               ERR("Failed to create 64-bit user space consumer channel rotate pipe");
-               retval = -1;
-               goto exit_init_data;
-       }
-       ustconsumer64_data.channel_rotate_pipe = lttng_pipe_release_writefd(
-                       ust64_channel_rotate_pipe);
-       if (ustconsumer64_data.channel_rotate_pipe < 0) {
-               retval = -1;
-               goto exit_init_data;
-       }
 
        /*
         * Init UST app hash table. Alloc hash table before this point since
@@ -6333,7 +6223,7 @@ int main(int argc, char **argv)
 
        /* Create timer thread. */
        ret = pthread_create(&timer_thread, default_pthread_attr(),
-                       sessiond_timer_thread, &timer_thread_ctx);
+                       timer_thread_func, &timer_thread_ctx);
        if (ret) {
                errno = ret;
                PERROR("pthread_create timer");
@@ -6345,9 +6235,6 @@ int main(int argc, char **argv)
 
        /* rotation_thread_data acquires the pipes' read side. */
        rotation_thread_handle = rotation_thread_handle_create(
-                       ust32_channel_rotate_pipe,
-                       ust64_channel_rotate_pipe,
-                       kernel_channel_rotate_pipe,
                        thread_quit_pipe[0],
                        rotation_timer_queue,
                        notification_thread_handle,
@@ -6601,7 +6488,7 @@ exit_init_data:
        }
 
        if (timer_thread_launched) {
-               kill(getpid(), LTTNG_SESSIOND_SIG_EXIT);
+               timer_exit();
                ret = pthread_join(timer_thread, &status);
                if (ret) {
                        errno = ret;
@@ -6614,7 +6501,7 @@ exit_init_data:
         * After the rotation and timer thread have quit, we can safely destroy
         * the rotation_timer_queue.
         */
-       destroy_rotate_timer_queue(rotation_timer_queue);
+       rotation_thread_timer_queue_destroy(rotation_timer_queue);
 
        rcu_thread_offline();
        rcu_unregister_thread();
@@ -6626,9 +6513,6 @@ exit_init_data:
        lttng_pipe_destroy(ust32_channel_monitor_pipe);
        lttng_pipe_destroy(ust64_channel_monitor_pipe);
        lttng_pipe_destroy(kernel_channel_monitor_pipe);
-       lttng_pipe_destroy(ust32_channel_rotate_pipe);
-       lttng_pipe_destroy(ust64_channel_rotate_pipe);
-       lttng_pipe_destroy(kernel_channel_rotate_pipe);
 exit_ht_cleanup:
 
        health_app_destroy(health_sessiond);
index dac0f2a1fda8145cfa8f7fdbca724a2890257f87..17d3c51fdbf0823adb32d4bd3aab8606058df2f7 100644 (file)
@@ -1,5 +1,6 @@
 /*
  * Copyright (C) 2017 - Julien Desfossez <jdesfossez@efficios.com>
+ * Copyright (C) 2018 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
  *
  * This program is free software; you can redistribute it and/or modify it
  * under the terms of the GNU General Public License, version 2 only, as
 #include <urcu/list.h>
 #include <urcu/rculfhash.h>
 
-unsigned long hash_channel_key(struct rotation_channel_key *key)
-{
-       return hash_key_u64(&key->key, lttng_ht_seed) ^ hash_key_ulong(
-                       (void *) (unsigned long) key->domain, lttng_ht_seed);
-}
-
-int rotate_add_channel_pending(uint64_t key, enum lttng_domain_type domain,
-               struct ltt_session *session)
-{
-       int ret;
-       struct rotation_channel_info *new_info;
-       struct rotation_channel_key channel_key = { .key = key,
-               .domain = domain };
-
-       new_info = zmalloc(sizeof(struct rotation_channel_info));
-       if (!new_info) {
-               goto error;
-       }
-
-       new_info->channel_key.key = key;
-       new_info->channel_key.domain = domain;
-       new_info->session_id = session->id;
-       cds_lfht_node_init(&new_info->rotate_channels_ht_node);
-
-       session->nr_chan_rotate_pending++;
-       cds_lfht_add(channel_pending_rotate_ht,
-                       hash_channel_key(&channel_key),
-                       &new_info->rotate_channels_ht_node);
-
-       ret = 0;
-       goto end;
-
-error:
-       ret = -1;
-end:
-       return ret;
-}
-
 /* The session's lock must be held by the caller. */
 static
 int session_rename_chunk(struct ltt_session *session, char *current_path,
@@ -208,7 +171,7 @@ error:
  *
  * Returns 0 on success, a negative value on error.
  */
-int rename_complete_chunk(struct ltt_session *session, time_t ts)
+int rename_completed_chunk(struct ltt_session *session, time_t ts)
 {
        struct tm *timeinfo;
        char new_path[LTTNG_PATH_MAX];
@@ -357,53 +320,6 @@ end:
        return ret;
 }
 
-int relay_rotate_pending(struct ltt_session *session, uint64_t chunk_id)
-{
-       int ret;
-       struct consumer_socket *socket;
-       struct consumer_output *output;
-       struct lttng_ht_iter iter;
-
-       /*
-        * Either one of the sessions is enough to find the consumer_output
-        * and uid/gid.
-        */
-       if (session->kernel_session) {
-               output = session->kernel_session->consumer;
-       } else if (session->ust_session) {
-               output = session->ust_session->consumer;
-       } else {
-               assert(0);
-       }
-
-       if (!output || !output->socks) {
-               ERR("No consumer output found");
-               ret = -1;
-               goto end;
-       }
-
-       ret = -1;
-
-       rcu_read_lock();
-       /*
-        * We have to iterate to find a socket, but we only need to send the
-        * rotate pending command to one consumer, so we break after the first
-        * one.
-        */
-       cds_lfht_for_each_entry(output->socks->ht, &iter.iter, socket,
-                       node.node) {
-               pthread_mutex_lock(socket->lock);
-               ret = consumer_rotate_pending_relay(socket, output, session->id,
-                               chunk_id);
-               pthread_mutex_unlock(socket->lock);
-               break;
-       }
-       rcu_read_unlock();
-
-end:
-       return ret;
-}
-
 int subscribe_session_consumed_size_rotation(struct ltt_session *session, uint64_t size,
                struct notification_thread_handle *notification_thread_handle)
 {
index 6dc3b7aa189768f9c47787f12e03ae031d0585ee..c7986c815dc671eaab660c342e40c05225cd3ff6 100644 (file)
@@ -1,5 +1,6 @@
 /*
  * Copyright (C) 2017 - Julien Desfossez <jdesfossez@efficios.com>
+ * Copyright (C) 2018 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
  *
  * This program is free software; you can redistribute it and/or modify it
  * under the terms of the GNU General Public License, version 2 only, as
 #ifndef ROTATE_H
 #define ROTATE_H
 
-#include <lttng/notification/channel-internal.h>
 #include "rotation-thread.h"
+#include <stdint.h>
 
-/*
- * Key in channel_pending_rotate_ht to map a channel to a
- * struct rotation_channel_info.
- */
-struct rotation_channel_key {
-       uint64_t key;
-       enum lttng_domain_type domain;
-};
-
-/*
- * Added in channel_pending_rotate_ht everytime we start the rotation of a
- * channel. The consumer notifies the rotation thread with the channel_key to
- * inform a rotation is complete, we use that information to lookup the related
- * session from channel_pending_rotate_ht.
- */
-struct rotation_channel_info {
-       uint64_t session_id;
-       struct rotation_channel_key channel_key;
-       struct cds_lfht_node rotate_channels_ht_node;
-};
-
-
-extern struct cds_lfht *channel_pending_rotate_ht;
-extern struct lttng_notification_channel *rotate_notification_channel;
-
-unsigned long hash_channel_key(struct rotation_channel_key *key);
-
-/* session lock must be held by this function's caller. */
-int rename_complete_chunk(struct ltt_session *session, time_t ts);
-
-int relay_rotate_pending(struct ltt_session *session, uint64_t chunk_id);
-
-/*
- * When we start the rotation of a channel, we add its information in
- * channel_pending_rotate_ht. This is called in the context of
- * thread_manage_client when the client asks for a rotation, in the context
- * of the sessiond_timer thread when periodic rotations are enabled and from
- * the rotation_thread when size-based rotations are enabled.
- */
-int rotate_add_channel_pending(uint64_t key, enum lttng_domain_type domain,
-               struct ltt_session *session);
+int rotate_add_pending_rotation(struct ltt_session *session, uint64_t chunk_id);
+int rename_completed_chunk(struct ltt_session *session, time_t ts);
 
 /*
  * Subscribe/unsubscribe the notification_channel from the rotation_thread to
index 01a963c25d45c026119a2300d78780ac82772736..5b0267952676ccc549a95c7a3b21bab3a70d413b 100644 (file)
@@ -1,5 +1,6 @@
 /*
  * Copyright (C) 2017 - Julien Desfossez <jdesfossez@efficios.com>
+ * Copyright (C) 2018 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
  *
  * This program is free software; you can redistribute it and/or modify it
  * under the terms of the GNU General Public License, version 2 only, as
 
 #include <urcu.h>
 #include <urcu/list.h>
-#include <urcu/rculfhash.h>
-
-/*
- * Store a struct rotation_channel_info for each channel that is currently
- * being rotated by the consumer.
- */
-struct cds_lfht *channel_pending_rotate_ht;
 
 struct lttng_notification_channel *rotate_notification_channel = NULL;
 
-struct rotation_thread_state {
+struct rotation_thread {
        struct lttng_poll_event events;
 };
 
+struct rotation_thread_job {
+       enum rotation_thread_job_type type;
+       uint64_t session_id;
+       /* List member in struct rotation_thread_timer_queue. */
+       struct cds_list_head head;
+};
+
+/*
+ * The timer thread enqueues jobs and wakes up the rotation thread.
+ * When the rotation thread wakes up, it empties the queue.
+ */
+struct rotation_thread_timer_queue {
+       struct lttng_pipe *event_pipe;
+       struct cds_list_head list;
+       pthread_mutex_t lock;
+};
+
+struct rotation_thread_handle {
+       int quit_pipe;
+       struct rotation_thread_timer_queue *rotation_timer_queue;
+       /* Access to the notification thread cmd_queue */
+       struct notification_thread_handle *notification_thread_handle;
+       sem_t *notification_thread_ready;
+};
+
 static
-void channel_rotation_info_destroy(struct rotation_channel_info *channel_info)
+const char *get_job_type_str(enum rotation_thread_job_type job_type)
 {
-       assert(channel_info);
-       free(channel_info);
+       switch (job_type) {
+       case ROTATION_THREAD_JOB_TYPE_CHECK_PENDING_ROTATION:
+               return "CHECK_PENDING_ROTATION";
+       case ROTATION_THREAD_JOB_TYPE_SCHEDULED_ROTATION:
+               return "SCHEDULED_ROTATION";
+       default:
+               abort();
+       }
 }
 
-static
-int match_channel_info(struct cds_lfht_node *node, const void *key)
+struct rotation_thread_timer_queue *rotation_thread_timer_queue_create(void)
 {
-       struct rotation_channel_key *channel_key = (struct rotation_channel_key *) key;
-       struct rotation_channel_info *channel_info;
+       struct rotation_thread_timer_queue *queue = NULL;
 
-       channel_info = caa_container_of(node, struct rotation_channel_info,
-                       rotate_channels_ht_node);
+       queue = zmalloc(sizeof(*queue));
+       if (!queue) {
+               PERROR("Failed to allocate timer rotate queue");
+               goto end;
+       }
 
-       return !!((channel_key->key == channel_info->channel_key.key) &&
-                       (channel_key->domain == channel_info->channel_key.domain));
+       queue->event_pipe = lttng_pipe_open(FD_CLOEXEC | O_NONBLOCK);
+       CDS_INIT_LIST_HEAD(&queue->list);
+       pthread_mutex_init(&queue->lock, NULL);
+end:
+       return queue;
 }
 
-static
-struct rotation_channel_info *lookup_channel_pending(uint64_t key,
-               enum lttng_domain_type domain)
+void log_job_destruction(const struct rotation_thread_job *job)
 {
-       struct cds_lfht_iter iter;
-       struct cds_lfht_node *node;
-       struct rotation_channel_info *channel_info = NULL;
-       struct rotation_channel_key channel_key = { .key = key,
-                       .domain = domain };
-
-       cds_lfht_lookup(channel_pending_rotate_ht,
-                       hash_channel_key(&channel_key),
-                       match_channel_info,
-                       &channel_key, &iter);
-       node = cds_lfht_iter_get_node(&iter);
-       if (!node) {
-               goto end;
+       enum lttng_error_level log_level;
+       const char *job_type_str = get_job_type_str(job->type);
+
+       switch (job->type) {
+       case ROTATION_THREAD_JOB_TYPE_SCHEDULED_ROTATION:
+               /*
+                * Not a problem, the scheduled rotation is racing with the teardown
+                * of the daemon. In this case, the rotation will not happen, which
+                * is not a problem (or at least, not important enough to delay
+                * the shutdown of the session daemon).
+                */
+               log_level = PRINT_DBG;
+               break;
+       case ROTATION_THREAD_JOB_TYPE_CHECK_PENDING_ROTATION:
+               /* This is not expected to happen; warn the user. */
+               log_level = PRINT_WARN;
+               break;
+       default:
+               abort();
        }
 
-       channel_info = caa_container_of(node, struct rotation_channel_info,
-                       rotate_channels_ht_node);
-       cds_lfht_del(channel_pending_rotate_ht, node);
-end:
-       return channel_info;
+       LOG(log_level, "Rotation thread timer queue still contains job of type %s targeting session %" PRIu64 " on destruction",
+                       job_type_str, job->session_id);
 }
 
-/*
- * Destroy the thread data previously created by the init function.
- */
-void rotation_thread_handle_destroy(
-               struct rotation_thread_handle *handle)
+void rotation_thread_timer_queue_destroy(
+               struct rotation_thread_timer_queue *queue)
 {
-       int ret;
+       struct rotation_thread_job *job, *tmp_job;
 
-       if (!handle) {
-               goto end;
+       if (!queue) {
+               return;
        }
 
-       if (handle->ust32_consumer >= 0) {
-               ret = close(handle->ust32_consumer);
-               if (ret) {
-                       PERROR("close 32-bit consumer channel rotation pipe");
-               }
-       }
-       if (handle->ust64_consumer >= 0) {
-               ret = close(handle->ust64_consumer);
-               if (ret) {
-                       PERROR("close 64-bit consumer channel rotation pipe");
-               }
-       }
-       if (handle->kernel_consumer >= 0) {
-               ret = close(handle->kernel_consumer);
-               if (ret) {
-                       PERROR("close kernel consumer channel rotation pipe");
-               }
+       lttng_pipe_destroy(queue->event_pipe);
+
+       pthread_mutex_lock(&queue->lock);
+       /* Empty wait queue. */
+       cds_list_for_each_entry_safe(job, tmp_job, &queue->list, head) {
+               log_job_destruction(job);
+               cds_list_del(&job->head);
+               free(job);
        }
+       pthread_mutex_unlock(&queue->lock);
+       pthread_mutex_destroy(&queue->lock);
+       free(queue);
+}
 
-end:
+/*
+ * Destroy the thread data previously created by the init function.
+ */
+void rotation_thread_handle_destroy(
+               struct rotation_thread_handle *handle)
+{
        free(handle);
 }
 
 struct rotation_thread_handle *rotation_thread_handle_create(
-               struct lttng_pipe *ust32_channel_rotate_pipe,
-               struct lttng_pipe *ust64_channel_rotate_pipe,
-               struct lttng_pipe *kernel_channel_rotate_pipe,
-               int thread_quit_pipe,
+               int quit_pipe,
                struct rotation_thread_timer_queue *rotation_timer_queue,
                struct notification_thread_handle *notification_thread_handle,
                sem_t *notification_thread_ready)
@@ -157,46 +181,88 @@ struct rotation_thread_handle *rotation_thread_handle_create(
                goto end;
        }
 
-       if (ust32_channel_rotate_pipe) {
-               handle->ust32_consumer =
-                               lttng_pipe_release_readfd(
-                                       ust32_channel_rotate_pipe);
-               if (handle->ust32_consumer < 0) {
-                       goto error;
+       handle->quit_pipe = quit_pipe;
+       handle->rotation_timer_queue = rotation_timer_queue;
+       handle->notification_thread_handle = notification_thread_handle;
+       handle->notification_thread_ready = notification_thread_ready;
+
+end:
+       return handle;
+}
+
+/*
+ * Called with the rotation_thread_timer_queue lock held.
+ * Return true if the same timer job already exists in the queue, false if not.
+ */
+static
+bool timer_job_exists(const struct rotation_thread_timer_queue *queue,
+               enum rotation_thread_job_type job_type, uint64_t session_id)
+{
+       bool exists = false;
+       struct rotation_thread_job *job;
+
+       cds_list_for_each_entry(job, &queue->list, head) {
+               if (job->session_id == session_id && job->type == job_type) {
+                       exists = true;
+                       goto end;
                }
-       } else {
-               handle->ust32_consumer = -1;
        }
-       if (ust64_channel_rotate_pipe) {
-               handle->ust64_consumer =
-                               lttng_pipe_release_readfd(
-                                       ust64_channel_rotate_pipe);
-               if (handle->ust64_consumer < 0) {
-                       goto error;
-               }
-       } else {
-               handle->ust64_consumer = -1;
+end:
+       return exists;
+}
+
+void rotation_thread_enqueue_job(struct rotation_thread_timer_queue *queue,
+               enum rotation_thread_job_type job_type, uint64_t session_id)
+{
+       int ret;
+       const char * const dummy = "!";
+       struct rotation_thread_job *job = NULL;
+       const char *job_type_str = get_job_type_str(job_type);
+
+       pthread_mutex_lock(&queue->lock);
+       if (timer_job_exists(queue, session_id, job_type)) {
+               /*
+                * This timer job is already pending, we don't need to add
+                * it.
+                */
+               goto end;
        }
-       if (kernel_channel_rotate_pipe) {
-               handle->kernel_consumer =
-                               lttng_pipe_release_readfd(
-                                       kernel_channel_rotate_pipe);
-               if (handle->kernel_consumer < 0) {
-                       goto error;
+
+       job = zmalloc(sizeof(struct rotation_thread_job));
+       if (!job) {
+               PERROR("Failed to allocate rotation thread job of type \"%s\" for session id %" PRIu64,
+                               job_type_str, session_id);
+               goto end;
+       }
+       job->type = job_type;
+       job->session_id = session_id;
+       cds_list_add_tail(&job->head, &queue->list);
+
+       ret = lttng_write(lttng_pipe_get_writefd(queue->event_pipe), dummy,
+                       1);
+       if (ret < 0) {
+               /*
+                * We do not want to block in the timer handler, the job has
+                * been enqueued in the list, the wakeup pipe is probably full,
+                * the job will be processed when the rotation_thread catches
+                * up.
+                */
+               if (errno == EAGAIN || errno == EWOULDBLOCK) {
+                       /*
+                        * Not an error, but would be surprising and indicate
+                        * that the rotation thread can't keep up with the
+                        * current load.
+                        */
+                       DBG("Wake-up pipe of rotation thread job queue is full");
+                       goto end;
                }
-       } else {
-               handle->kernel_consumer = -1;
+               PERROR("Failed to wake-up the rotation thread after pushing a job of type \"%s\" for session id %" PRIu64,
+                               job_type_str, session_id);
+               goto end;
        }
-       handle->thread_quit_pipe = thread_quit_pipe;
-       handle->rotation_timer_queue = rotation_timer_queue;
-       handle->notification_thread_handle = notification_thread_handle;
-       handle->notification_thread_ready = notification_thread_ready;
 
 end:
-       return handle;
-error:
-       rotation_thread_handle_destroy(handle);
-       return NULL;
+       pthread_mutex_unlock(&queue->lock);
 }
 
 static
@@ -206,22 +272,19 @@ int init_poll_set(struct lttng_poll_event *poll_set,
        int ret;
 
        /*
-        * Create pollset with size 5:
-        *      - sessiond quit pipe
-        *      - sessiond timer pipe,
-        *      - consumerd (32-bit user space) channel rotate pipe,
-        *      - consumerd (64-bit user space) channel rotate pipe,
-        *      - consumerd (kernel) channel rotate pipe,
+        * Create pollset with size 2:
+        *      - quit pipe,
+        *      - rotation thread timer queue pipe,
         */
-       ret = lttng_poll_create(poll_set, 5, LTTNG_CLOEXEC);
+       ret = lttng_poll_create(poll_set, 2, LTTNG_CLOEXEC);
        if (ret < 0) {
                goto end;
        }
 
-       ret = lttng_poll_add(poll_set, handle->thread_quit_pipe,
+       ret = lttng_poll_add(poll_set, handle->quit_pipe,
                        LPOLLIN | LPOLLERR);
        if (ret < 0) {
-               ERR("[rotation-thread] Failed to add thread_quit_pipe fd to pollset");
+               ERR("[rotation-thread] Failed to add quit_pipe fd to pollset");
                goto error;
        }
        ret = lttng_poll_add(poll_set,
@@ -231,26 +294,6 @@ int init_poll_set(struct lttng_poll_event *poll_set,
                ERR("[rotation-thread] Failed to add rotate_pending fd to pollset");
                goto error;
        }
-       ret = lttng_poll_add(poll_set, handle->ust32_consumer,
-                       LPOLLIN | LPOLLERR);
-       if (ret < 0) {
-               ERR("[rotation-thread] Failed to add ust-32 channel rotation pipe fd to pollset");
-               goto error;
-       }
-       ret = lttng_poll_add(poll_set, handle->ust64_consumer,
-                       LPOLLIN | LPOLLERR);
-       if (ret < 0) {
-               ERR("[rotation-thread] Failed to add ust-64 channel rotation pipe fd to pollset");
-               goto error;
-       }
-       if (handle->kernel_consumer >= 0) {
-               ret = lttng_poll_add(poll_set, handle->kernel_consumer,
-                               LPOLLIN | LPOLLERR);
-               if (ret < 0) {
-                       ERR("[rotation-thread] Failed to add kernel channel rotation pipe fd to pollset");
-                       goto error;
-               }
-       }
 
 end:
        return ret;
@@ -260,13 +303,9 @@ error:
 }
 
 static
-void fini_thread_state(struct rotation_thread_state *state)
+void fini_thread_state(struct rotation_thread *state)
 {
-       int ret;
-
        lttng_poll_clean(&state->events);
-       ret = cds_lfht_destroy(channel_pending_rotate_ht, NULL);
-       assert(!ret);
        if (rotate_notification_channel) {
                lttng_notification_channel_destroy(rotate_notification_channel);
        }
@@ -274,7 +313,7 @@ void fini_thread_state(struct rotation_thread_state *state)
 
 static
 int init_thread_state(struct rotation_thread_handle *handle,
-               struct rotation_thread_state *state)
+               struct rotation_thread *state)
 {
        int ret;
 
@@ -287,14 +326,6 @@ int init_thread_state(struct rotation_thread_handle *handle,
                goto end;
        }
 
-       channel_pending_rotate_ht = cds_lfht_new(DEFAULT_HT_SIZE,
-                       1, 0, CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL);
-       if (!channel_pending_rotate_ht) {
-               ERR("[rotation-thread] Failed to create channel pending rotation hash table");
-               ret = -1;
-               goto end;
-       }
-
        /*
         * We wait until the notification thread is ready to create the
         * notification channel and add it to the poll_set.
@@ -319,252 +350,324 @@ end:
 }
 
 static
-int handle_channel_rotation_pipe(int fd, uint32_t revents,
-               struct rotation_thread_handle *handle,
-               struct rotation_thread_state *state)
+int check_session_rotation_pending_local_on_consumer(
+               const struct ltt_session *session,
+               struct consumer_socket *socket, bool *rotation_completed)
 {
-       int ret = 0;
-       enum lttng_domain_type domain;
-       struct rotation_channel_info *channel_info;
-       struct ltt_session *session = NULL;
-       uint64_t key;
-
-       if (fd == handle->ust32_consumer ||
-                       fd == handle->ust64_consumer) {
-               domain = LTTNG_DOMAIN_UST;
-       } else if (fd == handle->kernel_consumer) {
-               domain = LTTNG_DOMAIN_KERNEL;
+       int ret;
+
+       pthread_mutex_lock(socket->lock);
+       DBG("[rotation-thread] Checking for locally pending rotation on the %s consumer for session %s",
+                       lttng_consumer_type_str(socket->type),
+                       session->name);
+       ret = consumer_check_rotation_pending_local(socket,
+                       session->id,
+                       session->current_archive_id - 1);
+       pthread_mutex_unlock(socket->lock);
+
+       if (ret == 0) {
+               /* Rotation was completed on this consumer. */
+               DBG("[rotation-thread] Local rotation of trace archive %" PRIu64 " of session \"%s\" was completed on the %s consumer",
+                               session->current_archive_id - 1,
+                               session->name,
+                               lttng_consumer_type_str(socket->type));
+               *rotation_completed = true;
+       } else if (ret == 1) {
+               /* Rotation pending on this consumer. */
+               DBG("[rotation-thread] Local rotation of trace archive %" PRIu64 " of session \"%s\" is pending on the %s consumer",
+                               session->current_archive_id - 1,
+                               session->name,
+                               lttng_consumer_type_str(socket->type));
+               *rotation_completed = false;
+               ret = 0;
        } else {
-               ERR("[rotation-thread] Unknown channel rotation pipe fd %d",
-                               fd);
-               abort();
+               /* Not a fatal error. */
+               ERR("[rotation-thread] Encountered an error when checking if local rotation of trace archive %" PRIu64 " of session \"%s\" is pending on the %s consumer",
+                               session->current_archive_id - 1,
+                               session->name,
+                               lttng_consumer_type_str(socket->type));
+               *rotation_completed = false;
        }
+       return ret;
+}
 
-       if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
-               ret = lttng_poll_del(&state->events, fd);
-               if (ret) {
-                       ERR("[rotation-thread] Failed to remove consumer "
-                                       "rotation pipe from poll set");
+static
+int check_session_rotation_pending_local(struct ltt_session *session)
+{
+       int ret;
+       struct consumer_socket *socket;
+       struct cds_lfht_iter iter;
+       bool rotation_completed = true;
+
+       /*
+        * Check for a local pending rotation on all consumers (32-bit
+        * user space, 64-bit user space, and kernel).
+        */
+       DBG("[rotation-thread] Checking for pending local rotation on session \"%s\", trace archive %" PRIu64,
+                       session->name, session->current_archive_id - 1);
+
+       rcu_read_lock();
+       if (!session->ust_session) {
+               goto skip_ust;
+       }
+       cds_lfht_for_each_entry(session->ust_session->consumer->socks->ht,
+                       &iter, socket, node.node) {
+               ret = check_session_rotation_pending_local_on_consumer(session,
+                               socket, &rotation_completed);
+               if (ret || !rotation_completed) {
+                       goto end;
                }
-               goto end;
        }
 
-       do {
-               ret = read(fd, &key, sizeof(key));
-       } while (ret == -1 && errno == EINTR);
-       if (ret != sizeof(key)) {
-               ERR("[rotation-thread] Failed to read from pipe (fd = %i)",
-                               fd);
-               ret = -1;
-               goto end;
+skip_ust:
+       if (!session->kernel_session) {
+               goto skip_kernel;
        }
+       cds_lfht_for_each_entry(session->kernel_session->consumer->socks->ht,
+                               &iter, socket, node.node) {
+               ret = check_session_rotation_pending_local_on_consumer(session,
+                               socket, &rotation_completed);
+               if (ret || !rotation_completed) {
+                       goto end;
+               }
+       }
+skip_kernel:
+end:
+       rcu_read_unlock();
 
-       DBG("[rotation-thread] Received notification for chan %" PRIu64
-                       ", domain %d", key, domain);
-
-       channel_info = lookup_channel_pending(key, domain);
-       if (!channel_info) {
-               ERR("[rotation-thread] Failed to find channel_info (key = %"
-                               PRIu64 ")", key);
-               ret = -1;
-               goto end;
+       if (rotation_completed) {
+               DBG("[rotation-thread] Local rotation of trace archive %" PRIu64 " of session \"%s\" is complete on all consumers",
+                               session->current_archive_id - 1,
+                               session->name);
+               session->rotation_pending_local = false;
        }
-       rcu_read_lock();
-       session_lock_list();
-       session = session_find_by_id(channel_info->session_id);
-       if (!session) {
-               /*
-                * The session may have been destroyed before we had a chance to
-                * perform this action, return gracefully.
-                */
-               DBG("[rotation-thread] Session %" PRIu64 " not found",
-                               channel_info->session_id);
-               ret = 0;
-               goto end_unlock_session_list;
+       if (ret) {
+               session->rotation_state = LTTNG_ROTATION_STATE_ERROR;
        }
+       return 0;
+}
 
-       session_lock(session);
-       if (--session->nr_chan_rotate_pending == 0) {
-               time_t now = time(NULL);
-
-               if (now == (time_t) -1) {
-                       session->rotation_state = LTTNG_ROTATION_STATE_ERROR;
-                       ret = LTTNG_ERR_UNK;
-                       goto end_unlock_session;
-               }
+static
+int check_session_rotation_pending_relay(struct ltt_session *session)
+{
+       int ret;
+       struct consumer_socket *socket;
+       struct cds_lfht_iter iter;
+       bool rotation_completed = true;
+       const struct consumer_output *output;
 
-               ret = rename_complete_chunk(session, now);
-               if (ret < 0) {
-                       ERR("Failed to rename completed rotation chunk");
-                       goto end_unlock_session;
-               }
-               session->rotate_pending = false;
-               session->last_chunk_start_ts = session->current_chunk_start_ts;
-               if (session->rotate_pending_relay) {
-                       ret = sessiond_timer_rotate_pending_start(
-                                       session,
-                                       DEFAULT_ROTATE_PENDING_RELAY_TIMER);
-                       if (ret) {
-                               ERR("Failed to enable rotate pending timer");
-                               ret = -1;
-                               goto end_unlock_session;
-                       }
-               } else {
-                       struct lttng_trace_archive_location *location;
-
-                       session->rotation_state = LTTNG_ROTATION_STATE_COMPLETED;
-                       /* Ownership of location is transferred. */
-                       location = session_get_trace_archive_location(session);
-                       ret = notification_thread_command_session_rotation_completed(
-                                       notification_thread_handle,
-                                       session->name,
-                                       session->uid,
-                                       session->gid,
-                                       session->current_archive_id,
-                                       location);
-                       if (ret != LTTNG_OK) {
-                               ERR("Failed to notify notification thread that rotation is complete for session %s",
-                                               session->name);
-                       }
+       /*
+        * Check for a pending rotation on any consumer as we only use
+        * it as a "tunnel" to the relayd.
+        */
 
-               }
-               DBG("Rotation completed for session %s", session->name);
+       rcu_read_lock();
+       if (session->ust_session) {
+               cds_lfht_first(session->ust_session->consumer->socks->ht,
+                               &iter);
+               output = session->ust_session->consumer;
+       } else {
+               cds_lfht_first(session->kernel_session->consumer->socks->ht,
+                               &iter);
+               output = session->kernel_session->consumer;
        }
+       assert(cds_lfht_iter_get_node(&iter));
 
-       ret = 0;
+       socket = caa_container_of(cds_lfht_iter_get_node(&iter),
+                       typeof(*socket), node.node);
+
+       pthread_mutex_lock(socket->lock);
+       DBG("[rotation-thread] Checking for pending relay rotation on session \"%s\", trace archive %" PRIu64 " through the %s consumer",
+                       session->name, session->current_archive_id - 1,
+                       lttng_consumer_type_str(socket->type));
+       ret = consumer_check_rotation_pending_relay(socket,
+                       output,
+                       session->id,
+                       session->current_archive_id - 1);
+       pthread_mutex_unlock(socket->lock);
+
+       if (ret == 0) {
+               /* Rotation was completed on the relay. */
+               DBG("[rotation-thread] Relay rotation of trace archive %" PRIu64 " of session \"%s\" was completed",
+                               session->current_archive_id - 1,
+                               session->name);
+       } else if (ret == 1) {
+               /* Rotation pending on relay. */
+               DBG("[rotation-thread] Relay rotation of trace archive %" PRIu64 " of session \"%s\" is pending",
+                               session->current_archive_id - 1,
+                               session->name);
+               rotation_completed = false;
+       } else {
+               /* Not a fatal error. */
+               ERR("[rotation-thread] Encountered an error when checking if rotation of trace archive %" PRIu64 " of session \"%s\" is pending on the relay",
+                               session->current_archive_id - 1,
+                               session->name);
+               session->rotation_state = LTTNG_ROTATION_STATE_ERROR;
+               rotation_completed = false;
+       }
 
-end_unlock_session:
-       channel_rotation_info_destroy(channel_info);
-       session_unlock(session);
-end_unlock_session_list:
-       session_unlock_list();
        rcu_read_unlock();
-end:
-       return ret;
+
+       if (rotation_completed) {
+               DBG("[rotation-thread] Totation of trace archive %" PRIu64 " of session \"%s\" is complete on the relay",
+                               session->current_archive_id - 1,
+                               session->name);
+               session->rotation_pending_relay = false;
+       }
+       return 0;
 }
 
 /*
- * Process the rotate_pending check, called with session lock held.
+ * Check if the last rotation was completed, called with session lock held.
  */
 static
-int rotate_pending_relay_timer(struct ltt_session *session)
+int check_session_rotation_pending(struct ltt_session *session,
+               struct notification_thread_handle *notification_thread_handle)
 {
        int ret;
+       struct lttng_trace_archive_location *location;
+       time_t now;
 
-       DBG("[rotation-thread] Check rotate pending on session %" PRIu64,
-                       session->id);
-       ret = relay_rotate_pending(session, session->current_archive_id - 1);
-       if (ret < 0) {
-               ERR("[rotation-thread] Check relay rotate pending");
-               goto end;
-       }
-       if (ret == 0) {
-               struct lttng_trace_archive_location *location;
+       DBG("[rotation-thread] Checking for pending rotation on session \"%s\", trace archive %" PRIu64,
+                       session->name, session->current_archive_id - 1);
+
+       if (session->rotation_pending_local) {
+               /* Updates session->rotation_pending_local as needed. */
+               ret = check_session_rotation_pending_local(session);
+               if (ret) {
+                       goto end;
+               }
 
-               DBG("[rotation-thread] Rotation completed on the relay for "
-                               "session %" PRIu64, session->id);
                /*
-                * Now we can clear the pending flag in the session. New
-                * rotations can start now.
+                * No need to check for a pending rotation on the relay
+                * since the rotation is not even completed locally yet.
                 */
-               session->rotate_pending_relay = false;
-               session->rotation_state = LTTNG_ROTATION_STATE_COMPLETED;
-
-               session->rotation_state = LTTNG_ROTATION_STATE_COMPLETED;
-               /* Ownership of location is transferred. */
-               location = session_get_trace_archive_location(session);
-               ret = notification_thread_command_session_rotation_completed(
-                               notification_thread_handle,
-                               session->name,
-                               session->uid,
-                               session->gid,
-                               session->current_archive_id,
-                               location);
-               if (ret != LTTNG_OK) {
-                       ERR("Failed to notify notification thread that rotation is complete for session %s",
-                                       session->name);
+               if (session->rotation_pending_local) {
+                       goto end;
                }
-       } else if (ret == 1) {
-               DBG("[rotation-thread] Rotation still pending on the relay for "
-                               "session %" PRIu64, session->id);
-               ret = sessiond_timer_rotate_pending_start(session,
-                               DEFAULT_ROTATE_PENDING_RELAY_TIMER);
+       }
+
+       if (session->rotation_pending_relay) {
+               /* Updates session->rotation_pending_relay as needed. */
+               ret = check_session_rotation_pending_relay(session);
                if (ret) {
-                       ERR("Re-enabling rotate pending timer");
-                       ret = -1;
+                       goto end;
+               }
+
+               if (session->rotation_pending_relay) {
                        goto end;
                }
        }
 
-       ret = 0;
+       DBG("[rotation-thread] Rotation of trace archive %" PRIu64 " completed for "
+                       "session %s", session->current_archive_id - 1,
+                       session->name);
 
+       /* Rename the completed trace archive's location. */
+       now = time(NULL);
+       if (now == (time_t) -1) {
+               session->rotation_state = LTTNG_ROTATION_STATE_ERROR;
+               ret = LTTNG_ERR_UNK;
+               goto end;
+       }
+
+       ret = rename_completed_chunk(session, now);
+       if (ret < 0) {
+               ERR("Failed to rename completed rotation chunk");
+               goto end;
+       }
+       session->last_chunk_start_ts = session->current_chunk_start_ts;
+
+       /*
+        * Now we can clear the "ONGOING" state in the session. New
+        * rotations can start now.
+        */
+       session->rotation_state = LTTNG_ROTATION_STATE_COMPLETED;
+
+       /* Ownership of location is transferred. */
+       location = session_get_trace_archive_location(session);
+       ret = notification_thread_command_session_rotation_completed(
+                       notification_thread_handle,
+                       session->name,
+                       session->uid,
+                       session->gid,
+                       session->current_archive_id,
+                       location);
+       if (ret != LTTNG_OK) {
+               ERR("[rotation-thread] Failed to notify notification thread of completed rotation for session %s",
+                               session->name);
+       }
+
+       ret = 0;
 end:
+       if (session->rotation_state == LTTNG_ROTATION_STATE_ONGOING) {
+               DBG("[rotation-thread] Rotation of trace archive %" PRIu64 " is still pending for session %s",
+                               session->current_archive_id - 1, session->name);
+               ret = timer_session_rotation_pending_check_start(session,
+                               DEFAULT_ROTATE_PENDING_TIMER);
+               if (ret) {
+                       ERR("Re-enabling rotate pending timer");
+                       ret = -1;
+                       goto end;
+               }
+       }
+
        return ret;
 }
 
-/*
- * Process the rotate_timer, called with session lock held.
- */
+/* Call with the session lock held. */
 static
-int rotate_timer(struct ltt_session *session)
+int launch_session_rotation(struct ltt_session *session)
 {
        int ret;
+       struct lttng_rotate_session_return rotation_return;
 
-       /*
-        * Complete _at most_ one scheduled rotation on a stopped session.
-        */
-       if (!session->active && session->rotate_timer_enabled &&
-                       session->rotated_after_last_stop) {
-               ret = 0;
-               goto end;
-       }
+       DBG("[rotation-thread] Launching scheduled time-based rotation on session \"%s\"",
+                       session->name);
 
-       /* Ignore this timer if a rotation is already in progress. */
-       if (session->rotate_pending || session->rotate_pending_relay) {
-               ret = 0;
-               goto end;
+       ret = cmd_rotate_session(session, &rotation_return);
+       if (ret == LTTNG_OK) {
+               DBG("[rotation-thread] Scheduled time-based rotation successfully launched on session \"%s\"",
+                               session->name);
+       } else {
+               /* Don't consider errors as fatal. */
+               DBG("[rotation-thread] Scheduled time-based rotation aborted for session %s: %s",
+                               session->name, lttng_strerror(ret));
        }
+       return 0;
+}
 
-       DBG("[rotation-thread] Rotate timer on session %s", session->name);
+static
+int run_job(struct rotation_thread_job *job, struct ltt_session *session,
+               struct notification_thread_handle *notification_thread_handle)
+{
+       int ret;
 
-       ret = cmd_rotate_session(session, NULL);
-       if (ret == -LTTNG_ERR_ROTATION_PENDING) {
-               DBG("Scheduled rotation aborted since a rotation is already in progress");
-               ret = 0;
-               goto end;
-       } else if (ret != LTTNG_OK) {
-               ERR("[rotation-thread] Automatic time-triggered rotation failed with error code %i", ret);
-               ret = -1;
-               goto end;
+       switch (job->type) {
+       case ROTATION_THREAD_JOB_TYPE_SCHEDULED_ROTATION:
+               ret = launch_session_rotation(session);
+               break;
+       case ROTATION_THREAD_JOB_TYPE_CHECK_PENDING_ROTATION:
+               ret = check_session_rotation_pending(session,
+                               notification_thread_handle);
+               break;
+       default:
+               abort();
        }
-
-       ret = 0;
-
-end:
        return ret;
 }
 
 static
-int handle_rotate_timer_pipe(uint32_t revents,
-               struct rotation_thread_handle *handle,
-               struct rotation_thread_state *state,
+int handle_job_queue(struct rotation_thread_handle *handle,
+               struct rotation_thread *state,
                struct rotation_thread_timer_queue *queue)
 {
        int ret = 0;
        int fd = lttng_pipe_get_readfd(queue->event_pipe);
        struct ltt_session *session;
-       char buf[1];
+       char buf;
 
-       if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
-               ret = lttng_poll_del(&state->events, fd);
-               if (ret) {
-                       ERR("[rotation-thread] Failed to remove consumer "
-                                       "rotate pending pipe from poll set");
-               }
-               goto end;
-       }
-
-       ret = lttng_read(fd, buf, 1);
+       ret = lttng_read(fd, &buf, 1);
        if (ret != 1) {
                ERR("[rotation-thread] Failed to read from wakeup pipe (fd = %i)", fd);
                ret = -1;
@@ -572,57 +675,46 @@ int handle_rotate_timer_pipe(uint32_t revents,
        }
 
        for (;;) {
-               struct sessiond_rotation_timer *timer_data;
+               struct rotation_thread_job *job;
 
-               /*
-                * Take the queue lock only to pop elements from the list.
-                */
+               /* Take the queue lock only to pop an element from the list. */
                pthread_mutex_lock(&queue->lock);
                if (cds_list_empty(&queue->list)) {
                        pthread_mutex_unlock(&queue->lock);
                        break;
                }
-               timer_data = cds_list_first_entry(&queue->list,
-                               struct sessiond_rotation_timer, head);
-               cds_list_del(&timer_data->head);
+               job = cds_list_first_entry(&queue->list,
+                               typeof(*job), head);
+               cds_list_del(&job->head);
                pthread_mutex_unlock(&queue->lock);
 
-               /*
-                * session lock to lookup the session ID.
-                */
                session_lock_list();
-               session = session_find_by_id(timer_data->session_id);
+               session = session_find_by_id(job->session_id);
                if (!session) {
                        DBG("[rotation-thread] Session %" PRIu64 " not found",
-                                       timer_data->session_id);
+                                       job->session_id);
                        /*
-                        * This is a non-fatal error, and we cannot report it to the
-                        * user (timer), so just print the error and continue the
-                        * processing.
+                        * This is a non-fatal error, and we cannot report it to
+                        * the user (timer), so just print the error and
+                        * continue the processing.
+                        *
+                        * While the timer thread will purge pending signals for
+                        * a session on the session's destruction, it is
+                        * possible for a job targeting that session to have
+                        * already been queued before it was destroyed.
                         */
                        session_unlock_list();
-                       free(timer_data);
+                       free(job);
                        continue;
                }
 
-               /*
-                * Take the session lock and release the session_list lock.
-                */
                session_lock(session);
                session_unlock_list();
 
-               if (timer_data->signal == LTTNG_SESSIOND_SIG_ROTATE_PENDING) {
-                       ret = rotate_pending_relay_timer(session);
-               } else if (timer_data->signal == LTTNG_SESSIOND_SIG_ROTATE_TIMER) {
-                       ret = rotate_timer(session);
-               } else {
-                       ERR("Unknown signal in rotate timer %d", timer_data->signal);
-                       ret = -1;
-               }
+               ret = run_job(job, session, handle->notification_thread_handle);
                session_unlock(session);
-               free(timer_data);
+               free(job);
                if (ret) {
-                       ERR("Error processing timer");
                        goto end;
                }
        }
@@ -633,8 +725,8 @@ end:
        return ret;
 }
 
-int handle_condition(
-               const struct lttng_condition *condition,
+static
+int handle_condition(const struct lttng_condition *condition,
                const struct lttng_evaluation *evaluation,
                struct notification_thread_handle *notification_thread_handle)
 {
@@ -713,9 +805,9 @@ end:
 }
 
 static
-int handle_notification_channel(int fd, uint32_t revents,
+int handle_notification_channel(int fd,
                struct rotation_thread_handle *handle,
-               struct rotation_thread_state *state)
+               struct rotation_thread *state)
 {
        int ret;
        bool notification_pending;
@@ -775,7 +867,7 @@ void *thread_rotation(void *data)
 {
        int ret;
        struct rotation_thread_handle *handle = data;
-       struct rotation_thread_state state;
+       struct rotation_thread thread;
 
        DBG("[rotation-thread] Started rotation thread");
 
@@ -784,13 +876,10 @@ void *thread_rotation(void *data)
                goto end;
        }
 
-       rcu_register_thread();
-       rcu_thread_online();
-
        health_register(health_sessiond, HEALTH_SESSIOND_TYPE_ROTATION);
        health_code_update();
 
-       ret = init_thread_state(handle, &state);
+       ret = init_thread_state(handle, &thread);
        if (ret) {
                goto end;
        }
@@ -803,7 +892,7 @@ void *thread_rotation(void *data)
 
                health_poll_entry();
                DBG("[rotation-thread] Entering poll wait");
-               ret = lttng_poll_wait(&state.events, -1);
+               ret = lttng_poll_wait(&thread.events, -1);
                DBG("[rotation-thread] Poll wait returned (%i)", ret);
                health_poll_exit();
                if (ret < 0) {
@@ -819,34 +908,31 @@ void *thread_rotation(void *data)
 
                fd_count = ret;
                for (i = 0; i < fd_count; i++) {
-                       int fd = LTTNG_POLL_GETFD(&state.events, i);
-                       uint32_t revents = LTTNG_POLL_GETEV(&state.events, i);
+                       int fd = LTTNG_POLL_GETFD(&thread.events, i);
+                       uint32_t revents = LTTNG_POLL_GETEV(&thread.events, i);
 
                        DBG("[rotation-thread] Handling fd (%i) activity (%u)",
                                        fd, revents);
 
-                       if (fd == handle->thread_quit_pipe) {
+                       if (revents & LPOLLERR) {
+                               ERR("[rotation-thread] Polling returned an error on fd %i", fd);
+                               goto error;
+                       }
+
+                       if (fd == handle->quit_pipe) {
                                DBG("[rotation-thread] Quit pipe activity");
+                               /* TODO flush the queue. */
                                goto exit;
                        } else if (fd == lttng_pipe_get_readfd(handle->rotation_timer_queue->event_pipe)) {
-                               ret = handle_rotate_timer_pipe(revents,
-                                               handle, &state, handle->rotation_timer_queue);
+                               ret = handle_job_queue(handle, &thread,
+                                               handle->rotation_timer_queue);
                                if (ret) {
                                        ERR("[rotation-thread] Failed to handle rotation timer pipe event");
                                        goto error;
                                }
-                       } else if (fd == handle->ust32_consumer ||
-                                       fd == handle->ust64_consumer ||
-                                       fd == handle->kernel_consumer) {
-                               ret = handle_channel_rotation_pipe(fd,
-                                               revents, handle, &state);
-                               if (ret) {
-                                       ERR("[rotation-thread] Failed to handle channel rotation pipe");
-                                       goto error;
-                               }
                        } else if (fd == rotate_notification_channel->socket) {
-                               ret = handle_notification_channel(fd, revents,
-                                               handle, &state);
+                               ret = handle_notification_channel(fd, handle,
+                                               &thread);
                                if (ret) {
                                        ERR("[rotation-thread] Error occured while handling activity on notification channel socket");
                                        goto error;
@@ -857,10 +943,8 @@ void *thread_rotation(void *data)
 exit:
 error:
        DBG("[rotation-thread] Exit");
-       fini_thread_state(&state);
+       fini_thread_state(&thread);
        health_unregister(health_sessiond);
-       rcu_thread_offline();
-       rcu_unregister_thread();
 end:
        return NULL;
 }
index 41da6e0444e209855fe14d7986834e9dd1979c99..618797f108d3d842142c55d65a4842af10ad5e06 100644 (file)
@@ -1,5 +1,6 @@
 /*
  * Copyright (C) 2017 - Julien Desfossez <jdesfossez@efficios.com>
+ * Copyright (C) 2018 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
  *
  * This program is free software; you can redistribute it and/or modify it
  * under the terms of the GNU General Public License, version 2 only, as
 #include <pthread.h>
 #include <semaphore.h>
 #include "session.h"
+#include "notification-thread.h"
 
-/*
- * The timer thread enqueues struct sessiond_rotation_timer objects in the list
- * and wake up the rotation thread. When the rotation thread wakes up, it
- * empties the queue.
- */
-struct rotation_thread_timer_queue {
-       struct lttng_pipe *event_pipe;
-       struct cds_list_head list;
-       pthread_mutex_t lock;
-};
-
-struct rotation_thread_handle {
-       /*
-        * Read side of pipes used to communicate with the rotation thread.
-        */
-       /* Notification from the consumers */
-       int ust32_consumer;
-       int ust64_consumer;
-       int kernel_consumer;
-       /* quit pipe */
-       int thread_quit_pipe;
+extern struct lttng_notification_channel *rotate_notification_channel;
 
-       struct rotation_thread_timer_queue *rotation_timer_queue;
+enum rotation_thread_job_type {
+       ROTATION_THREAD_JOB_TYPE_SCHEDULED_ROTATION,
+       ROTATION_THREAD_JOB_TYPE_CHECK_PENDING_ROTATION
+};
 
-       /* Access to the notification thread cmd_queue */
-       struct notification_thread_handle *notification_thread_handle;
+struct rotation_thread_timer_queue;
+struct rotation_thread_handle;
 
-       sem_t *notification_thread_ready;
-};
+struct rotation_thread_timer_queue *rotation_thread_timer_queue_create(void);
+void rotation_thread_timer_queue_destroy(
+               struct rotation_thread_timer_queue *queue);
 
 struct rotation_thread_handle *rotation_thread_handle_create(
-               struct lttng_pipe *ust32_channel_rotate_pipe,
-               struct lttng_pipe *ust64_channel_rotate_pipe,
-               struct lttng_pipe *kernel_channel_rotate_pipe,
                int thread_quit_pipe,
                struct rotation_thread_timer_queue *rotation_timer_queue,
                struct notification_thread_handle *notification_thread_handle,
@@ -71,6 +54,9 @@ struct rotation_thread_handle *rotation_thread_handle_create(
 void rotation_thread_handle_destroy(
                struct rotation_thread_handle *handle);
 
+void rotation_thread_enqueue_job(struct rotation_thread_timer_queue *queue,
+               enum rotation_thread_job_type job_type, uint64_t session_id);
+
 void *thread_rotation(void *data);
 
 #endif /* ROTATION_THREAD_H */
index a5b9a427835e8a580cb4faaf658b53247091a888..38dbf6128c31639b668910ede62fc4a51f19d7c0 100644 (file)
@@ -520,11 +520,12 @@ int session_create(char *name, uid_t uid, gid_t gid)
                goto error;
        }
 
-       new_session->rotate_pending = false;
+       new_session->rotation_pending_local = false;
+       new_session->rotation_pending_relay = false;
        new_session->rotation_state = LTTNG_ROTATION_STATE_NO_ROTATION;
-       new_session->rotate_pending_relay = false;
-       new_session->rotate_relay_pending_timer_enabled = false;
-       new_session->rotate_timer = false;
+
+       new_session->rotation_pending_check_timer_enabled = false;
+       new_session->rotation_schedule_timer_enabled = false;
 
        /* Add new session to the session list */
        session_lock_list();
index abbf9ece04a1a1b438287fda61f2f73f39a65f91..99b8c47112bd3f6d1032d7bdcf8e4439b07801ba 100644 (file)
@@ -136,24 +136,19 @@ struct ltt_session {
         */
        uint64_t current_archive_id;
        /*
-        * Rotation is pending between the time it starts until the consumer has
-        * finished extracting the data. If the session uses a relay, data related
-        * to a rotation can still be in flight after that, see
-        * rotate_pending_relay.
-        */
-       bool rotate_pending;
-       /*
-        * True until the relay has finished the rotation of all the streams.
+        * Rotation is considered pending between the time it is launched up
+        * until the moment when the data has been writen at the destination
+        * and the trace archive has been renamed.
+        *
+        * When tracing locally, only 'rotation_pending_local' is used since
+        * no remote checks are needed. However, when tracing to a relay daemon,
+        * a second check is needed to ensure that the data has been
+        * commited at the remote destination.
         */
-       bool rotate_pending_relay;
+       bool rotation_pending_local;
+       bool rotation_pending_relay;
        /* Current state of a rotation. */
        enum lttng_rotation_state rotation_state;
-       /*
-        * Number of channels waiting for a rotation.
-        * When this number reaches 0, we can handle the rename of the chunk
-        * folder and inform the client that the rotate is finished.
-        */
-       unsigned int nr_chan_rotate_pending;
        struct {
                /*
                 * When the rotation is in progress, the temporary path name is
@@ -182,14 +177,14 @@ struct ltt_session {
         */
        time_t current_chunk_start_ts;
        /*
-        * Timer to check periodically if a relay has completed the last
-        * rotation.
+        * Timer to check periodically if a relay and/or consumer has completed
+        * the last rotation.
         */
-       bool rotate_relay_pending_timer_enabled;
-       timer_t rotate_relay_pending_timer;
+       bool rotation_pending_check_timer_enabled;
+       timer_t rotation_pending_check_timer;
        /* Timer to periodically rotate a session. */
-       bool rotate_timer_enabled;
-       timer_t rotate_timer;
+       bool rotation_schedule_timer_enabled;
+       timer_t rotation_schedule_timer;
        /* Value for periodic rotations, 0 if disabled. */
        uint64_t rotate_timer_period;
        /* Value for size-based rotations, 0 if disabled. */
index 38cfdc660e144be760bcd2ddb16af09127efa1f7..b8cf4825aa4e67ed6c325a07f2cc2723ed5519a5 100644 (file)
@@ -1,5 +1,6 @@
 /*
  * Copyright (C) 2017 - Julien Desfossez <jdesfossez@efficios.com>
+ * Copyright (C) 2018 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
  *
  * This program is free software; you can redistribute it and/or modify it
  * under the terms of the GNU General Public License, version 2 only, as
 #include "health-sessiond.h"
 #include "rotation-thread.h"
 
+#define LTTNG_SESSIOND_SIG_QS                          SIGRTMIN + 10
+#define LTTNG_SESSIOND_SIG_EXIT                                SIGRTMIN + 11
+#define LTTNG_SESSIOND_SIG_PENDING_ROTATION_CHECK      SIGRTMIN + 12
+#define LTTNG_SESSIOND_SIG_SCHEDULED_ROTATION          SIGRTMIN + 13
+
+#define UINT_TO_PTR(value)                             \
+       ({                                              \
+               assert(value <= UINTPTR_MAX);           \
+               (void *) (uintptr_t) value;             \
+       })
+#define PTR_TO_UINT(ptr) ((uintptr_t) ptr)
+
+/*
+ * Handle timer teardown race wrt memory free of private data by sessiond
+ * signals are handled by a single thread, which permits a synchronization
+ * point between handling of each signal. Internal lock ensures mutual
+ * exclusion.
+ */
 static
-struct timer_signal_data timer_signal = {
+struct timer_signal_data {
+       /* Thread managing signals. */
+       pthread_t tid;
+       int qs_done;
+       pthread_mutex_t lock;
+} timer_signal = {
        .tid = 0,
        .qs_done = 0,
        .lock = PTHREAD_MUTEX_INITIALIZER,
@@ -43,7 +67,7 @@ void setmask(sigset_t *mask)
        if (ret) {
                PERROR("sigemptyset");
        }
-       ret = sigaddset(mask, LTTNG_SESSIOND_SIG_TEARDOWN);
+       ret = sigaddset(mask, LTTNG_SESSIOND_SIG_QS);
        if (ret) {
                PERROR("sigaddset teardown");
        }
@@ -51,23 +75,23 @@ void setmask(sigset_t *mask)
        if (ret) {
                PERROR("sigaddset exit");
        }
-       ret = sigaddset(mask, LTTNG_SESSIOND_SIG_ROTATE_PENDING);
+       ret = sigaddset(mask, LTTNG_SESSIOND_SIG_PENDING_ROTATION_CHECK);
        if (ret) {
-               PERROR("sigaddset switch");
+               PERROR("sigaddset pending rotation check");
        }
-       ret = sigaddset(mask, LTTNG_SESSIOND_SIG_ROTATE_TIMER);
+       ret = sigaddset(mask, LTTNG_SESSIOND_SIG_SCHEDULED_ROTATION);
        if (ret) {
-               PERROR("sigaddset switch");
+               PERROR("sigaddset scheduled rotation");
        }
 }
 
 /*
- * This is the same function as consumer_timer_signal_thread_qs, when it
+ * This is the same function as timer_signal_thread_qs, when it
  * returns, it means that no timer signr is currently pending or being handled
  * by the timer thread. This cannot be called from the timer thread.
  */
 static
-void sessiond_timer_signal_thread_qs(unsigned int signr)
+void timer_signal_thread_qs(unsigned int signr)
 {
        sigset_t pending_set;
        int ret;
@@ -104,10 +128,10 @@ void sessiond_timer_signal_thread_qs(unsigned int signr)
        cmm_smp_mb();
 
        /*
-        * Kill with LTTNG_SESSIOND_SIG_TEARDOWN, so signal management thread
+        * Kill with LTTNG_SESSIOND_SIG_QS, so signal management thread
         * wakes up.
         */
-       kill(getpid(), LTTNG_SESSIOND_SIG_TEARDOWN);
+       kill(getpid(), LTTNG_SESSIOND_SIG_QS);
 
        while (!CMM_LOAD_SHARED(timer_signal.qs_done)) {
                caa_cpu_relax();
@@ -125,19 +149,17 @@ void sessiond_timer_signal_thread_qs(unsigned int signr)
  * a positive value if no timer was created (not an error).
  */
 static
-int session_timer_start(timer_t *timer_id, struct ltt_session *session,
+int timer_start(timer_t *timer_id, uint64_t session_id,
                unsigned int timer_interval_us, int signal, bool one_shot)
 {
        int ret = 0, delete_ret;
        struct sigevent sev;
        struct itimerspec its;
 
-       assert(session);
-
        sev.sigev_notify = SIGEV_SIGNAL;
        sev.sigev_signo = signal;
-       sev.sigev_value.sival_ptr = session;
-       ret = timer_create(CLOCKID, &sev, timer_id);
+       sev.sigev_value.sival_ptr = UINT_TO_PTR(session_id);
+       ret = timer_create(CLOCK_MONOTONIC, &sev, timer_id);
        if (ret == -1) {
                PERROR("timer_create");
                goto end;
@@ -171,7 +193,7 @@ end:
 }
 
 static
-int session_timer_stop(timer_t *timer_id, int signal)
+int timer_stop(timer_t *timer_id, int signal)
 {
        int ret = 0;
 
@@ -181,98 +203,104 @@ int session_timer_stop(timer_t *timer_id, int signal)
                goto end;
        }
 
-       sessiond_timer_signal_thread_qs(signal);
+       timer_signal_thread_qs(signal);
        *timer_id = 0;
 end:
        return ret;
 }
 
-int sessiond_timer_rotate_pending_start(struct ltt_session *session,
+int timer_session_rotation_pending_check_start(struct ltt_session *session,
                unsigned int interval_us)
 {
        int ret;
 
-       DBG("Enabling rotate pending timer on session %" PRIu64, session->id);
+       DBG("Enabling session rotation pending check timer on session %" PRIu64,
+                       session->id);
        /*
         * We arm this timer in a one-shot mode so we don't have to disable it
-        * explicitly (which could deadlock if the timer thread is blocked writing
-        * in the rotation_timer_pipe).
+        * explicitly (which could deadlock if the timer thread is blocked
+        * writing in the rotation_timer_pipe).
+        *
         * Instead, we re-arm it if needed after the rotation_pending check as
-        * returned. Also, this timer is usually only needed once, so there is no
-        * need to go through the whole signal teardown scheme everytime.
+        * returned. Also, this timer is usually only needed once, so there is
+        * no need to go through the whole signal teardown scheme everytime.
         */
-       ret = session_timer_start(&session->rotate_relay_pending_timer,
-                       session, interval_us,
-                       LTTNG_SESSIOND_SIG_ROTATE_PENDING,
+       ret = timer_start(&session->rotation_pending_check_timer,
+                       session->id, interval_us,
+                       LTTNG_SESSIOND_SIG_PENDING_ROTATION_CHECK,
                        /* one-shot */ true);
        if (ret == 0) {
-               session->rotate_relay_pending_timer_enabled = true;
+               session->rotation_pending_check_timer_enabled = true;
        }
 
        return ret;
 }
 
 /*
- * Stop and delete the channel's live timer.
- * Called with session and session_list locks held.
+ * Call with session and session_list locks held.
  */
-int sessiond_timer_rotate_pending_stop(struct ltt_session *session)
+int timer_session_rotation_pending_check_stop(struct ltt_session *session)
 {
        int ret;
 
        assert(session);
 
-       DBG("Disabling timer rotate pending on session %" PRIu64, session->id);
-       ret = session_timer_stop(&session->rotate_relay_pending_timer,
-                       LTTNG_SESSIOND_SIG_ROTATE_PENDING);
+       DBG("Disabling session rotation pending check timer on session %" PRIu64,
+                       session->id);
+       ret = timer_stop(&session->rotation_pending_check_timer,
+                       LTTNG_SESSIOND_SIG_PENDING_ROTATION_CHECK);
        if (ret == -1) {
-               ERR("Failed to stop rotate_pending timer");
+               ERR("Failed to stop rotate_pending_check timer");
        } else {
-               session->rotate_relay_pending_timer_enabled = false;
+               session->rotation_pending_check_timer_enabled = false;
        }
        return ret;
 }
 
-int sessiond_rotate_timer_start(struct ltt_session *session,
+/*
+ * Call with session and session_list locks held.
+ */
+int timer_session_rotation_schedule_timer_start(struct ltt_session *session,
                unsigned int interval_us)
 {
        int ret;
 
-       DBG("Enabling rotation timer on session \"%s\" (%ui µs)", session->name,
+       DBG("Enabling scheduled rotation timer on session \"%s\" (%ui µs)", session->name,
                        interval_us);
-       ret = session_timer_start(&session->rotate_timer, session, interval_us,
-                       LTTNG_SESSIOND_SIG_ROTATE_TIMER, false);
+       ret = timer_start(&session->rotation_schedule_timer, session->id,
+                       interval_us, LTTNG_SESSIOND_SIG_SCHEDULED_ROTATION,
+                       /* one-shot */ false);
        if (ret < 0) {
                goto end;
        }
-       session->rotate_timer_enabled = true;
+       session->rotation_schedule_timer_enabled = true;
 end:
        return ret;
 }
 
 /*
- * Stop and delete the channel's live timer.
+ * Call with session and session_list locks held.
  */
-int sessiond_rotate_timer_stop(struct ltt_session *session)
+int timer_session_rotation_schedule_timer_stop(struct ltt_session *session)
 {
        int ret = 0;
 
        assert(session);
 
-       if (!session->rotate_timer_enabled) {
+       if (!session->rotation_schedule_timer_enabled) {
                goto end;
        }
 
-       DBG("Disabling rotation timer on session %s", session->name);
-       ret = session_timer_stop(&session->rotate_timer,
-                       LTTNG_SESSIOND_SIG_ROTATE_TIMER);
+       DBG("Disabling scheduled rotation timer on session %s", session->name);
+       ret = timer_stop(&session->rotation_schedule_timer,
+                       LTTNG_SESSIOND_SIG_SCHEDULED_ROTATION);
        if (ret < 0) {
-               ERR("Failed to stop rotate timer of session \"%s\"",
+               ERR("Failed to stop scheduled rotation timer of session \"%s\"",
                                session->name);
                goto end;
        }
 
-       session->rotate_timer_enabled = false;
+       session->rotation_schedule_timer_enabled = false;
        ret = 0;
 end:
        return ret;
@@ -282,7 +310,7 @@ end:
  * Block the RT signals for the entire process. It must be called from the
  * sessiond main before creating the threads
  */
-int sessiond_timer_signal_init(void)
+int timer_signal_init(void)
 {
        int ret;
        sigset_t mask;
@@ -298,132 +326,10 @@ int sessiond_timer_signal_init(void)
        return 0;
 }
 
-/*
- * Called with the rotation_timer_queue lock held.
- * Return true if the same timer job already exists in the queue, false if not.
- */
-static
-bool check_duplicate_timer_job(struct timer_thread_parameters *ctx,
-               struct ltt_session *session, unsigned int signal)
-{
-       bool ret = false;
-       struct sessiond_rotation_timer *node;
-
-       rcu_read_lock();
-       cds_list_for_each_entry(node, &ctx->rotation_timer_queue->list, head) {
-               if (node->session_id == session->id && node->signal == signal) {
-                       ret = true;
-                       goto end;
-               }
-       }
-
-end:
-       rcu_read_unlock();
-       return ret;
-}
-
-/*
- * Add the session ID and signal value to the rotation_timer_queue if it is
- * not already there and wakeup the rotation thread. The rotation thread
- * empties the whole queue everytime it is woken up. The event_pipe is
- * non-blocking, if it would block, we just return because we know the
- * rotation thread will be awaken anyway.
- */
-static
-int enqueue_timer_rotate_job(struct timer_thread_parameters *ctx,
-               struct ltt_session *session, unsigned int signal)
-{
-       int ret;
-       char *c = "!";
-       struct sessiond_rotation_timer *timer_data = NULL;
-
-       pthread_mutex_lock(&ctx->rotation_timer_queue->lock);
-       if (check_duplicate_timer_job(ctx, session, signal)) {
-               /*
-                * This timer job is already pending, we don't need to add
-                * it.
-                */
-               ret = 0;
-               goto end;
-       }
-
-       timer_data = zmalloc(sizeof(struct sessiond_rotation_timer));
-       if (!timer_data) {
-               PERROR("Allocation of timer data");
-               ret = -1;
-               goto end;
-       }
-       timer_data->session_id = session->id;
-       timer_data->signal = signal;
-       cds_list_add_tail(&timer_data->head,
-                       &ctx->rotation_timer_queue->list);
-
-       ret = lttng_write(
-                       lttng_pipe_get_writefd(ctx->rotation_timer_queue->event_pipe),
-                       c, 1);
-       if (ret < 0) {
-               /*
-                * We do not want to block in the timer handler, the job has been
-                * enqueued in the list, the wakeup pipe is probably full, the job
-                * will be processed when the rotation_thread catches up.
-                */
-               if (errno == EAGAIN || errno == EWOULDBLOCK) {
-                       ret = 0;
-                       goto end;
-               }
-               PERROR("Timer wakeup rotation thread");
-               goto end;
-       }
-
-       ret = 0;
-
-end:
-       pthread_mutex_unlock(&ctx->rotation_timer_queue->lock);
-       return ret;
-}
-
-/*
- * Ask the rotation thread to check if the last rotation started in this
- * session is still pending on the relay.
- */
-static
-void relay_rotation_pending_timer(struct timer_thread_parameters *ctx,
-               int sig, siginfo_t *si)
-{
-       struct ltt_session *session = si->si_value.sival_ptr;
-
-       assert(session);
-
-       (void) enqueue_timer_rotate_job(ctx, session,
-                       LTTNG_SESSIOND_SIG_ROTATE_PENDING);
-}
-
-/*
- * Handle the LTTNG_SESSIOND_SIG_ROTATE_TIMER timer. Add the session ID to
- * the rotation_timer_queue so the rotation thread can trigger a new rotation
- * on that session.
- */
-static
-void rotate_timer(struct timer_thread_parameters *ctx, int sig, siginfo_t *si)
-{
-       int ret;
-       /*
-        * The session cannot be freed/destroyed while we are running this
-        * signal handler.
-        */
-       struct ltt_session *session = si->si_value.sival_ptr;
-       assert(session);
-
-       ret = enqueue_timer_rotate_job(ctx, session, LTTNG_SESSIOND_SIG_ROTATE_TIMER);
-       if (ret) {
-               PERROR("wakeup rotate pipe");
-       }
-}
-
 /*
  * This thread is the sighandler for the timer signals.
  */
-void *sessiond_timer_thread(void *data)
+void *timer_thread_func(void *data)
 {
        int signr;
        sigset_t mask;
@@ -434,7 +340,6 @@ void *sessiond_timer_thread(void *data)
        rcu_thread_online();
 
        health_register(health_sessiond, HEALTH_SESSIOND_TYPE_TIMER);
-
        health_code_update();
 
        /* Only self thread will receive signal mask. */
@@ -458,17 +363,20 @@ void *sessiond_timer_thread(void *data)
                                PERROR("sigwaitinfo");
                        }
                        continue;
-               } else if (signr == LTTNG_SESSIOND_SIG_TEARDOWN) {
+               } else if (signr == LTTNG_SESSIOND_SIG_QS) {
                        cmm_smp_mb();
                        CMM_STORE_SHARED(timer_signal.qs_done, 1);
                        cmm_smp_mb();
-                       DBG("Signal timer metadata thread teardown");
                } else if (signr == LTTNG_SESSIOND_SIG_EXIT) {
                        goto end;
-               } else if (signr == LTTNG_SESSIOND_SIG_ROTATE_PENDING) {
-                       relay_rotation_pending_timer(ctx, info.si_signo, &info);
-               } else if (signr == LTTNG_SESSIOND_SIG_ROTATE_TIMER) {
-                       rotate_timer(ctx, info.si_signo, &info);
+               } else if (signr == LTTNG_SESSIOND_SIG_PENDING_ROTATION_CHECK) {
+                       rotation_thread_enqueue_job(ctx->rotation_thread_job_queue,
+                                       ROTATION_THREAD_JOB_TYPE_CHECK_PENDING_ROTATION,
+                                       /* session_id */ PTR_TO_UINT(info.si_value.sival_ptr));
+               } else if (signr == LTTNG_SESSIOND_SIG_SCHEDULED_ROTATION) {
+                       rotation_thread_enqueue_job(ctx->rotation_thread_job_queue,
+                                       ROTATION_THREAD_JOB_TYPE_SCHEDULED_ROTATION,
+                                       /* session_id */ PTR_TO_UINT(info.si_value.sival_ptr));
                } else {
                        ERR("Unexpected signal %d\n", info.si_signo);
                }
@@ -481,3 +389,8 @@ end:
        rcu_unregister_thread();
        return NULL;
 }
+
+void timer_exit(void)
+{
+       kill(getpid(), LTTNG_SESSIOND_SIG_EXIT);
+}
index 523d87852c53abd935e40918bb12240d530ddd3c..83be4873ce3381942b0e69ea878331dbbe088109 100644 (file)
@@ -1,5 +1,6 @@
 /*
  * Copyright (C) 2017 - Julien Desfossez <jdesfossez@efficios.com>
+ * Copyright (C) 2018 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
  *
  * This program is free software; you can redistribute it and/or modify
  * it under the terms of the GNU General Public License, version 2 only,
 
 #include "session.h"
 
-#define LTTNG_SESSIOND_SIG_TEARDOWN            SIGRTMIN + 10
-#define LTTNG_SESSIOND_SIG_EXIT                        SIGRTMIN + 11
-#define LTTNG_SESSIOND_SIG_ROTATE_PENDING      SIGRTMIN + 12
-#define LTTNG_SESSIOND_SIG_ROTATE_TIMER                SIGRTMIN + 13
-
-#define CLOCKID CLOCK_MONOTONIC
-
-/*
- * Handle timer teardown race wrt memory free of private data by sessiond
- * signals are handled by a single thread, which permits a synchronization
- * point between handling of each signal. Internal lock ensures mutual
- * exclusion.
- */
-struct timer_signal_data {
-       /* Thread managing signals. */
-       pthread_t tid;
-       int qs_done;
-       pthread_mutex_t lock;
-};
-
 struct timer_thread_parameters {
-       struct rotation_thread_timer_queue *rotation_timer_queue;
+       struct rotation_thread_timer_queue *rotation_thread_job_queue;
 };
 
-struct sessiond_rotation_timer {
-       uint64_t session_id;
-       unsigned int signal;
-       /* List member in struct rotation_thread_timer_queue. */
-       struct cds_list_head head;
-};
+int timer_signal_init(void);
+void *timer_thread_func(void *data);
 
-void *sessiond_timer_thread(void *data);
-int sessiond_timer_signal_init(void);
+void timer_exit(void);
 
-int sessiond_timer_rotate_pending_start(struct ltt_session *session,
+/* Start a session's rotation pending check timer (one-shot mode). */
+int timer_session_rotation_pending_check_start(struct ltt_session *session,
                unsigned int interval_us);
-int sessiond_timer_rotate_pending_stop(struct ltt_session *session);
+/* Stop a session's rotation pending check timer. */
+int timer_session_rotation_pending_check_stop(struct ltt_session *session);
 
-int sessiond_rotate_timer_start(struct ltt_session *session,
+/* Start a session's rotation schedule timer. */
+int timer_session_rotation_schedule_timer_start(struct ltt_session *session,
                unsigned int interval_us);
-int sessiond_rotate_timer_stop(struct ltt_session *session);
+/* Stop a session's rotation schedule timer. */
+int timer_session_rotation_schedule_timer_stop(struct ltt_session *session);
 
 #endif /* SESSIOND_TIMER_H */
index 52d1da787f8142cd7cf92bab17acd9d3eda11235..2a810aba15fd2db25f4c47bf5d98bf3a70b0d607 100644 (file)
@@ -6328,7 +6328,7 @@ int ust_app_regenerate_statedump_all(struct ltt_ust_session *usess)
  *
  * Return 0 on success or else a negative value.
  */
-int ust_app_rotate_session(struct ltt_session *session, bool *ust_active)
+int ust_app_rotate_session(struct ltt_session *session)
 {
        int ret = 0;
        struct lttng_ht_iter iter;
@@ -6357,22 +6357,6 @@ int ust_app_rotate_session(struct ltt_session *session, bool *ust_active)
                                goto error;
                        }
 
-                       /*
-                        * Account the metadata channel first to make sure the
-                        * number of channels waiting for a rotation cannot
-                        * reach 0 before we complete the iteration over all
-                        * the channels.
-                        */
-                       ret = rotate_add_channel_pending(
-                                       reg->registry->reg.ust->metadata_key,
-                                       LTTNG_DOMAIN_UST, session);
-                       if (ret < 0) {
-                               ret = reg->bits_per_long == 32 ?
-                                               -LTTNG_ERR_UST_CONSUMER32_FAIL :
-                                               -LTTNG_ERR_UST_CONSUMER64_FAIL;
-                               goto error;
-                       }
-
                        ret = snprintf(pathname, sizeof(pathname),
                                        DEFAULT_UST_TRACE_DIR "/" DEFAULT_UST_TRACE_UID_PATH,
                                        reg->uid, reg->bits_per_long);
@@ -6384,22 +6368,12 @@ int ust_app_rotate_session(struct ltt_session *session, bool *ust_active)
                        /* Rotate the data channels. */
                        cds_lfht_for_each_entry(reg->registry->channels->ht, &iter.iter,
                                        reg_chan, node.node) {
-                               ret = rotate_add_channel_pending(
-                                               reg_chan->consumer_key,
-                                               LTTNG_DOMAIN_UST, session);
-                               if (ret < 0) {
-                                       ret = reg->bits_per_long == 32 ?
-                                                       -LTTNG_ERR_UST_CONSUMER32_FAIL :
-                                                       -LTTNG_ERR_UST_CONSUMER64_FAIL;
-                                       goto error;
-                               }
                                ret = consumer_rotate_channel(socket,
                                                reg_chan->consumer_key,
                                                usess->uid, usess->gid,
                                                usess->consumer, pathname,
                                                /* is_metadata_channel */ false,
-                                               session->current_archive_id,
-                                               &session->rotate_pending_relay);
+                                               session->current_archive_id);
                                if (ret < 0) {
                                        goto error;
                                }
@@ -6412,12 +6386,10 @@ int ust_app_rotate_session(struct ltt_session *session, bool *ust_active)
                                        usess->uid, usess->gid,
                                        usess->consumer, pathname,
                                        /* is_metadata_channel */ true,
-                                       session->current_archive_id,
-                                       &session->rotate_pending_relay);
+                                       session->current_archive_id);
                        if (ret < 0) {
                                goto error;
                        }
-                       *ust_active = true;
                }
                break;
        }
@@ -6458,39 +6430,15 @@ int ust_app_rotate_session(struct ltt_session *session, bool *ust_active)
                                goto error;
                        }
 
-                       /*
-                        * Account the metadata channel first to make sure the
-                        * number of channels waiting for a rotation cannot
-                        * reach 0 before we complete the iteration over all
-                        * the channels.
-                        */
-                       ret = rotate_add_channel_pending(registry->metadata_key,
-                                       LTTNG_DOMAIN_UST, session);
-                       if (ret < 0) {
-                               ret = app->bits_per_long == 32 ?
-                                               -LTTNG_ERR_UST_CONSUMER32_FAIL :
-                                               -LTTNG_ERR_UST_CONSUMER64_FAIL;
-                               goto error;
-                       }
 
                        /* Rotate the data channels. */
                        cds_lfht_for_each_entry(ua_sess->channels->ht, &chan_iter.iter,
                                        ua_chan, node.node) {
-                               ret = rotate_add_channel_pending(
-                                               ua_chan->key, LTTNG_DOMAIN_UST,
-                                               session);
-                               if (ret < 0) {
-                                       ret = app->bits_per_long == 32 ?
-                                                       -LTTNG_ERR_UST_CONSUMER32_FAIL :
-                                                       -LTTNG_ERR_UST_CONSUMER64_FAIL;
-                                       goto error;
-                               }
                                ret = consumer_rotate_channel(socket, ua_chan->key,
                                                ua_sess->euid, ua_sess->egid,
                                                ua_sess->consumer, pathname,
                                                /* is_metadata_channel */ false,
-                                               session->current_archive_id,
-                                               &session->rotate_pending_relay);
+                                               session->current_archive_id);
                                if (ret < 0) {
                                        goto error;
                                }
@@ -6502,12 +6450,10 @@ int ust_app_rotate_session(struct ltt_session *session, bool *ust_active)
                                        ua_sess->euid, ua_sess->egid,
                                        ua_sess->consumer, pathname,
                                        /* is_metadata_channel */ true,
-                                       session->current_archive_id,
-                                       &session->rotate_pending_relay);
+                                       session->current_archive_id);
                        if (ret < 0) {
                                goto error;
                        }
-                       *ust_active = true;
                }
                break;
        }
index 7cc0d10aad3eeea499a23da90eefee78374b7a29..5aef9e9519879cfa463188b42d94687d23462010 100644 (file)
@@ -360,7 +360,7 @@ int ust_app_pid_get_channel_runtime_stats(struct ltt_ust_session *usess,
                struct consumer_output *consumer,
                int overwrite, uint64_t *discarded, uint64_t *lost);
 int ust_app_regenerate_statedump_all(struct ltt_ust_session *usess);
-int ust_app_rotate_session(struct ltt_session *session, bool *ust_active);
+int ust_app_rotate_session(struct ltt_session *session);
 
 static inline
 int ust_app_supported(void)
@@ -595,7 +595,7 @@ int ust_app_regenerate_statedump_all(struct ltt_ust_session *usess)
 }
 
 static inline
-int ust_app_rotate_session(struct ltt_session *session, bool *ust_active)
+int ust_app_rotate_session(struct ltt_session *session)
 {
        return 0;
 }
index 27e96007c920a94ef94aaae8fefa21d7aa771ab8..6de72e2758b5086f35c36e96f69276eaa8dfbdf0 100644 (file)
@@ -2283,26 +2283,6 @@ static void validate_endpoint_status_metadata_stream(
        rcu_read_unlock();
 }
 
-static
-int rotate_notify_sessiond(struct lttng_consumer_local_data *ctx,
-               uint64_t key)
-{
-       ssize_t ret;
-
-       do {
-               ret = write(ctx->channel_rotate_pipe, &key, sizeof(key));
-       } while (ret == -1 && errno == EINTR);
-       if (ret == -1) {
-               PERROR("Failed to write to the channel rotation pipe");
-       } else {
-               DBG("Sent channel rotation notification for channel key %"
-                               PRIu64, key);
-               ret = 0;
-       }
-
-       return (int) ret;
-}
-
 /*
  * Perform operations that need to be done after a stream has
  * rotated and released the stream lock.
@@ -2339,13 +2319,7 @@ int consumer_post_rotation(struct lttng_consumer_stream *stream,
                        abort();
        }
 
-       if (--stream->chan->nr_stream_rotate_pending == 0) {
-               DBG("Rotation of channel \"%s\" completed, notifying the session daemon",
-                               stream->chan->name);
-               ret = rotate_notify_sessiond(ctx, stream->chan->key);
-       }
        pthread_mutex_unlock(&stream->chan->lock);
-
        return ret;
 }
 
@@ -4209,8 +4183,9 @@ int lttng_consumer_rotate_stream(struct lttng_consumer_local_data *ctx,
        } else {
                ret = rotate_local_stream(ctx, stream);
        }
+       stream->trace_archive_id++;
        if (ret < 0) {
-               ERR("Rotate stream");
+               ERR("Failed to rotate stream, ret = %i", ret);
                goto error;
        }
 
@@ -4375,7 +4350,64 @@ int lttng_consumer_rotate_rename(const char *old_path, const char *new_path,
        }
 }
 
-int lttng_consumer_rotate_pending_relay(uint64_t session_id,
+/* Stream lock must be acquired by the caller. */
+static
+bool check_stream_rotation_pending(const struct lttng_consumer_stream *stream,
+               uint64_t session_id, uint64_t chunk_id)
+{
+       bool pending = false;
+
+       if (stream->session_id != session_id) {
+               /* Skip. */
+               goto end;
+       }
+
+       /*
+        * If the stream's archive_id belongs to the chunk being rotated (or an
+        * even older one), it means that the consumer has not consumed all the
+        * buffers that belong to the chunk being rotated. Therefore, the
+        * rotation is considered as ongoing/pending.
+        */
+       pending = stream->trace_archive_id <= chunk_id;
+end:
+       return pending;
+}
+
+/* RCU read lock must be acquired by the caller. */
+int lttng_consumer_check_rotation_pending_local(uint64_t session_id,
+               uint64_t chunk_id)
+{
+       struct lttng_ht_iter iter;
+       struct lttng_consumer_stream *stream;
+       bool rotation_pending = false;
+
+       /* Start with the metadata streams... */
+       cds_lfht_for_each_entry(metadata_ht->ht, &iter.iter, stream, node.node) {
+               pthread_mutex_lock(&stream->lock);
+               rotation_pending = check_stream_rotation_pending(stream,
+                               session_id, chunk_id);
+               pthread_mutex_unlock(&stream->lock);
+               if (rotation_pending) {
+                       goto end;
+               }
+       }
+
+       /* ... followed by the data streams. */
+       cds_lfht_for_each_entry(data_ht->ht, &iter.iter, stream, node.node) {
+               pthread_mutex_lock(&stream->lock);
+               rotation_pending = check_stream_rotation_pending(stream,
+                               session_id, chunk_id);
+               pthread_mutex_unlock(&stream->lock);
+               if (rotation_pending) {
+                       goto end;
+               }
+       }
+
+end:
+       return !!rotation_pending;
+}
+
+int lttng_consumer_check_rotation_pending_relay(uint64_t session_id,
                uint64_t relayd_id, uint64_t chunk_id)
 {
        int ret;
@@ -4383,7 +4415,7 @@ int lttng_consumer_rotate_pending_relay(uint64_t session_id,
 
        relayd = consumer_find_relayd(relayd_id);
        if (!relayd) {
-               ERR("Failed to find relayd");
+               ERR("Failed to find relayd id %" PRIu64, relayd_id);
                ret = -1;
                goto end;
        }
index 4cc40a431ee7ba86b0a54913aa73b43bdc3216f4..d4e9f066a4b8d15bebe4a2270829eb3a53c91907 100644 (file)
@@ -2,6 +2,7 @@
  * Copyright (C) 2011 - Julien Desfossez <julien.desfossez@polymtl.ca>
  *                      Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
  *               2012 - David Goulet <dgoulet@efficios.com>
+ *               2018 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
  *
  * This program is free software; you can redistribute it and/or modify
  * it under the terms of the GNU General Public License, version 2 only,
@@ -62,10 +63,10 @@ enum lttng_consumer_command {
        LTTNG_CONSUMER_LOST_PACKETS,
        LTTNG_CONSUMER_CLEAR_QUIESCENT_CHANNEL,
        LTTNG_CONSUMER_SET_CHANNEL_MONITOR_PIPE,
-       LTTNG_CONSUMER_SET_CHANNEL_ROTATE_PIPE,
        LTTNG_CONSUMER_ROTATE_CHANNEL,
        LTTNG_CONSUMER_ROTATE_RENAME,
-       LTTNG_CONSUMER_ROTATE_PENDING_RELAY,
+       LTTNG_CONSUMER_CHECK_ROTATION_PENDING_LOCAL,
+       LTTNG_CONSUMER_CHECK_ROTATION_PENDING_RELAY,
        LTTNG_CONSUMER_MKDIR,
 };
 
@@ -417,8 +418,9 @@ struct lttng_consumer_stream {
        /* Copy of the sequence number of the last packet extracted. */
        uint64_t last_sequence_number;
        /*
-        * Session's current trace archive id at the time of the creation of
-        * this stream.
+        * A stream is created with a trace_archive_id matching the session's
+        * current trace archive id at the time of the creation of the stream.
+        * It is incremented when the rotate_position is reached.
         */
        uint64_t trace_archive_id;
        /*
@@ -602,11 +604,6 @@ struct lttng_consumer_local_data {
         * to the session daemon (write-only).
         */
        int channel_monitor_pipe;
-       /*
-        * Pipe used to inform the session daemon that a stream has finished
-        * its rotation (write-only).
-        */
-       int channel_rotate_pipe;
 };
 
 /*
@@ -674,6 +671,24 @@ extern int consumer_quit;
 /* Flag used to temporarily pause data consumption from testpoints. */
 extern int data_consumption_paused;
 
+/* Return a human-readable consumer type string that is suitable for logging. */
+static inline
+const char *lttng_consumer_type_str(enum lttng_consumer_type type)
+{
+       switch (type) {
+       case LTTNG_CONSUMER_UNKNOWN:
+               return "unknown";
+       case LTTNG_CONSUMER_KERNEL:
+               return "kernel";
+       case LTTNG_CONSUMER32_UST:
+               return "32-bit user space";
+       case LTTNG_CONSUMER64_UST:
+               return "64-bit user space";
+       default:
+               abort();
+       }
+}
+
 /*
  * Init consumer data structures.
  */
@@ -836,7 +851,9 @@ int lttng_consumer_rotate_ready_streams(uint64_t key,
                struct lttng_consumer_local_data *ctx);
 int lttng_consumer_rotate_rename(const char *current_path, const char *new_path,
                uid_t uid, gid_t gid, uint64_t relayd_id);
-int lttng_consumer_rotate_pending_relay( uint64_t session_id,
+int lttng_consumer_check_rotation_pending_local(uint64_t session_id,
+               uint64_t chunk_id);
+int lttng_consumer_check_rotation_pending_relay(uint64_t session_id,
                uint64_t relayd_id, uint64_t chunk_id);
 void lttng_consumer_reset_stream_rotate_state(struct lttng_consumer_stream *stream);
 int lttng_consumer_mkdir(const char *path, uid_t uid, gid_t gid,
index e19881f6b2b11cf051272a0e35f984976509e8ca..5c93e498ddf39c86007c8b07d58ec8e64ec8324f 100644 (file)
  * Default timer value in usec for the rotate pending polling check on the
  * relay when a rotation has completed on the consumer.
  */
-#define DEFAULT_ROTATE_PENDING_RELAY_TIMER     CONFIG_DEFAULT_ROTATE_PENDING_RELAY_TIMER
+#define DEFAULT_ROTATE_PENDING_TIMER   CONFIG_DEFAULT_ROTATE_PENDING_TIMER
 
 /*
  * Returns the default subbuf size.
index 3455f827b6b715c5415bcb3ee11c0d6147a46972..c223fa395af172525985e92bdaf8b39058647f6d 100644 (file)
@@ -1082,47 +1082,6 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                }
                break;
        }
-       case LTTNG_CONSUMER_SET_CHANNEL_ROTATE_PIPE:
-       {
-               int channel_rotate_pipe;
-               int flags;
-
-               ret_code = LTTCOMM_CONSUMERD_SUCCESS;
-               /* Successfully received the command's type. */
-               ret = consumer_send_status_msg(sock, ret_code);
-               if (ret < 0) {
-                       goto error_fatal;
-               }
-
-               ret = lttcomm_recv_fds_unix_sock(sock, &channel_rotate_pipe, 1);
-               if (ret != (ssize_t) sizeof(channel_rotate_pipe)) {
-                       ERR("Failed to receive channel rotate pipe");
-                       goto error_fatal;
-               }
-
-               DBG("Received channel rotate pipe (%d)", channel_rotate_pipe);
-               ctx->channel_rotate_pipe = channel_rotate_pipe;
-               /* Set the pipe as non-blocking. */
-               ret = fcntl(channel_rotate_pipe, F_GETFL, 0);
-               if (ret == -1) {
-                       PERROR("fcntl get flags of the channel rotate pipe");
-                       goto error_fatal;
-               }
-               flags = ret;
-
-               ret = fcntl(channel_rotate_pipe, F_SETFL, flags | O_NONBLOCK);
-               if (ret == -1) {
-                       PERROR("fcntl set O_NONBLOCK flag of the channel rotate pipe");
-                       goto error_fatal;
-               }
-               DBG("Channel rotate pipe set as non-blocking");
-               ret_code = LTTCOMM_CONSUMERD_SUCCESS;
-               ret = consumer_send_status_msg(sock, ret_code);
-               if (ret < 0) {
-                       goto error_fatal;
-               }
-               break;
-       }
        case LTTNG_CONSUMER_ROTATE_CHANNEL:
        {
                DBG("Consumer rotate channel %" PRIu64, msg.u.rotate_channel.key);
@@ -1183,19 +1142,18 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                }
                break;
        }
-       case LTTNG_CONSUMER_ROTATE_PENDING_RELAY:
+       case LTTNG_CONSUMER_CHECK_ROTATION_PENDING_LOCAL:
        {
                int pending;
                uint32_t pending_reply;
 
-               DBG("Consumer rotate pending on relay for session %" PRIu64,
-                               msg.u.rotate_pending_relay.session_id);
-               pending = lttng_consumer_rotate_pending_relay(
-                               msg.u.rotate_pending_relay.session_id,
-                               msg.u.rotate_pending_relay.relayd_id,
-                               msg.u.rotate_pending_relay.chunk_id);
+               DBG("Perform local check of pending rotation for session id %" PRIu64,
+                               msg.u.check_rotation_pending_local.session_id);
+               pending = lttng_consumer_check_rotation_pending_local(
+                               msg.u.check_rotation_pending_local.session_id,
+                               msg.u.check_rotation_pending_local.chunk_id);
                if (pending < 0) {
-                       ERR("Rotate pending relay failed");
+                       ERR("Local rotation pending check failed with code %i", pending);
                        ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
                } else {
                        pending_reply = !!pending;
@@ -1222,7 +1180,51 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                ret = lttcomm_send_unix_sock(sock, &pending_reply,
                                sizeof(pending_reply));
                if (ret < 0) {
-                       PERROR("send data pending ret code");
+                       PERROR("Failed to send rotation pending return code");
+                       goto error_fatal;
+               }
+               break;
+       }
+       case LTTNG_CONSUMER_CHECK_ROTATION_PENDING_RELAY:
+       {
+               int pending;
+               uint32_t pending_reply;
+
+               DBG("Perform relayd check of pending rotation for session id %" PRIu64,
+                               msg.u.check_rotation_pending_relay.session_id);
+               pending = lttng_consumer_check_rotation_pending_relay(
+                               msg.u.check_rotation_pending_relay.session_id,
+                               msg.u.check_rotation_pending_relay.relayd_id,
+                               msg.u.check_rotation_pending_relay.chunk_id);
+               if (pending < 0) {
+                       ERR("Relayd rotation pending check failed with code %i", pending);
+                       ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
+               } else {
+                       pending_reply = !!pending;
+               }
+
+               health_code_update();
+
+               ret = consumer_send_status_msg(sock, ret_code);
+               if (ret < 0) {
+                       /* Somehow, the session daemon is not responding anymore. */
+                       goto end_nosignal;
+               }
+
+               if (pending < 0) {
+                       /*
+                        * An error occured while running the command;
+                        * don't send the 'pending' flag as the sessiond
+                        * will not read it.
+                        */
+                       break;
+               }
+
+               /* Send back returned value to session daemon */
+               ret = lttcomm_send_unix_sock(sock, &pending_reply,
+                               sizeof(pending_reply));
+               if (ret < 0) {
+                       PERROR("Failed to send rotation pending return code");
                        goto error_fatal;
                }
                break;
index 3a2dd4946423aabb2a4299306eebae5ae5ab403b..72aa61e0beab45f16e97dff3da2093c1bb12d687 100644 (file)
@@ -437,7 +437,7 @@ struct lttcomm_lttng_output_id {
  * operation.
  */
 struct lttcomm_consumer_msg {
-       uint32_t cmd_type;      /* enum consumerd_command */
+       uint32_t cmd_type;      /* enum lttng_consumer_command */
        union {
                struct {
                        uint64_t channel_key;
@@ -602,11 +602,15 @@ struct lttcomm_consumer_msg {
                        uint32_t uid;
                        uint32_t gid;
                } LTTNG_PACKED rotate_rename;
+               struct {
+                       uint64_t session_id;
+                       uint64_t chunk_id;
+               } LTTNG_PACKED check_rotation_pending_local;
                struct {
                        uint64_t relayd_id;
                        uint64_t session_id;
                        uint64_t chunk_id;
-               } LTTNG_PACKED rotate_pending_relay;
+               } LTTNG_PACKED check_rotation_pending_relay;
                struct {
                        char path[LTTNG_PATH_MAX];
                        uint64_t relayd_id; /* Relayd id if apply. */
index 22efc3956656136ff3f5465a17b3bc68a1f782a2..a81a497a9f8a8fa042ba28f29bbf0e86e73dc92e 100644 (file)
@@ -1929,47 +1929,6 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                }
                goto end_msg_sessiond;
        }
-       case LTTNG_CONSUMER_SET_CHANNEL_ROTATE_PIPE:
-       {
-               int channel_rotate_pipe;
-               int flags;
-
-               ret_code = LTTCOMM_CONSUMERD_SUCCESS;
-               /* Successfully received the command's type. */
-               ret = consumer_send_status_msg(sock, ret_code);
-               if (ret < 0) {
-                       goto error_fatal;
-               }
-
-               ret = lttcomm_recv_fds_unix_sock(sock, &channel_rotate_pipe, 1);
-               if (ret != sizeof(channel_rotate_pipe)) {
-                       ERR("Failed to receive channel rotate pipe");
-                       goto error_fatal;
-               }
-
-               DBG("Received channel rotate pipe (%d)", channel_rotate_pipe);
-               ctx->channel_rotate_pipe = channel_rotate_pipe;
-               /* Set the pipe as non-blocking. */
-               ret = fcntl(channel_rotate_pipe, F_GETFL, 0);
-               if (ret == -1) {
-                       PERROR("fcntl get flags of the channel rotate pipe");
-                       goto error_fatal;
-               }
-               flags = ret;
-
-               ret = fcntl(channel_rotate_pipe, F_SETFL, flags | O_NONBLOCK);
-               if (ret == -1) {
-                       PERROR("fcntl set O_NONBLOCK flag of the channel rotate pipe");
-                       goto error_fatal;
-               }
-               DBG("Channel rotate pipe set as non-blocking");
-               ret_code = LTTCOMM_CONSUMERD_SUCCESS;
-               ret = consumer_send_status_msg(sock, ret_code);
-               if (ret < 0) {
-                       goto error_fatal;
-               }
-               break;
-       }
        case LTTNG_CONSUMER_ROTATE_CHANNEL:
        {
                /*
@@ -2031,27 +1990,25 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                }
                break;
        }
-       case LTTNG_CONSUMER_ROTATE_PENDING_RELAY:
+       case LTTNG_CONSUMER_CHECK_ROTATION_PENDING_LOCAL:
        {
                int pending;
                uint32_t pending_reply;
 
-               DBG("Consumer rotate pending on relay for session %" PRIu64,
-                               msg.u.rotate_pending_relay.session_id);
-               pending = lttng_consumer_rotate_pending_relay(
-                               msg.u.rotate_pending_relay.session_id,
-                               msg.u.rotate_pending_relay.relayd_id,
-                               msg.u.rotate_pending_relay.chunk_id);
+               DBG("Perform local check of pending rotation for session id %" PRIu64,
+                               msg.u.check_rotation_pending_local.session_id);
+               pending = lttng_consumer_check_rotation_pending_local(
+                               msg.u.check_rotation_pending_local.session_id,
+                               msg.u.check_rotation_pending_local.chunk_id);
                if (pending < 0) {
-                       ERR("Rotate pending relay failed");
-                       ret_code = LTTCOMM_CONSUMERD_RELAYD_FAIL;
+                       ERR("Local rotation pending check failed with code %i", pending);
+                       ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
                } else {
                        pending_reply = !!pending;
                }
 
                health_code_update();
 
-               /* Send whether the command was successful. */
                ret = consumer_send_status_msg(sock, ret_code);
                if (ret < 0) {
                        /* Somehow, the session daemon is not responding anymore. */
@@ -2061,17 +2018,61 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                if (pending < 0) {
                        /*
                         * An error occured while running the command;
-                        * don't send the 'pending' reply as the sessiond
+                        * don't send the 'pending' flag as the sessiond
                         * will not read it.
                         */
                        break;
                }
 
-               /* Send back the command's payload (pending reply). */
+               /* Send back returned value to session daemon */
+               ret = lttcomm_send_unix_sock(sock, &pending_reply,
+                               sizeof(pending_reply));
+               if (ret < 0) {
+                       PERROR("Failed to send rotation pending return code");
+                       goto error_fatal;
+               }
+               break;
+       }
+       case LTTNG_CONSUMER_CHECK_ROTATION_PENDING_RELAY:
+       {
+               int pending;
+               uint32_t pending_reply;
+
+               DBG("Perform relayd check of pending rotation for session id %" PRIu64,
+                               msg.u.check_rotation_pending_relay.session_id);
+               pending = lttng_consumer_check_rotation_pending_relay(
+                               msg.u.check_rotation_pending_relay.session_id,
+                               msg.u.check_rotation_pending_relay.relayd_id,
+                               msg.u.check_rotation_pending_relay.chunk_id);
+               if (pending < 0) {
+                       ERR("Relayd rotation pending check failed with code %i", pending);
+                       ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
+               } else {
+                       pending_reply = !!pending;
+               }
+
+               health_code_update();
+
+               ret = consumer_send_status_msg(sock, ret_code);
+               if (ret < 0) {
+                       /* Somehow, the session daemon is not responding anymore. */
+                       goto end_nosignal;
+               }
+
+               if (pending < 0) {
+                       /*
+                        * An error occured while running the command;
+                        * don't send the 'pending' flag as the sessiond
+                        * will not read it.
+                        */
+                       break;
+               }
+
+               /* Send back returned value to session daemon */
                ret = lttcomm_send_unix_sock(sock, &pending_reply,
                                sizeof(pending_reply));
                if (ret < 0) {
-                       PERROR("send data pending ret code");
+                       PERROR("Failed to send rotation pending return code");
                        goto error_fatal;
                }
                break;
index f0acb2f2220cc623ecf8ec49339f59cfd5f5186a..cea322d32ddaf3a8f6bd07eb695772653758266a 100644 (file)
@@ -63,19 +63,6 @@ static char random_string[RANDOM_STRING_LEN];
 static struct ltt_ust_session *usess;
 static struct lttng_domain dom;
 
-/*
- * Stub to prevent an undefined reference in this test without having to link
- * the entire tree because of a cascade of dependencies. This is not used,
- * it is just there to prevent GCC from complaining.
- */
-int rotate_add_channel_pending(uint64_t key, enum lttng_domain_type domain,
-               struct ltt_session *session)
-{
-       ERR("Stub called instead of the real function");
-       abort();
-       return -1;
-}
-
 /*
  * Return random string of 10 characters.
  * Not thread-safe.
This page took 0.136199 seconds and 5 git commands to generate.