X-Git-Url: http://git.efficios.com/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Fviewer-stream.c;h=333246afb3f4d312874cca9cfc144e6cc0fdf932;hp=1d02ee32945200b63142d934565ab83154513953;hb=a44ca2ca85e4b64729f7b88b1919fd6737dfff8a;hpb=9237e6a108fdba7acc014f739d0569565552bdec diff --git a/src/bin/lttng-relayd/viewer-stream.c b/src/bin/lttng-relayd/viewer-stream.c index 1d02ee329..333246afb 100644 --- a/src/bin/lttng-relayd/viewer-stream.c +++ b/src/bin/lttng-relayd/viewer-stream.c @@ -63,29 +63,59 @@ struct relay_viewer_stream *viewer_stream_create(struct relay_stream *stream, goto error; } + if (!stream_get(stream)) { + ERR("Cannot get stream"); + goto error; + } + vstream->stream = stream; + + pthread_mutex_lock(&stream->lock); + + if (stream->is_metadata && stream->trace->viewer_metadata_stream) { + ERR("Cannot attach viewer metadata stream to trace (busy)."); + goto error_unlock; + } + switch (seek_t) { case LTTNG_VIEWER_SEEK_BEGINNING: - vstream->current_tracefile_id = stream->oldest_tracefile_id; + { + uint64_t seq_tail = tracefile_array_get_seq_tail(stream->tfa); + + if (seq_tail == -1ULL) { + /* + * Tail may not be initialized yet. Nonetheless, we know + * we want to send the first index once it becomes + * available. + */ + seq_tail = 0; + } + vstream->current_tracefile_id = + tracefile_array_get_file_index_tail(stream->tfa); + vstream->index_sent_seqcount = seq_tail; break; + } case LTTNG_VIEWER_SEEK_LAST: - vstream->current_tracefile_id = stream->current_tracefile_id; + vstream->current_tracefile_id = + tracefile_array_get_file_index_head(stream->tfa); + /* + * We seek at the very end of each stream, awaiting for + * a future packet to eventually come in. + * + * We don't need to check the head position for -1ULL since the + * increment will set it to 0. + */ + vstream->index_sent_seqcount = + tracefile_array_get_seq_head(stream->tfa) + 1; break; default: - goto error; - } - if (!stream_get(stream)) { - ERR("Cannot get stream"); - goto error; + goto error_unlock; } - vstream->stream = stream; - pthread_mutex_lock(&stream->lock); /* - * If we never received an index for the current stream, delay the opening - * of the index, otherwise open it right now. + * If we never received an index for the current stream, delay + * the opening of the index, otherwise open it right now. */ - if (vstream->current_tracefile_id == stream->current_tracefile_id - && stream->total_index_received == 0) { + if (stream->index_received_seqcount == 0) { vstream->index_fd = NULL; } else { int read_fd; @@ -112,14 +142,12 @@ struct relay_viewer_stream *viewer_stream_create(struct relay_stream *stream, if (lseek_ret < 0) { goto error_unlock; } - vstream->last_sent_index = stream->total_index_received; } - pthread_mutex_unlock(&stream->lock); - if (stream->is_metadata) { rcu_assign_pointer(stream->trace->viewer_metadata_stream, vstream); } + pthread_mutex_unlock(&stream->lock); /* Globally visible after the add unique. */ lttng_ht_node_init_u64(&vstream->stream_n, stream->stream_handle); @@ -226,26 +254,6 @@ void viewer_stream_put(struct relay_viewer_stream *vstream) rcu_read_unlock(); } -/* - * Returns whether the current tracefile is readable. If not, it has - * been overwritten. - * Must be called with rstream lock held. - */ -bool viewer_stream_is_tracefile_seq_readable(struct relay_viewer_stream *vstream, - uint64_t seq) -{ - struct relay_stream *stream = vstream->stream; - - if (seq >= stream->oldest_tracefile_seq - && seq <= stream->current_tracefile_seq) { - /* seq is a readable file. */ - return true; - } else { - /* seq is not readable. */ - return false; - } -} - /* * Rotate a stream to the next tracefile. * @@ -256,9 +264,11 @@ int viewer_stream_rotate(struct relay_viewer_stream *vstream) { int ret; struct relay_stream *stream = vstream->stream; + uint64_t new_id; /* Detect the last tracefile to open. */ - if (stream->total_index_received == vstream->last_sent_index + if (stream->index_received_seqcount + == vstream->index_sent_seqcount && stream->trace->session->connection_closed) { ret = 1; goto end; @@ -270,17 +280,29 @@ int viewer_stream_rotate(struct relay_viewer_stream *vstream) goto end; } - if (!viewer_stream_is_tracefile_seq_readable(vstream, - vstream->current_tracefile_seq + 1)) { - vstream->current_tracefile_id = - stream->oldest_tracefile_id; - vstream->current_tracefile_seq = - stream->oldest_tracefile_seq; + /* + * Try to move to the next file. + */ + new_id = (vstream->current_tracefile_id + 1) + % stream->tracefile_count; + if (tracefile_array_seq_in_file(stream->tfa, new_id, + vstream->index_sent_seqcount)) { + vstream->current_tracefile_id = new_id; } else { + uint64_t seq_tail = tracefile_array_get_seq_tail(stream->tfa); + + /* + * This can only be reached on overwrite, which implies there + * has been data written at some point, which will have set the + * tail. + */ + assert(seq_tail != -1ULL); + /* + * We need to resync because we lag behind tail. + */ vstream->current_tracefile_id = - (vstream->current_tracefile_id + 1) - % stream->tracefile_count; - vstream->current_tracefile_seq++; + tracefile_array_get_file_index_tail(stream->tfa); + vstream->index_sent_seqcount = seq_tail; } if (vstream->index_fd) {