From 298a25cae73bda8d8ad6fa986f7d0b822645559b Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=A9mie=20Galarneau?= Date: Fri, 12 Oct 2018 18:22:35 -0400 Subject: [PATCH] Fix: take index sequence number into account for data pending check MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit The data pending checks are only performed on the sequence number of the received data. However, it is expected that the index of the stream (when applicable) has been written to disk by the time this check returns that no data is pending. This patch ensures that the minimum between the data and index sequence numbers are used to perform this check. Signed-off-by: Jérémie Galarneau --- src/bin/lttng-relayd/main.c | 40 +++++++++++++++++++++++++++++++------ 1 file changed, 34 insertions(+), 6 deletions(-) diff --git a/src/bin/lttng-relayd/main.c b/src/bin/lttng-relayd/main.c index 92f9a5600..c84ebaac1 100644 --- a/src/bin/lttng-relayd/main.c +++ b/src/bin/lttng-relayd/main.c @@ -1083,6 +1083,11 @@ static int set_index_control_data(struct relay_index *index, return relay_index_set_data(index, &index_data); } +static bool session_streams_have_index(const struct relay_session *session) +{ + return session->minor >= 4 && !session->snapshot; +} + /* * Handle the RELAYD_CREATE_SESSION command. * @@ -1931,6 +1936,7 @@ static int relay_data_pending(const struct lttcomm_relayd_hdr *recv_hdr, struct relay_stream *stream; ssize_t send_ret; int ret; + uint64_t stream_seq; DBG("Data pending command received"); @@ -1958,12 +1964,23 @@ static int relay_data_pending(const struct lttcomm_relayd_hdr *recv_hdr, pthread_mutex_lock(&stream->lock); - DBG("Data pending for stream id %" PRIu64 " prev_seq %" PRIu64 - " and last_seq %" PRIu64, msg.stream_id, - stream->prev_seq, msg.last_net_seq_num); + if (session_streams_have_index(session)) { + /* + * Ensure that both the index and stream data have been + * flushed up to the requested point. + */ + stream_seq = min(stream->prev_seq, stream->prev_index_seq); + } else { + stream_seq = stream->prev_seq; + } + DBG("Data pending for stream id %" PRIu64 ": prev_seq %" PRIu64 + ", prev_index_seq %" PRIu64 + ", and last_seq %" PRIu64, msg.stream_id, + stream->prev_seq, stream->prev_index_seq, + msg.last_net_seq_num); /* Avoid wrapping issue */ - if (((int64_t) (stream->prev_seq - msg.last_net_seq_num)) >= 0) { + if (((int64_t) (stream_seq - msg.last_net_seq_num)) >= 0) { /* Data has in fact been written and is NOT pending */ ret = 0; } else { @@ -2183,7 +2200,18 @@ static int relay_end_data_pending(const struct lttcomm_relayd_hdr *recv_hdr, } pthread_mutex_lock(&stream->lock); if (!stream->data_pending_check_done) { - if (!stream->closed || !(((int64_t) (stream->prev_seq - stream->last_net_seq_num)) >= 0)) { + uint64_t stream_seq; + + if (session_streams_have_index(conn->session)) { + /* + * Ensure that both the index and stream data have been + * flushed up to the requested point. + */ + stream_seq = min(stream->prev_seq, stream->prev_index_seq); + } else { + stream_seq = stream->prev_seq; + } + if (!stream->closed || !(((int64_t) (stream_seq - stream->last_net_seq_num)) >= 0)) { is_data_inflight = 1; DBG("Data is still in flight for stream %" PRIu64, stream->stream_handle); @@ -3462,7 +3490,7 @@ static enum relay_connection_status relay_process_data_receive_payload( } - if (session->minor >= 4 && !session->snapshot) { + if (session_streams_have_index(session)) { ret = handle_index_data(stream, state->header.net_seq_num, state->rotate_index, &index_flushed, state->header.data_size + state->header.padding_size); if (ret < 0) { -- 2.34.1