X-Git-Url: http://git.efficios.com/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Fstream.c;h=41d44a58598ff9033dfea8cdd69d44f0a63de75b;hp=c59bb9416b1c9609ca34884cbc6126daf1295b78;hb=cb523e0290a439cf57fa7823ffa78803500ba4c3;hpb=f8f3885cc52af9d3c951da78989d6f4a25270411 diff --git a/src/bin/lttng-relayd/stream.c b/src/bin/lttng-relayd/stream.c index c59bb9416..41d44a585 100644 --- a/src/bin/lttng-relayd/stream.c +++ b/src/bin/lttng-relayd/stream.c @@ -32,16 +32,7 @@ /* Should be called with RCU read-side lock held. */ bool stream_get(struct relay_stream *stream) { - bool has_ref = false; - - pthread_mutex_lock(&stream->reflock); - if (stream->ref.refcount != 0) { - has_ref = true; - urcu_ref_get(&stream->ref); - } - pthread_mutex_unlock(&stream->reflock); - - return has_ref; + return urcu_ref_get_unless_zero(&stream->ref); } /* @@ -77,7 +68,8 @@ end: struct relay_stream *stream_create(struct ctf_trace *trace, uint64_t stream_handle, char *path_name, char *channel_name, uint64_t tracefile_size, - uint64_t tracefile_count) + uint64_t tracefile_count, + const struct relay_stream_chunk_id *chunk_id) { int ret; struct relay_stream *stream = NULL; @@ -86,24 +78,26 @@ struct relay_stream *stream_create(struct ctf_trace *trace, stream = zmalloc(sizeof(struct relay_stream)); if (stream == NULL) { PERROR("relay stream zmalloc"); - ret = -1; goto error_no_alloc; } stream->stream_handle = stream_handle; - stream->prev_seq = -1ULL; + stream->prev_data_seq = -1ULL; + stream->prev_index_seq = -1ULL; stream->last_net_seq_num = -1ULL; stream->ctf_stream_id = -1ULL; stream->tracefile_size = tracefile_size; stream->tracefile_count = tracefile_count; stream->path_name = path_name; + stream->prev_path_name = NULL; stream->channel_name = channel_name; + stream->rotate_at_seq_num = -1ULL; lttng_ht_node_init_u64(&stream->node, stream->stream_handle); pthread_mutex_init(&stream->lock, NULL); - pthread_mutex_init(&stream->reflock, NULL); urcu_ref_init(&stream->ref); ctf_trace_get(trace); stream->trace = trace; + stream->current_chunk_id = *chunk_id; stream->indexes_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64); if (!stream->indexes_ht) { @@ -227,8 +221,7 @@ unlock: /* * Stream must be protected by holding the stream lock or by virtue of being - * called from stream_destroy, in which case it is guaranteed to be accessed - * from a single thread by the reflock. + * called from stream_destroy. */ static void stream_unpublish(struct relay_stream *stream) { @@ -263,6 +256,7 @@ static void stream_destroy(struct relay_stream *stream) tracefile_array_destroy(stream->tfa); } free(stream->path_name); + free(stream->prev_path_name); free(stream->channel_name); free(stream); } @@ -278,9 +272,6 @@ static void stream_destroy_rcu(struct rcu_head *rcu_head) /* * No need to take stream->lock since this is only called on the final * stream_put which ensures that a single thread may act on the stream. - * - * At that point, the object is also protected by the reflock which - * guarantees that no other thread may share ownership of this stream. */ static void stream_release(struct urcu_ref *ref) { @@ -321,15 +312,7 @@ static void stream_release(struct urcu_ref *ref) void stream_put(struct relay_stream *stream) { DBG("stream put for stream id %" PRIu64, stream->stream_handle); - /* - * Ensure existence of stream->reflock for stream unlock. - */ rcu_read_lock(); - /* - * Stream reflock ensures that concurrent test and update of - * stream ref is atomic. - */ - pthread_mutex_lock(&stream->reflock); assert(stream->ref.refcount != 0); /* * Wait until we have processed all the stream packets before @@ -339,13 +322,20 @@ void stream_put(struct relay_stream *stream) stream->stream_handle, (int) stream->ref.refcount); urcu_ref_put(&stream->ref, stream_release); - pthread_mutex_unlock(&stream->reflock); rcu_read_unlock(); } void try_stream_close(struct relay_stream *stream) { + bool session_aborted; + struct relay_session *session = stream->trace->session; + DBG("Trying to close stream %" PRIu64, stream->stream_handle); + + pthread_mutex_lock(&session->lock); + session_aborted = session->aborted; + pthread_mutex_unlock(&session->lock); + pthread_mutex_lock(&stream->lock); /* * Can be called concurently by connection close and reception of last @@ -376,6 +366,7 @@ void try_stream_close(struct relay_stream *stream) * a packet. Since those are sent in that order, we take * care of consumerd crashes. */ + DBG("relay_index_close_partial_fd"); relay_index_close_partial_fd(stream); /* * Use the highest net_seq_num we currently have pending @@ -383,11 +374,13 @@ void try_stream_close(struct relay_stream *stream) * at -1ULL if we cannot find any index. */ stream->last_net_seq_num = relay_index_find_last(stream); + DBG("Updating stream->last_net_seq_num to %" PRIu64, stream->last_net_seq_num); /* Fall-through into the next check. */ } if (stream->last_net_seq_num != -1ULL && - ((int64_t) (stream->prev_seq - stream->last_net_seq_num)) < 0) { + ((int64_t) (stream->prev_data_seq - stream->last_net_seq_num)) < 0 + && !session_aborted) { /* * Don't close since we still have data pending. This * handles cases where an explicit close command has