X-Git-Url: http://git.efficios.com/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Fstream.c;h=4238d3b53ff217e05613e5ed7aab0aef72f11921;hp=501e6e90a109615d09a562de1059aca894d64523;hb=f607fc46edbcace68ab34913b8d22c024c249146;hpb=3ff5c5db220d92baf64280ba54713fcafe76142e diff --git a/src/bin/lttng-relayd/stream.c b/src/bin/lttng-relayd/stream.c index 501e6e90a..4238d3b53 100644 --- a/src/bin/lttng-relayd/stream.c +++ b/src/bin/lttng-relayd/stream.c @@ -72,6 +72,11 @@ end: static void stream_complete_rotation(struct relay_stream *stream) { DBG("Rotation completed for stream %" PRIu64, stream->stream_handle); + if (stream->ongoing_rotation.value.next_trace_chunk) { + tracefile_array_reset(stream->tfa); + tracefile_array_commit_seq(stream->tfa, + stream->index_received_seqcount); + } lttng_trace_chunk_put(stream->trace_chunk); stream->trace_chunk = stream->ongoing_rotation.value.next_trace_chunk; stream->ongoing_rotation = (typeof(stream->ongoing_rotation)) {}; @@ -146,8 +151,8 @@ static int stream_rotate_data_file(struct relay_stream *stream) { int ret = 0; - DBG("Rotating stream %" PRIu64 " data file", - stream->stream_handle); + DBG("Rotating stream %" PRIu64 " data file with size %" PRIu64, + stream->stream_handle, stream->tracefile_size_current); if (stream->stream_fd) { stream_fd_put(stream->stream_fd); @@ -179,6 +184,8 @@ static int stream_rotate_data_file(struct relay_stream *stream) goto end; } } + DBG("%s: reset tracefile_size_current for stream %" PRIu64 " was %" PRIu64, + __func__, stream->stream_handle, stream->tracefile_size_current); stream->tracefile_size_current = 0; stream->pos_after_last_complete_data_index = 0; stream->ongoing_rotation.value.data_rotated = true; @@ -367,6 +374,15 @@ static int try_rotate_stream_data(struct relay_stream *stream) goto end; } + DBG("%s: Stream %" PRIu64 + " (rotate_at_index_packet_seq_num = %" PRIu64 + ", rotate_at_prev_data_net_seq = %" PRIu64 + ", prev_data_seq = %" PRIu64 ")", + __func__, stream->stream_handle, + stream->ongoing_rotation.value.packet_seq_num, + stream->ongoing_rotation.value.prev_data_net_seq, + stream->prev_data_seq); + if (stream->prev_data_seq == -1ULL || stream->ongoing_rotation.value.prev_data_net_seq == -1ULL || stream->prev_data_seq < @@ -483,6 +499,15 @@ static int try_rotate_stream_index(struct relay_stream *stream) goto end; } + DBG("%s: Stream %" PRIu64 + " (rotate_at_packet_seq_num = %" PRIu64 + ", received_packet_seq_num = " + "(value = %" PRIu64 ", is_set = %" PRIu8 "))", + __func__, stream->stream_handle, + stream->ongoing_rotation.value.packet_seq_num, + stream->received_packet_seq_num.value, + stream->received_packet_seq_num.is_set); + if (!stream->received_packet_seq_num.is_set || LTTNG_OPTIONAL_GET(stream->received_packet_seq_num) + 1 < stream->ongoing_rotation.value.packet_seq_num) { @@ -505,8 +530,10 @@ static int try_rotate_stream_index(struct relay_stream *stream) stream->ongoing_rotation.value.packet_seq_num); DBG("Rotating stream %" PRIu64 " index file", stream->stream_handle); - ret = create_index_file(stream, - stream->ongoing_rotation.value.next_trace_chunk); + if (stream->index_file) { + lttng_index_file_put(stream->index_file); + stream->index_file = NULL; + } stream->ongoing_rotation.value.index_rotated = true; /* @@ -848,6 +875,12 @@ int stream_set_pending_rotation(struct relay_stream *stream, * A metadata stream has no index; consider it already rotated. */ stream->ongoing_rotation.value.index_rotated = true; + if (next_trace_chunk) { + /* + * The metadata will be received again in the new chunk. + */ + stream->metadata_received = 0; + } ret = stream_rotate_data_file(stream); } else { ret = try_rotate_stream_index(stream); @@ -1030,6 +1063,8 @@ int stream_init_packet(struct relay_stream *stream, size_t packet_size, * Reset current size because we just performed a stream * rotation. */ + DBG("%s: reset tracefile_size_current for stream %" PRIu64 " was %" PRIu64, + __func__, stream->stream_handle, stream->tracefile_size_current); stream->tracefile_size_current = 0; *file_rotated = true; } else { @@ -1217,6 +1252,8 @@ int stream_add_index(struct relay_stream *stream, ASSERT_LOCKED(stream->lock); + DBG("stream_add_index for stream %" PRIu64, stream->stream_handle); + /* Live beacon handling */ if (index_info->packet_size == 0) { DBG("Received live beacon for stream %" PRIu64, @@ -1319,6 +1356,8 @@ int stream_reset_file(struct relay_stream *stream) stream->stream_fd = NULL; } + DBG("%s: reset tracefile_size_current for stream %" PRIu64 " was %" PRIu64, + __func__, stream->stream_handle, stream->tracefile_size_current); stream->tracefile_size_current = 0; stream->prev_data_seq = 0; stream->prev_index_seq = 0;