X-Git-Url: http://git.efficios.com/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Fmain.c;h=92f9a5600ac28b7280dcea9f1041b7725b13b748;hp=fcc7fc00ae55c844b935ef065028e3526c384257;hb=7a45c7e6401baebe3715b317a3d871ee49921057;hpb=2f21a469eee9bc3352b439b1e64fe6bbd5088fea diff --git a/src/bin/lttng-relayd/main.c b/src/bin/lttng-relayd/main.c index fcc7fc00a..92f9a5600 100644 --- a/src/bin/lttng-relayd/main.c +++ b/src/bin/lttng-relayd/main.c @@ -1198,6 +1198,7 @@ static int relay_add_stream(const struct lttcomm_relayd_hdr *recv_hdr, uint64_t stream_handle = -1ULL; char *path_name = NULL, *channel_name = NULL; uint64_t tracefile_size = 0, tracefile_count = 0; + struct relay_stream_chunk_id stream_chunk_id = { 0 }; if (!session || !conn->version_check_done) { ERR("Trying to add a stream before version check"); @@ -1216,7 +1217,9 @@ static int relay_add_stream(const struct lttcomm_relayd_hdr *recv_hdr, } else { /* From 2.11 to ... */ ret = cmd_recv_stream_2_11(payload, &path_name, - &channel_name, &tracefile_size, &tracefile_count); + &channel_name, &tracefile_size, &tracefile_count, + &stream_chunk_id.value); + stream_chunk_id.is_set = true; } if (ret < 0) { @@ -1235,7 +1238,8 @@ static int relay_add_stream(const struct lttcomm_relayd_hdr *recv_hdr, /* We pass ownership of path_name and channel_name. */ stream = stream_create(trace, stream_handle, path_name, - channel_name, tracefile_size, tracefile_count); + channel_name, tracefile_size, tracefile_count, + &stream_chunk_id); path_name = NULL; channel_name = NULL; @@ -2310,12 +2314,18 @@ static int relay_recv_index(const struct lttcomm_relayd_hdr *recv_hdr, tracefile_array_commit_seq(stream->tfa); stream->index_received_seqcount++; stream->pos_after_last_complete_data_index += index->total_size; + stream->prev_index_seq = index_info.net_seq_num; } else if (ret > 0) { /* no flush. */ ret = 0; } else { + /* + * ret < 0 + * + * relay_index_try_flush is responsible for the self-reference + * put of the index object on error. + */ ERR("relay_index_try_flush error %d", ret); - relay_index_put(index); ret = -1; } @@ -2481,7 +2491,8 @@ static int relay_rotate_session_stream(const struct lttcomm_relayd_hdr *recv_hdr goto end_stream_unlock; } - stream->chunk_id = stream_info.new_chunk_id; + assert(stream->current_chunk_id.is_set); + stream->current_chunk_id.value = stream_info.new_chunk_id; if (stream->is_metadata) { /* @@ -2815,7 +2826,8 @@ int relay_rotate_pending(const struct lttcomm_relayd_hdr *recv_hdr, chunk_id = be64toh(msg.chunk_id); - DBG("Evaluating rotate pending for chunk id %" PRIu64, chunk_id); + DBG("Evaluating rotate pending for session \"%s\" and chunk id %" PRIu64, + session->session_name, chunk_id); /* * Iterate over all the streams in the session and check if they are @@ -2837,7 +2849,7 @@ int relay_rotate_pending(const struct lttcomm_relayd_hdr *recv_hdr, rotate_pending = true; DBG("Stream %" PRIu64 " is still rotating", stream->stream_handle); - } else if (stream->chunk_id < chunk_id) { + } else if (stream->current_chunk_id.value < chunk_id) { /* * Stream closed on the consumer but still active on the * relay. @@ -3211,9 +3223,13 @@ static int handle_index_data(struct relay_stream *stream, uint64_t net_seq_num, /* No flush. */ ret = 0; } else { - /* Put self-ref for this index due to error. */ - relay_index_put(index); - index = NULL; + /* + * ret < 0 + * + * relay_index_try_flush is responsible for the self-reference + * put of the index object on error. + */ + ERR("relay_index_try_flush error %d", ret); ret = -1; } end: @@ -3467,6 +3483,7 @@ static enum relay_connection_status relay_process_data_receive_payload( if (index_flushed) { stream->pos_after_last_complete_data_index = stream->tracefile_size_current; + stream->prev_index_seq = state->header.net_seq_num; } stream->prev_seq = state->header.net_seq_num;