common: index and trace-chunk file creation/open API change
[lttng-tools.git] / src / bin / lttng-relayd / stream.c
index c0aeb17190d187e12155e8a162ef89204647df2b..501e6e90a109615d09a562de1059aca894d64523 100644 (file)
@@ -123,7 +123,7 @@ static int stream_create_data_output_file_from_trace_chunk(
        }
 
        status = lttng_trace_chunk_open_file(
-                       trace_chunk, stream_path, flags, mode, &fd);
+                       trace_chunk, stream_path, flags, mode, &fd, false);
        if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
                ERR("Failed to open stream file \"%s\"", stream->channel_name);
                ret = -1;
@@ -445,13 +445,14 @@ static int create_index_file(struct relay_stream *stream,
                ret = -1;
                goto end;
        }
-       stream->index_file = lttng_index_file_create_from_trace_chunk(
+       status = lttng_index_file_create_from_trace_chunk(
                        chunk, stream->path_name,
                        stream->channel_name, stream->tracefile_size,
                        stream->tracefile_current_index,
                        lttng_to_index_major(major, minor),
-                       lttng_to_index_minor(major, minor), true);
-       if (!stream->index_file) {
+                       lttng_to_index_minor(major, minor), true,
+                       &stream->index_file);
+       if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
                ret = -1;
                goto end;
        }
@@ -1086,8 +1087,14 @@ int stream_write(struct relay_stream *stream,
        }
 
        if (stream->is_metadata) {
-               stream->metadata_received += packet ? packet->size : 0;
-               stream->metadata_received += padding_len;
+               size_t recv_len;
+
+               recv_len = packet ? packet->size : 0;
+               recv_len += padding_len;
+               stream->metadata_received += recv_len;
+               if (recv_len) {
+                       stream->no_new_metadata_notified = false;
+               }
        }
 
        DBG("Wrote to %sstream %" PRIu64 ": data_length = %zu, padding_length = %zu",
@@ -1154,7 +1161,7 @@ int stream_update_index(struct relay_stream *stream, uint64_t net_seq_num,
        ret = relay_index_try_flush(index);
        if (ret == 0) {
                tracefile_array_file_rotate(stream->tfa, TRACEFILE_ROTATE_READ);
-               tracefile_array_commit_seq(stream->tfa);
+               tracefile_array_commit_seq(stream->tfa, stream->index_received_seqcount);
                stream->index_received_seqcount++;
                LTTNG_OPTIONAL_SET(&stream->received_packet_seq_num,
                        be64toh(index->index_data.packet_seq_num));
@@ -1250,7 +1257,7 @@ int stream_add_index(struct relay_stream *stream,
        ret = relay_index_try_flush(index);
        if (ret == 0) {
                tracefile_array_file_rotate(stream->tfa, TRACEFILE_ROTATE_READ);
-               tracefile_array_commit_seq(stream->tfa);
+               tracefile_array_commit_seq(stream->tfa, stream->index_received_seqcount);
                stream->index_received_seqcount++;
                stream->pos_after_last_complete_data_index += index->total_size;
                stream->prev_index_seq = index_info->net_seq_num;
This page took 0.02588 seconds and 5 git commands to generate.