Fix: relayd stream.c: LTTNG_OPTIONAL_GET address confusion
[lttng-tools.git] / src / bin / lttng-relayd / stream.c
index 4d3d37a2bc91d745b8e6ee83935aa623f27b3db9..755fb6734072113af804e59f6e1e81089adb6b1f 100644 (file)
@@ -90,7 +90,6 @@ static int stream_create_data_output_file_from_trace_chunk(
        const mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP;
 
        ASSERT_LOCKED(stream->lock);
-       assert(stream->trace_chunk);
 
        ret = utils_stream_file_path(stream->path_name, stream->channel_name,
                        stream->tracefile_size, stream->tracefile_current_index,
@@ -210,7 +209,7 @@ static int rotate_truncate_stream(struct relay_stream *stream)
        struct stream_fd *previous_stream_fd = NULL;
        struct lttng_trace_chunk *previous_chunk = NULL;
 
-       if (!LTTNG_OPTIONAL_GET(&stream->ongoing_rotation)->next_trace_chunk) {
+       if (!LTTNG_OPTIONAL_GET(stream->ongoing_rotation).next_trace_chunk) {
                ERR("Protocol error encoutered in %s(): stream rotation "
                        "sequence number is before the current sequence number "
                        "and the next trace chunk is unset. Honoring this "
@@ -923,6 +922,27 @@ void try_stream_close(struct relay_stream *stream)
        stream->closed = true;
        /* Relay indexes are only used by the "consumer/sessiond" end. */
        relay_index_close_all(stream);
+
+       /*
+        * If we are closed by an application exiting (per-pid buffers),
+        * we need to put our reference on the stream trace chunk right
+        * away, because otherwise still holding the reference on the
+        * trace chunk could allow a viewer stream (which holds a reference
+        * to the stream) to postpone destroy waiting for the chunk to cease
+        * to exist endlessly until the viewer is detached.
+        */
+
+       /* Put stream fd before put chunk. */
+       if (stream->stream_fd) {
+               stream_fd_put(stream->stream_fd);
+               stream->stream_fd = NULL;
+       }
+       if (stream->index_file) {
+               lttng_index_file_put(stream->index_file);
+               stream->index_file = NULL;
+       }
+       lttng_trace_chunk_put(stream->trace_chunk);
+       stream->trace_chunk = NULL;
        pthread_mutex_unlock(&stream->lock);
        DBG("Succeeded in closing stream %" PRIu64, stream->stream_handle);
        stream_put(stream);
@@ -934,6 +954,14 @@ int stream_init_packet(struct relay_stream *stream, size_t packet_size,
        int ret = 0;
 
        ASSERT_LOCKED(stream->lock);
+
+       if (!stream->stream_fd || !stream->trace_chunk) {
+               ERR("Protocol error: received a packet for a stream that doesn't have a current trace chunk: stream_id = %" PRIu64 ", channel_name = %s",
+                               stream->stream_handle, stream->channel_name);
+               ret = -1;
+               goto end;
+       }
+
        if (caa_likely(stream->tracefile_size == 0)) {
                /* No size limit set; nothing to check. */
                goto end;
@@ -999,6 +1027,12 @@ int stream_write(struct relay_stream *stream,
        memset(padding_buffer, 0,
                        min(sizeof(padding_buffer), padding_to_write));
 
+       if (!stream->stream_fd || !stream->trace_chunk) {
+               ERR("Protocol error: received a packet for a stream that doesn't have a current trace chunk: stream_id = %" PRIu64 ", channel_name = %s",
+                               stream->stream_handle, stream->channel_name);
+               ret = -1;
+               goto end;
+       }
        if (packet) {
                write_ret = lttng_write(stream->stream_fd->fd,
                                packet->data, packet->size);
This page took 0.02499 seconds and 5 git commands to generate.