Fix: rotation may never complete in per-PID buffering mode
[lttng-tools.git] / src / common / consumer / consumer.c
index 1d6f8b4cc4ce7cba975c85eb6a2c01fcb6e19e5b..6de72e2758b5086f35c36e96f69276eaa8dfbdf0 100644 (file)
@@ -2283,26 +2283,6 @@ static void validate_endpoint_status_metadata_stream(
        rcu_read_unlock();
 }
 
-static
-int rotate_notify_sessiond(struct lttng_consumer_local_data *ctx,
-               uint64_t key)
-{
-       ssize_t ret;
-
-       do {
-               ret = write(ctx->channel_rotate_pipe, &key, sizeof(key));
-       } while (ret == -1 && errno == EINTR);
-       if (ret == -1) {
-               PERROR("Failed to write to the channel rotation pipe");
-       } else {
-               DBG("Sent channel rotation notification for channel key %"
-                               PRIu64, key);
-               ret = 0;
-       }
-
-       return (int) ret;
-}
-
 /*
  * Perform operations that need to be done after a stream has
  * rotated and released the stream lock.
@@ -2339,13 +2319,7 @@ int consumer_post_rotation(struct lttng_consumer_stream *stream,
                        abort();
        }
 
-       if (--stream->chan->nr_stream_rotate_pending == 0) {
-               DBG("Rotation of channel \"%s\" completed, notifying the session daemon",
-                               stream->chan->name);
-               ret = rotate_notify_sessiond(ctx, stream->chan->key);
-       }
        pthread_mutex_unlock(&stream->chan->lock);
-
        return ret;
 }
 
@@ -3739,21 +3713,6 @@ int consumer_data_pending(uint64_t id)
        /* Ease our life a bit */
        ht = consumer_data.stream_list_ht;
 
-       relayd = find_relayd_by_session_id(id);
-       if (relayd) {
-               /* Send init command for data pending. */
-               pthread_mutex_lock(&relayd->ctrl_sock_mutex);
-               ret = relayd_begin_data_pending(&relayd->control_sock,
-                               relayd->relayd_session_id);
-               pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
-               if (ret < 0) {
-                       /* Communication error thus the relayd so no data pending. */
-                       ERR("Relayd begin data pending failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
-                       lttng_consumer_cleanup_relayd(relayd);
-                       goto data_not_pending;
-               }
-       }
-
        cds_lfht_for_each_entry_duplicate(ht->ht,
                        ht->hash_fct(&id, lttng_ht_seed),
                        ht->match_fct, &id,
@@ -3776,9 +3735,27 @@ int consumer_data_pending(uint64_t id)
                        }
                }
 
