X-Git-Url: http://git.efficios.com/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Fviewer-stream.c;h=2f5d6f788f13ff8a3c35d9aec91ba0e15624a7cc;hp=3953d7dd10a4bd18132926286536df87a27ff5f0;hb=6c1c0768320135c6936c371b09731851b508c023;hpb=2f8f53af90479595d530f8f02e71dd0b9fb810ee diff --git a/src/bin/lttng-relayd/viewer-stream.c b/src/bin/lttng-relayd/viewer-stream.c index 3953d7dd1..2f5d6f788 100644 --- a/src/bin/lttng-relayd/viewer-stream.c +++ b/src/bin/lttng-relayd/viewer-stream.c @@ -17,6 +17,7 @@ */ #define _GNU_SOURCE +#define _LGPL_SOURCE #include #include @@ -41,11 +42,12 @@ static void deferred_free_viewer_stream(struct rcu_head *head) } struct relay_viewer_stream *viewer_stream_create(struct relay_stream *stream, - enum lttng_viewer_seek seek_t) + enum lttng_viewer_seek seek_t, struct ctf_trace *ctf_trace) { struct relay_viewer_stream *vstream; assert(stream); + assert(ctf_trace); vstream = zmalloc(sizeof(*vstream)); if (!vstream) { @@ -53,7 +55,7 @@ struct relay_viewer_stream *viewer_stream_create(struct relay_stream *stream, goto error; } - vstream->session_id = stream->session->id; + vstream->session_id = stream->session_id; vstream->stream_handle = stream->stream_handle; vstream->path_name = strndup(stream->path_name, LTTNG_VIEWER_PATH_MAX); vstream->channel_name = strndup(stream->channel_name, @@ -63,10 +65,10 @@ struct relay_viewer_stream *viewer_stream_create(struct relay_stream *stream, vstream->tracefile_count_last = -1ULL; switch (seek_t) { - case VIEWER_SEEK_BEGINNING: + case LTTNG_VIEWER_SEEK_BEGINNING: vstream->tracefile_count_current = stream->oldest_tracefile_id; break; - case VIEWER_SEEK_LAST: + case LTTNG_VIEWER_SEEK_LAST: vstream->tracefile_count_current = stream->tracefile_count_current; break; default: @@ -74,11 +76,9 @@ struct relay_viewer_stream *viewer_stream_create(struct relay_stream *stream, goto error; } - vstream->ctf_trace = stream->ctf_trace; if (vstream->metadata_flag) { - vstream->ctf_trace->viewer_metadata_stream = vstream; + ctf_trace->viewer_metadata_stream = vstream; } - uatomic_inc(&vstream->ctf_trace->refcount); /* Globally visible after the add unique. */ lttng_ht_node_init_u64(&vstream->stream_n, stream->stream_handle); @@ -113,7 +113,7 @@ struct relay_viewer_stream *viewer_stream_create(struct relay_stream *stream, vstream->index_read_fd = read_fd; } - if (seek_t == VIEWER_SEEK_LAST && vstream->index_read_fd >= 0) { + if (seek_t == LTTNG_VIEWER_SEEK_LAST && vstream->index_read_fd >= 0) { off_t lseek_ret; lseek_ret = lseek(vstream->index_read_fd, @@ -144,14 +144,16 @@ void viewer_stream_delete(struct relay_viewer_stream *stream) assert(!ret); } -void viewer_stream_destroy(struct relay_viewer_stream *stream) +void viewer_stream_destroy(struct ctf_trace *ctf_trace, + struct relay_viewer_stream *stream) { int ret; - unsigned long ret_ref; assert(stream); - ret_ref = uatomic_add_return(&stream->ctf_trace->refcount, -1); - assert(ret_ref >= 0); + + if (ctf_trace) { + ctf_trace_put_ref(ctf_trace); + } if (stream->read_fd >= 0) { ret = close(stream->read_fd); @@ -166,25 +168,6 @@ void viewer_stream_destroy(struct relay_viewer_stream *stream) } } - /* - * If the only stream left in the HT is the metadata stream, - * we need to remove it because we won't detect a EOF for this - * stream. - */ - if (ret_ref == 1 && stream->ctf_trace->viewer_metadata_stream) { - viewer_stream_delete(stream->ctf_trace->viewer_metadata_stream); - viewer_stream_destroy(stream->ctf_trace->viewer_metadata_stream); - stream->ctf_trace->metadata_stream = NULL; - DBG("Freeing ctf_trace %" PRIu64, stream->ctf_trace->id); - /* - * The streaming-side is already closed and we can't receive a new - * stream concurrently at this point (since the session is being - * destroyed), so when we detect the refcount equals 0, we are the - * only owners of the ctf_trace and we can free it ourself. - */ - free(stream->ctf_trace); - } - call_rcu(&stream->rcu_node, deferred_free_viewer_stream); } @@ -214,6 +197,7 @@ end: /* * Rotate a stream to the next tracefile. * + * Must be called with viewer_stream_rotation_lock held. * Returns 0 on success, 1 on EOF, a negative value on error. */ int viewer_stream_rotate(struct relay_viewer_stream *vstream, @@ -223,6 +207,13 @@ int viewer_stream_rotate(struct relay_viewer_stream *vstream, uint64_t tracefile_id; assert(vstream); + assert(stream); + + if (vstream->tracefile_count == 0) { + /* Ignore rotation, there is none to do. */ + ret = 0; + goto end; + } tracefile_id = (vstream->tracefile_count_current + 1) % vstream->tracefile_count; @@ -235,20 +226,11 @@ int viewer_stream_rotate(struct relay_viewer_stream *vstream, goto end; } - /* - * If the stream on the streaming side still exists, lock to execute - * rotation in order to avoid races between a modification on the index - * values. - */ - if (stream) { - pthread_mutex_lock(&stream->viewer_stream_rotation_lock); - } - /* * The writer and the reader are not working in the same tracefile, we can * read up to EOF, we don't care about the total_index_received. */ - if (!stream || (stream->tracefile_count_current != tracefile_id)) { + if (stream->close_flag || (stream->tracefile_count_current != tracefile_id)) { vstream->close_write_flag = 1; } else { /* @@ -256,7 +238,7 @@ int viewer_stream_rotate(struct relay_viewer_stream *vstream, * limit our reading to the number of indexes received. */ vstream->close_write_flag = 0; - if (stream) { + if (stream->close_flag) { vstream->total_index_received = stream->total_index_received; } } @@ -278,10 +260,6 @@ int viewer_stream_rotate(struct relay_viewer_stream *vstream, vstream->abort_flag = 0; pthread_mutex_unlock(&vstream->overwrite_lock); - if (stream) { - pthread_mutex_unlock(&stream->viewer_stream_rotation_lock); - } - ret = index_open(vstream->path_name, vstream->channel_name, vstream->tracefile_count, vstream->tracefile_count_current); if (ret < 0) {