+ struct consumer_socket *socket;
+ struct cds_lfht_iter iter;
+ enum consumer_trace_chunk_exists_status exists_status;
+ uint64_t relayd_id;
+ bool chunk_exists_on_peer = false;
+ enum lttng_trace_chunk_status chunk_status;
+
+ assert(session->chunk_being_archived);
+
+ /*
+ * Check for a local pending rotation on all consumers (32-bit
+ * user space, 64-bit user space, and kernel).
+ */
+ rcu_read_lock();
+ if (!session->ust_session) {
+ goto skip_ust;
+ }
+ cds_lfht_for_each_entry(session->ust_session->consumer->socks->ht,
+ &iter, socket, node.node) {
+ relayd_id = session->ust_session->consumer->type == CONSUMER_DST_LOCAL ?
+ -1ULL :
+ session->ust_session->consumer->net_seq_index;
+
+ pthread_mutex_lock(socket->lock);
+ ret = consumer_trace_chunk_exists(socket,
+ relayd_id,
+ session->id, session->chunk_being_archived,
+ &exists_status);
+ if (ret) {
+ pthread_mutex_unlock(socket->lock);
+ ERR("Error occured while checking rotation status on consumer daemon");
+ goto end;
+ }
+
+ if (exists_status != CONSUMER_TRACE_CHUNK_EXISTS_STATUS_UNKNOWN_CHUNK) {
+ pthread_mutex_unlock(socket->lock);
+ chunk_exists_on_peer = true;
+ goto end;
+ }
+ pthread_mutex_unlock(socket->lock);
+ }
+
+skip_ust:
+ if (!session->kernel_session) {
+ goto skip_kernel;
+ }
+ cds_lfht_for_each_entry(session->kernel_session->consumer->socks->ht,
+ &iter, socket, node.node) {
+ pthread_mutex_lock(socket->lock);
+ relayd_id = session->kernel_session->consumer->type == CONSUMER_DST_LOCAL ?
+ -1ULL :
+ session->kernel_session->consumer->net_seq_index;
+
+ ret = consumer_trace_chunk_exists(socket,
+ relayd_id,
+ session->id, session->chunk_being_archived,
+ &exists_status);
+ if (ret) {
+ pthread_mutex_unlock(socket->lock);
+ ERR("Error occured while checking rotation status on consumer daemon");
+ goto end;
+ }
+
+ if (exists_status != CONSUMER_TRACE_CHUNK_EXISTS_STATUS_UNKNOWN_CHUNK) {
+ pthread_mutex_unlock(socket->lock);
+ chunk_exists_on_peer = true;
+ goto end;
+ }
+ pthread_mutex_unlock(socket->lock);
+ }
+skip_kernel:
+end:
+ rcu_read_unlock();
+
+ if (!chunk_exists_on_peer) {
+ uint64_t chunk_being_archived_id;
+
+ chunk_status = lttng_trace_chunk_get_id(
+ session->chunk_being_archived,
+ &chunk_being_archived_id);
+ assert(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK);
+ DBG("[rotation-thread] Rotation of trace archive %" PRIu64 " of session \"%s\" is complete on all consumers",
+ chunk_being_archived_id,
+ session->name);
+ }
+ *_rotation_completed = !chunk_exists_on_peer;
+ if (ret) {
+ ret = session_reset_rotation_state(session,
+ LTTNG_ROTATION_STATE_ERROR);
+ if (ret) {
+ ERR("Failed to reset rotation state of session \"%s\"",
+ session->name);
+ }
+ }
+}
+
+/*
+ * Check if the last rotation was completed, called with session lock held.
+ * Should only return non-zero in the event of a fatal error. Doing so will
+ * shutdown the thread.
+ */
+static
+int check_session_rotation_pending(struct ltt_session *session,
+ struct notification_thread_handle *notification_thread_handle)
+{
+ int ret;
+ struct lttng_trace_archive_location *location;
+ enum lttng_trace_chunk_status chunk_status;
+ bool rotation_completed = false;
+ const char *archived_chunk_name;
+ uint64_t chunk_being_archived_id;
+
+ if (!session->chunk_being_archived) {
+ ret = 0;
+ goto end;
+ }
+
+ chunk_status = lttng_trace_chunk_get_id(session->chunk_being_archived,
+ &chunk_being_archived_id);
+ assert(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK);
+
+ DBG("[rotation-thread] Checking for pending rotation on session \"%s\", trace archive %" PRIu64,
+ session->name, chunk_being_archived_id);
+
+ /*
+ * The rotation-pending check timer of a session is launched in
+ * one-shot mode. If the rotation is incomplete, the rotation
+ * thread will re-enable the pending-check timer.
+ *
+ * The timer thread can't stop the timer itself since it is involved
+ * in the check for the timer's quiescence.
+ */
+ ret = timer_session_rotation_pending_check_stop(session);
+ if (ret) {
+ goto check_ongoing_rotation;
+ }
+
+ check_session_rotation_pending_on_consumers(session,
+ &rotation_completed);
+ if (!rotation_completed ||
+ session->rotation_state == LTTNG_ROTATION_STATE_ERROR) {
+ goto check_ongoing_rotation;
+ }
+
+ /*
+ * Now we can clear the "ONGOING" state in the session. New
+ * rotations can start now.
+ */
+ chunk_status = lttng_trace_chunk_get_name(session->chunk_being_archived,
+ &archived_chunk_name, NULL);
+ assert(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK);
+ free(session->last_archived_chunk_name);
+ session->last_archived_chunk_name = strdup(archived_chunk_name);
+ if (!session->last_archived_chunk_name) {
+ PERROR("Failed to duplicate archived chunk name");
+ }
+ session_reset_rotation_state(session, LTTNG_ROTATION_STATE_COMPLETED);
+
+ if (!session->quiet_rotation) {
+ location = session_get_trace_archive_location(session);
+ /* Ownership of location is transferred. */
+ ret = notification_thread_command_session_rotation_completed(
+ notification_thread_handle,
+ session->name,
+ session->uid,
+ session->gid,
+ session->last_archived_chunk_id.value,
+ location);
+ if (ret != LTTNG_OK) {
+ ERR("[rotation-thread] Failed to notify notification thread of completed rotation for session %s",
+ session->name);
+ }
+ }
+
+ if (!session->active && !session->quiet_rotation) {
+ /*
+ * A stop command was issued during the rotation, it is
+ * up to the rotation completion check to perform the
+ * renaming of the last chunk that was produced.
+ */
+ ret = notification_thread_command_session_rotation_ongoing(
+ notification_thread_handle,
+ session->name,
+ session->uid,
+ session->gid,
+ session->most_recent_chunk_id.value);
+ if (ret != LTTNG_OK) {
+ ERR("[rotation-thread] Failed to notify notification thread of completed rotation for session %s",
+ session->name);
+ }
+
+ /* Ownership of location is transferred. */
+ location = session_get_trace_archive_location(session);
+ ret = notification_thread_command_session_rotation_completed(
+ notification_thread_handle,
+ session->name,
+ session->uid,
+ session->gid,
+ session->most_recent_chunk_id.value,
+ location);
+ if (ret != LTTNG_OK) {
+ ERR("[rotation-thread] Failed to notify notification thread of completed rotation for session %s",
+ session->name);
+ }
+ }
+
+ ret = 0;
+check_ongoing_rotation:
+ if (session->rotation_state == LTTNG_ROTATION_STATE_ONGOING) {
+ uint64_t chunk_being_archived_id;
+
+ chunk_status = lttng_trace_chunk_get_id(
+ session->chunk_being_archived,
+ &chunk_being_archived_id);
+ assert(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK);
+
+ DBG("[rotation-thread] Rotation of trace archive %" PRIu64 " is still pending for session %s",
+ chunk_being_archived_id, session->name);
+ ret = timer_session_rotation_pending_check_start(session,
+ DEFAULT_ROTATE_PENDING_TIMER);
+ if (ret) {
+ ERR("Failed to re-enable rotation pending timer");
+ ret = -1;
+ goto end;
+ }
+ }
+
+end:
+ return ret;
+}
+
+/* Call with the session and session_list locks held. */
+static
+int launch_session_rotation(struct ltt_session *session)
+{
+ int ret;
+ struct lttng_rotate_session_return rotation_return;
+
+ DBG("[rotation-thread] Launching scheduled time-based rotation on session \"%s\"",
+ session->name);
+
+ ret = cmd_rotate_session(session, &rotation_return, false);
+ if (ret == LTTNG_OK) {
+ DBG("[rotation-thread] Scheduled time-based rotation successfully launched on session \"%s\"",
+ session->name);