-               /* Relayd check */
-               if (relayd) {
-                       pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+               pthread_mutex_unlock(&stream->lock);
+       }
+
+       relayd = find_relayd_by_session_id(id);
+       if (relayd) {
+               unsigned int is_data_inflight = 0;
+
+               /* Send init command for data pending. */
+               pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+               ret = relayd_begin_data_pending(&relayd->control_sock,
+                               relayd->relayd_session_id);
+               if (ret < 0) {
+                       pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+                       /* Communication error thus the relayd so no data pending. */
+                       goto data_not_pending;
+               }
+
+               cds_lfht_for_each_entry_duplicate(ht->ht,
+                               ht->hash_fct(&id, lttng_ht_seed),
+                               ht->match_fct, &id,
+                               &iter.iter, stream, node_session_id.node) {
                        if (stream->metadata_flag) {
                                ret = relayd_quiescent_control(&relayd->control_sock,
                                                stream->relayd_stream_id);
@@ -3787,27 +3764,19 @@ int consumer_data_pending(uint64_t id)
                                                stream->relayd_stream_id,
                                                stream->next_net_seq_num - 1);
                        }
-                       if (ret < 0) {
+
+                       if (ret == 1) {
+                               pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+                               goto data_pending;
+                       } else if (ret < 0) {
                                ERR("Relayd data pending failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
                                lttng_consumer_cleanup_relayd(relayd);
                                pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
-                               pthread_mutex_unlock(&stream->lock);
                                goto data_not_pending;
                        }
-                       pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
-                       if (ret == 1) {
-                               pthread_mutex_unlock(&stream->lock);
-                               goto data_pending;
-                       }
                }
-               pthread_mutex_unlock(&stream->lock);
-       }
-
-       if (relayd) {
-               unsigned int is_data_inflight = 0;
 
-               /* Send init command for data pending. */
-               pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+               /* Send end command for data pending. */
                ret = relayd_end_data_pending(&relayd->control_sock,
                                relayd->relayd_session_id, &is_data_inflight);
                pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
@@ -4214,8 +4183,9 @@ int lttng_consumer_rotate_stream(struct lttng_consumer_local_data *ctx,
        } else {
                ret = rotate_local_stream(ctx, stream);
        }
+       stream->trace_archive_id++;
        if (ret < 0) {
-               ERR("Rotate stream");
+               ERR("Failed to rotate stream, ret = %i", ret);
                goto error;
        }
 
@@ -4380,7 +4350,64 @@ int lttng_consumer_rotate_rename(const char *old_path, const char *new_path,
        }
 }
 
-int lttng_consumer_rotate_pending_relay(uint64_t session_id,
+/* Stream lock must be acquired by the caller. */
+static
+bool check_stream_rotation_pending(const struct lttng_consumer_stream *stream,
+               uint64_t session_id, uint64_t chunk_id)
+{
+       bool pending = false;
+
+       if (stream->session_id != session_id) {
+               /* Skip. */
+               goto end;
+       }
+
+       /*
+        * If the stream's archive_id belongs to the chunk being rotated (or an
+        * even older one), it means that the consumer has not consumed all the
+        * buffers that belong to the chunk being rotated. Therefore, the
+        * rotation is considered as ongoing/pending.
+        */
+       pending = stream->trace_archive_id <= chunk_id;
+end:
+       return pending;
+}
+
+/* RCU read lock must be acquired by the caller. */
+int lttng_consumer_check_rotation_pending_local(uint64_t session_id,
+               uint64_t chunk_id)
+{
+       struct lttng_ht_iter iter;
+       struct lttng_consumer_stream *stream;
+       bool rotation_pending = false;
+
+       /* Start with the metadata streams... */
+       cds_lfht_for_each_entry(metadata_ht->ht, &iter.iter, stream, node.node) {
+               pthread_mutex_lock(&stream->lock);
+               rotation_pending = check_stream_rotation_pending(stream,
+                               session_id, chunk_id);
+               pthread_mutex_unlock(&stream->lock);
+               if (rotation_pending) {
+                       goto end;
+               }
+       }
+
+       /* ... followed by the data streams. */
+       cds_lfht_for_each_entry(data_ht->ht, &iter.iter, stream, node.node) {
+               pthread_mutex_lock(&stream->lock);
+               rotation_pending = check_stream_rotation_pending(stream,
+                               session_id, chunk_id);
+               pthread_mutex_unlock(&stream->lock);
+               if (rotation_pending) {
+                       goto end;
+               }
+       }
+
+end:
+       return !!rotation_pending;
+}
+
+int lttng_consumer_check_rotation_pending_relay(uint64_t session_id,
                uint64_t relayd_id, uint64_t chunk_id)
 {
        int ret;
@@ -4388,7 +4415,7 @@ int lttng_consumer_rotate_pending_relay(uint64_t session_id,
 
        relayd = consumer_find_relayd(relayd_id);
        if (!relayd) {
-               ERR("Failed to find relayd");
+               ERR("Failed to find relayd id %" PRIu64, relayd_id);
                ret = -1;
                goto end;
        }
This page took 0.02851 seconds and 5 git commands to generate.