X-Git-Url: http://git.efficios.com/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Fmain.c;h=70a1948c328cc690e39e4add4d67f4a2a8f2fe44;hp=0005714c55abf9bdd9a3347bcb96f99304b2ccca;hb=cef0f7d51b8025d3ba04e6496242c1cca1641aa6;hpb=6cd525e813795a1d5e38feac8dedf2c73ffb1274 diff --git a/src/bin/lttng-relayd/main.c b/src/bin/lttng-relayd/main.c index 0005714c5..70a1948c3 100644 --- a/src/bin/lttng-relayd/main.c +++ b/src/bin/lttng-relayd/main.c @@ -849,7 +849,10 @@ static void destroy_stream(struct relay_stream *stream) * lookup failure on the live thread side of a stream indicates * that the viewer stream index received value should be used. */ + pthread_mutex_lock(&stream->viewer_stream_rotation_lock); vstream->total_index_received = stream->total_index_received; + vstream->close_write_flag = 1; + pthread_mutex_unlock(&stream->viewer_stream_rotation_lock); } /* Cleanup index of that stream. */ @@ -2039,10 +2042,54 @@ int relay_process_data(struct relay_command *cmd) if (stream->tracefile_size > 0 && (stream->tracefile_size_current + data_size) > stream->tracefile_size) { + struct relay_viewer_stream *vstream; + uint64_t new_id; + + new_id = (stream->tracefile_count_current + 1) % + stream->tracefile_count; + /* + * When we wrap-around back to 0, we start overwriting old + * trace data. + */ + if (!stream->tracefile_overwrite && new_id == 0) { + stream->tracefile_overwrite = 1; + } + pthread_mutex_lock(&stream->viewer_stream_rotation_lock); + if (stream->tracefile_overwrite) { + stream->oldest_tracefile_id = + (stream->oldest_tracefile_id + 1) % + stream->tracefile_count; + } + vstream = live_find_viewer_stream_by_id(stream->stream_handle); + if (vstream) { + /* + * The viewer is reading a file about to be + * overwritten. Close the FDs it is + * currently using and let it handle the fault. + */ + if (vstream->tracefile_count_current == new_id) { + pthread_mutex_lock(&vstream->overwrite_lock); + vstream->abort_flag = 1; + pthread_mutex_unlock(&vstream->overwrite_lock); + DBG("Streaming side setting abort_flag on stream %s_%lu\n", + stream->channel_name, new_id); + } else if (vstream->tracefile_count_current == + stream->tracefile_count_current) { + /* + * The reader and writer were in the + * same trace file, inform the viewer + * that no new index will ever be added + * to this file. + */ + vstream->close_write_flag = 1; + } + } ret = utils_rotate_stream_file(stream->path_name, stream->channel_name, stream->tracefile_size, stream->tracefile_count, relayd_uid, relayd_gid, stream->fd, &(stream->tracefile_count_current), &stream->fd); + stream->total_index_received = 0; + pthread_mutex_unlock(&stream->viewer_stream_rotation_lock); if (ret < 0) { ERR("Rotating stream output file"); goto end_rcu_unlock;