X-Git-Url: http://git.efficios.com/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Fviewer-stream.c;h=a16f331b8f795607389a1efa2c20263ac7609b6f;hp=3953d7dd10a4bd18132926286536df87a27ff5f0;hb=2a174661a1e0ab551b41ff1cae7191688525fc1f;hpb=2f8f53af90479595d530f8f02e71dd0b9fb810ee diff --git a/src/bin/lttng-relayd/viewer-stream.c b/src/bin/lttng-relayd/viewer-stream.c index 3953d7dd1..a16f331b8 100644 --- a/src/bin/lttng-relayd/viewer-stream.c +++ b/src/bin/lttng-relayd/viewer-stream.c @@ -41,11 +41,12 @@ static void deferred_free_viewer_stream(struct rcu_head *head) } struct relay_viewer_stream *viewer_stream_create(struct relay_stream *stream, - enum lttng_viewer_seek seek_t) + enum lttng_viewer_seek seek_t, struct ctf_trace *ctf_trace) { struct relay_viewer_stream *vstream; assert(stream); + assert(ctf_trace); vstream = zmalloc(sizeof(*vstream)); if (!vstream) { @@ -53,7 +54,7 @@ struct relay_viewer_stream *viewer_stream_create(struct relay_stream *stream, goto error; } - vstream->session_id = stream->session->id; + vstream->session_id = stream->session_id; vstream->stream_handle = stream->stream_handle; vstream->path_name = strndup(stream->path_name, LTTNG_VIEWER_PATH_MAX); vstream->channel_name = strndup(stream->channel_name, @@ -74,11 +75,9 @@ struct relay_viewer_stream *viewer_stream_create(struct relay_stream *stream, goto error; } - vstream->ctf_trace = stream->ctf_trace; if (vstream->metadata_flag) { - vstream->ctf_trace->viewer_metadata_stream = vstream; + ctf_trace->viewer_metadata_stream = vstream; } - uatomic_inc(&vstream->ctf_trace->refcount); /* Globally visible after the add unique. */ lttng_ht_node_init_u64(&vstream->stream_n, stream->stream_handle); @@ -144,14 +143,16 @@ void viewer_stream_delete(struct relay_viewer_stream *stream) assert(!ret); } -void viewer_stream_destroy(struct relay_viewer_stream *stream) +void viewer_stream_destroy(struct ctf_trace *ctf_trace, + struct relay_viewer_stream *stream) { int ret; - unsigned long ret_ref; assert(stream); - ret_ref = uatomic_add_return(&stream->ctf_trace->refcount, -1); - assert(ret_ref >= 0); + + if (ctf_trace) { + ctf_trace_put_ref(ctf_trace); + } if (stream->read_fd >= 0) { ret = close(stream->read_fd); @@ -166,25 +167,6 @@ void viewer_stream_destroy(struct relay_viewer_stream *stream) } } - /* - * If the only stream left in the HT is the metadata stream, - * we need to remove it because we won't detect a EOF for this - * stream. - */ - if (ret_ref == 1 && stream->ctf_trace->viewer_metadata_stream) { - viewer_stream_delete(stream->ctf_trace->viewer_metadata_stream); - viewer_stream_destroy(stream->ctf_trace->viewer_metadata_stream); - stream->ctf_trace->metadata_stream = NULL; - DBG("Freeing ctf_trace %" PRIu64, stream->ctf_trace->id); - /* - * The streaming-side is already closed and we can't receive a new - * stream concurrently at this point (since the session is being - * destroyed), so when we detect the refcount equals 0, we are the - * only owners of the ctf_trace and we can free it ourself. - */ - free(stream->ctf_trace); - } - call_rcu(&stream->rcu_node, deferred_free_viewer_stream); } @@ -223,6 +205,13 @@ int viewer_stream_rotate(struct relay_viewer_stream *vstream, uint64_t tracefile_id; assert(vstream); + assert(stream); + + if (vstream->tracefile_count == 0) { + /* Ignore rotation, there is none to do. */ + ret = 0; + goto end; + } tracefile_id = (vstream->tracefile_count_current + 1) % vstream->tracefile_count; @@ -236,19 +225,16 @@ int viewer_stream_rotate(struct relay_viewer_stream *vstream, } /* - * If the stream on the streaming side still exists, lock to execute - * rotation in order to avoid races between a modification on the index - * values. + * Lock to execute rotation in order to avoid races between a modification + * on the index values. */ - if (stream) { - pthread_mutex_lock(&stream->viewer_stream_rotation_lock); - } + pthread_mutex_lock(&stream->viewer_stream_rotation_lock); /* * The writer and the reader are not working in the same tracefile, we can * read up to EOF, we don't care about the total_index_received. */ - if (!stream || (stream->tracefile_count_current != tracefile_id)) { + if (stream->close_flag || (stream->tracefile_count_current != tracefile_id)) { vstream->close_write_flag = 1; } else { /* @@ -256,7 +242,7 @@ int viewer_stream_rotate(struct relay_viewer_stream *vstream, * limit our reading to the number of indexes received. */ vstream->close_write_flag = 0; - if (stream) { + if (stream->close_flag) { vstream->total_index_received = stream->total_index_received; } } @@ -278,9 +264,7 @@ int viewer_stream_rotate(struct relay_viewer_stream *vstream, vstream->abort_flag = 0; pthread_mutex_unlock(&vstream->overwrite_lock); - if (stream) { - pthread_mutex_unlock(&stream->viewer_stream_rotation_lock); - } + pthread_mutex_unlock(&stream->viewer_stream_rotation_lock); ret = index_open(vstream->path_name, vstream->channel_name, vstream->tracefile_count, vstream->tracefile_count_current);