Fix: ust-consumer: segfault on snapshot after regenerate metadata
[lttng-tools.git] / src / common / consumer / consumer-stream.c
index fb878d72cabb6a56e4ff5a8a0d0303062a99d165..621a199884aa848d7d860cf89b881ae11193bbc0 100644 (file)
 #include <unistd.h>
 
 #include <common/common.h>
+#include <common/consumer/consumer-timer.h>
+#include <common/consumer/consumer.h>
+#include <common/consumer/metadata-bucket.h>
 #include <common/index/index.h>
 #include <common/kernel-consumer/kernel-consumer.h>
+#include <common/macros.h>
 #include <common/relayd/relayd.h>
 #include <common/ust-consumer/ust-consumer.h>
 #include <common/utils.h>
-#include <common/consumer/consumer.h>
-#include <common/consumer/consumer-timer.h>
-#include <common/consumer/metadata-bucket.h>
 
 #include "consumer-stream.h"
 
@@ -51,6 +52,12 @@ static void consumer_stream_data_unlock_all(struct lttng_consumer_stream *stream
        pthread_mutex_unlock(&stream->chan->lock);
 }
 
+static void consumer_stream_data_assert_locked_all(struct lttng_consumer_stream *stream)
+{
+       ASSERT_LOCKED(stream->lock);
+       ASSERT_LOCKED(stream->chan->lock);
+}
+
 static void consumer_stream_metadata_lock_all(struct lttng_consumer_stream *stream)
 {
        consumer_stream_data_lock_all(stream);
@@ -63,6 +70,12 @@ static void consumer_stream_metadata_unlock_all(struct lttng_consumer_stream *st
        consumer_stream_data_unlock_all(stream);
 }
 
+static void consumer_stream_metadata_assert_locked_all(struct lttng_consumer_stream *stream)
+{
+       ASSERT_LOCKED(stream->metadata_rdv_lock);
+       consumer_stream_data_assert_locked_all(stream);
+}
+
 /* Only used for data streams. */
 static int consumer_stream_update_stats(struct lttng_consumer_stream *stream,
                const struct stream_subbuffer *subbuf)
@@ -151,9 +164,40 @@ static ssize_t consumer_stream_consume_mmap(
        const unsigned long padding_size =
                        subbuffer->info.data.padded_subbuf_size -
                        subbuffer->info.data.subbuf_size;
-
-       return lttng_consumer_on_read_subbuffer_mmap(
+       const ssize_t written_bytes = lttng_consumer_on_read_subbuffer_mmap(
                        stream, &subbuffer->buffer.buffer, padding_size);
+
+       if (stream->net_seq_idx == -1ULL) {
+               /*
+                * When writing on disk, check that only the subbuffer (no
+                * padding) was written to disk.
+                */
+               if (written_bytes != subbuffer->info.data.padded_subbuf_size) {
+                       DBG("Failed to write the entire padded subbuffer on disk (written_bytes: %zd, padded subbuffer size %lu)",
+                                       written_bytes,
+                                       subbuffer->info.data.padded_subbuf_size);
+               }
+       } else {
+               /*
+                * When streaming over the network, check that the entire
+                * subbuffer including padding was successfully written.
+                */
+               if (written_bytes != subbuffer->info.data.subbuf_size) {
+                       DBG("Failed to write only the subbuffer over the network (written_bytes: %zd, subbuffer size %lu)",
+                                       written_bytes,
+                                       subbuffer->info.data.subbuf_size);
+               }
+       }
+
+       /*
+        * If `lttng_consumer_on_read_subbuffer_mmap()` returned an error, pass
+        * it along to the caller, else return zero.
+        */
+       if (written_bytes < 0) {
+               ERR("Error reading mmap subbuffer: %zd", written_bytes);
+       }
+
+       return written_bytes;
 }
 
 static ssize_t consumer_stream_consume_splice(
@@ -161,8 +205,24 @@ static ssize_t consumer_stream_consume_splice(
                struct lttng_consumer_stream *stream,
                const struct stream_subbuffer *subbuffer)
 {
-       return lttng_consumer_on_read_subbuffer_splice(ctx, stream,
-                       subbuffer->info.data.padded_subbuf_size, 0);
+       const ssize_t written_bytes = lttng_consumer_on_read_subbuffer_splice(
+                       ctx, stream, subbuffer->info.data.padded_subbuf_size, 0);
+
+       if (written_bytes != subbuffer->info.data.padded_subbuf_size) {
+               DBG("Failed to write the entire padded subbuffer (written_bytes: %zd, padded subbuffer size %lu)",
+                               written_bytes,
+                               subbuffer->info.data.padded_subbuf_size);
+       }
+
+       /*
+        * If `lttng_consumer_on_read_subbuffer_splice()` returned an error,
+        * pass it along to the caller, else return zero.
+        */
+       if (written_bytes < 0) {
+               ERR("Error reading splice subbuffer: %zd", written_bytes);
+       }
+
+       return written_bytes;
 }
 
 static int consumer_stream_send_index(
@@ -432,13 +492,14 @@ struct lttng_consumer_stream *consumer_stream_create(
                goto end;
        }
 
+       rcu_read_lock();
+
        if (trace_chunk && !lttng_trace_chunk_get(trace_chunk)) {
                ERR("Failed to acquire trace chunk reference during the creation of a stream");
                ret = -1;
                goto error;
        }
 
-       rcu_read_lock();
        stream->chan = channel;
        stream->key = stream_key;
        stream->trace_chunk = trace_chunk;
@@ -452,6 +513,8 @@ struct lttng_consumer_stream *consumer_stream_create(
        stream->index_file = NULL;
        stream->last_sequence_number = -1ULL;
        stream->rotate_position = -1ULL;
+       /* Buffer is created with an open packet. */
+       stream->opened_packet_in_current_trace_chunk = true;
        pthread_mutex_init(&stream->lock, NULL);
        pthread_mutex_init(&stream->metadata_timer_lock, NULL);
 
@@ -509,12 +572,16 @@ struct lttng_consumer_stream *consumer_stream_create(
                                consumer_stream_metadata_lock_all;
                stream->read_subbuffer_ops.unlock =
                                consumer_stream_metadata_unlock_all;
+               stream->read_subbuffer_ops.assert_locked =
+                               consumer_stream_metadata_assert_locked_all;
                stream->read_subbuffer_ops.pre_consume_subbuffer =
                                metadata_stream_check_version;
        } else {
                stream->read_subbuffer_ops.lock = consumer_stream_data_lock_all;
                stream->read_subbuffer_ops.unlock =
                                consumer_stream_data_unlock_all;
+               stream->read_subbuffer_ops.assert_locked =
+                               consumer_stream_data_assert_locked_all;
                stream->read_subbuffer_ops.pre_consume_subbuffer =
                                consumer_stream_update_stats;
                if (channel->is_live) {
This page took 0.026021 seconds and 5 git commands to generate.