X-Git-Url: http://git.efficios.com/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Fstream.c;h=41d44a58598ff9033dfea8cdd69d44f0a63de75b;hp=af4ef1bbb80d9e17fe331298c4fac558d0795d3c;hb=cb523e0290a439cf57fa7823ffa78803500ba4c3;hpb=7591bab11eceedc6a0d1e02fd6f85592267a63b5

diff --git a/src/bin/lttng-relayd/stream.c b/src/bin/lttng-relayd/stream.c
index af4ef1bbb..41d44a585 100644
--- a/src/bin/lttng-relayd/stream.c
+++ b/src/bin/lttng-relayd/stream.c
@@ -17,7 +17,6 @@
  * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
  */
 
-#define _GNU_SOURCE
 #define _LGPL_SOURCE
 #include <common/common.h>
 #include <common/utils.h>
@@ -33,16 +32,7 @@
 /* Should be called with RCU read-side lock held. */
 bool stream_get(struct relay_stream *stream)
 {
-	bool has_ref = false;
-
-	pthread_mutex_lock(&stream->reflock);
-	if (stream->ref.refcount != 0) {
-		has_ref = true;
-		urcu_ref_get(&stream->ref);
-	}
-	pthread_mutex_unlock(&stream->reflock);
-
-	return has_ref;
+	return urcu_ref_get_unless_zero(&stream->ref);
 }
 
 /*
@@ -78,7 +68,8 @@ end:
 struct relay_stream *stream_create(struct ctf_trace *trace,
 	uint64_t stream_handle, char *path_name,
 	char *channel_name, uint64_t tracefile_size,
-	uint64_t tracefile_count)
+	uint64_t tracefile_count,
+	const struct relay_stream_chunk_id *chunk_id)
 {
 	int ret;
 	struct relay_stream *stream = NULL;
@@ -87,23 +78,26 @@ struct relay_stream *stream_create(struct ctf_trace *trace,
 	stream = zmalloc(sizeof(struct relay_stream));
 	if (stream == NULL) {
 		PERROR("relay stream zmalloc");
-		ret = -1;
 		goto error_no_alloc;
 	}
 
 	stream->stream_handle = stream_handle;
-	stream->prev_seq = -1ULL;
+	stream->prev_data_seq = -1ULL;
+	stream->prev_index_seq = -1ULL;
+	stream->last_net_seq_num = -1ULL;
 	stream->ctf_stream_id = -1ULL;
 	stream->tracefile_size = tracefile_size;
 	stream->tracefile_count = tracefile_count;
 	stream->path_name = path_name;
+	stream->prev_path_name = NULL;
 	stream->channel_name = channel_name;
+	stream->rotate_at_seq_num = -1ULL;
 	lttng_ht_node_init_u64(&stream->node, stream->stream_handle);
 	pthread_mutex_init(&stream->lock, NULL);
-	pthread_mutex_init(&stream->reflock, NULL);
 	urcu_ref_init(&stream->ref);
 	ctf_trace_get(trace);
 	stream->trace = trace;
+	stream->current_chunk_id = *chunk_id;
 
 	stream->indexes_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
 	if (!stream->indexes_ht) {
@@ -137,13 +131,18 @@ struct relay_stream *stream_create(struct ctf_trace *trace,
 		ret = -1;
 		goto end;
 	}
+	stream->tfa = tracefile_array_create(stream->tracefile_count);
+	if (!stream->tfa) {
+		ret = -1;
+		goto end;
+	}
 	if (stream->tracefile_size) {
 		DBG("Tracefile %s/%s_0 created", stream->path_name, stream->channel_name);
 	} else {
 		DBG("Tracefile %s/%s created", stream->path_name, stream->channel_name);
 	}
 
-	if (!strncmp(stream->channel_name, DEFAULT_METADATA_NAME, NAME_MAX)) {
+	if (!strncmp(stream->channel_name, DEFAULT_METADATA_NAME, LTTNG_NAME_MAX)) {
 		stream->is_metadata = 1;
 	}
 
@@ -163,6 +162,7 @@ struct relay_stream *stream_create(struct ctf_trace *trace,
 	 * side of the relayd does not have the concept of session.
 	 */
 	lttng_ht_add_unique_u64(relay_streams_ht, &stream->node);
+	stream->in_stream_ht = true;
 
 	DBG("Relay new stream added %s with ID %" PRIu64, stream->channel_name,
 			stream->stream_handle);
@@ -220,28 +220,43 @@ unlock:
 }
 
 /*
- * Only called from destroy. No stream lock needed, since there is a
- * single user at this point. This is ensured by having the refcount
- * reaching 0.
+ * Stream must be protected by holding the stream lock or by virtue of being
+ * called from stream_destroy.
  */
 static void stream_unpublish(struct relay_stream *stream)
 {
-	if (!stream->published) {
-		return;
+	if (stream->in_stream_ht) {
+		struct lttng_ht_iter iter;
+		int ret;
+
+		iter.iter.node = &stream->node.node;
+		ret = lttng_ht_del(relay_streams_ht, &iter);
+		assert(!ret);
+		stream->in_stream_ht = false;
+	}
+	if (stream->published) {
+		pthread_mutex_lock(&stream->trace->stream_list_lock);
+		cds_list_del_rcu(&stream->stream_node);
+		pthread_mutex_unlock(&stream->trace->stream_list_lock);
+		stream->published = false;
 	}
-	pthread_mutex_lock(&stream->trace->stream_list_lock);
-	cds_list_del_rcu(&stream->stream_node);
-	pthread_mutex_unlock(&stream->trace->stream_list_lock);
-
-	stream->published = false;
 }
 
 static void stream_destroy(struct relay_stream *stream)
 {
 	if (stream->indexes_ht) {
+		/*
+		 * Calling lttng_ht_destroy in call_rcu worker thread so
+		 * we don't hold the RCU read-side lock while calling
+		 * it.
+		 */
 		lttng_ht_destroy(stream->indexes_ht);
 	}
+	if (stream->tfa) {
+		tracefile_array_destroy(stream->tfa);
+	}
 	free(stream->path_name);
+	free(stream->prev_path_name);
 	free(stream->channel_name);
 	free(stream);
 }
@@ -257,17 +272,12 @@ static void stream_destroy_rcu(struct rcu_head *rcu_head)
 /*
  * No need to take stream->lock since this is only called on the final
  * stream_put which ensures that a single thread may act on the stream.
- *
- * At that point, the object is also protected by the reflock which
- * guarantees that no other thread may share ownership of this stream.
  */
 static void stream_release(struct urcu_ref *ref)
 {
 	struct relay_stream *stream =
 		caa_container_of(ref, struct relay_stream, ref);
 	struct relay_session *session;
-	int ret;
-	struct lttng_ht_iter iter;
 
 	session = stream->trace->session;
 
@@ -281,19 +291,15 @@ static void stream_release(struct urcu_ref *ref)
 	}
 	pthread_mutex_unlock(&session->recv_list_lock);
 
-	iter.iter.node = &stream->node.node;
-	ret = lttng_ht_del(relay_streams_ht, &iter);
-	assert(!ret);
-
 	stream_unpublish(stream);
 
 	if (stream->stream_fd) {
 		stream_fd_put(stream->stream_fd);
 		stream->stream_fd = NULL;
 	}
-	if (stream->index_fd) {
-		stream_fd_put(stream->index_fd);
-		stream->index_fd = NULL;
+	if (stream->index_file) {
+		lttng_index_file_put(stream->index_file);
+		stream->index_file = NULL;
 	}
 	if (stream->trace) {
 		ctf_trace_put(stream->trace);
@@ -306,15 +312,7 @@ static void stream_release(struct urcu_ref *ref)
 void stream_put(struct relay_stream *stream)
 {
 	DBG("stream put for stream id %" PRIu64, stream->stream_handle);
-	/*
-	 * Ensure existence of stream->reflock for stream unlock.
-	 */
 	rcu_read_lock();
-	/*
-	 * Stream reflock ensures that concurrent test and update of
-	 * stream ref is atomic.
- */ - pthread_mutex_lock(&stream->reflock); assert(stream->ref.refcount != 0); /* * Wait until we have processed all the stream packets before @@ -324,24 +322,127 @@ void stream_put(struct relay_stream *stream) stream->stream_handle, (int) stream->ref.refcount); urcu_ref_put(&stream->ref, stream_release); - pthread_mutex_unlock(&stream->reflock); rcu_read_unlock(); } -void stream_close(struct relay_stream *stream) +void try_stream_close(struct relay_stream *stream) { - DBG("closing stream %" PRIu64, stream->stream_handle); + bool session_aborted; + struct relay_session *session = stream->trace->session; + + DBG("Trying to close stream %" PRIu64, stream->stream_handle); + + pthread_mutex_lock(&session->lock); + session_aborted = session->aborted; + pthread_mutex_unlock(&session->lock); + pthread_mutex_lock(&stream->lock); + /* + * Can be called concurently by connection close and reception of last + * pending data. + */ + if (stream->closed) { + pthread_mutex_unlock(&stream->lock); + DBG("closing stream %" PRIu64 " aborted since it is already marked as closed", stream->stream_handle); + return; + } + + stream->close_requested = true; + + if (stream->last_net_seq_num == -1ULL) { + /* + * Handle connection close without explicit stream close + * command. + * + * We can be clever about indexes partially received in + * cases where we received the data socket part, but not + * the control socket part: since we're currently closing + * the stream on behalf of the control socket, we *know* + * there won't be any more control information for this + * socket. Therefore, we can destroy all indexes for + * which we have received only the file descriptor (from + * data socket). This takes care of consumerd crashes + * between sending the data and control information for + * a packet. Since those are sent in that order, we take + * care of consumerd crashes. + */ + DBG("relay_index_close_partial_fd"); + relay_index_close_partial_fd(stream); + /* + * Use the highest net_seq_num we currently have pending + * As end of stream indicator. Leave last_net_seq_num + * at -1ULL if we cannot find any index. + */ + stream->last_net_seq_num = relay_index_find_last(stream); + DBG("Updating stream->last_net_seq_num to %" PRIu64, stream->last_net_seq_num); + /* Fall-through into the next check. */ + } + + if (stream->last_net_seq_num != -1ULL && + ((int64_t) (stream->prev_data_seq - stream->last_net_seq_num)) < 0 + && !session_aborted) { + /* + * Don't close since we still have data pending. This + * handles cases where an explicit close command has + * been received for this stream, and cases where the + * connection has been closed, and we are awaiting for + * index information from the data socket. It is + * therefore expected that all the index fd information + * we need has already been received on the control + * socket. Matching index information from data socket + * should be Expected Soon(TM). + * + * TODO: We should implement a timer to garbage collect + * streams after a timeout to be resilient against a + * consumerd implementation that would not match this + * expected behavior. + */ + pthread_mutex_unlock(&stream->lock); + DBG("closing stream %" PRIu64 " aborted since it still has data pending", stream->stream_handle); + return; + } + /* + * We received all the indexes we can expect. + */ + stream_unpublish(stream); + stream->closed = true; + /* Relay indexes are only used by the "consumer/sessiond" end. 
 	relay_index_close_all(stream);
 	pthread_mutex_unlock(&stream->lock);
+	DBG("Succeeded in closing stream %" PRIu64, stream->stream_handle);
 	stream_put(stream);
 }
 
+static void print_stream_indexes(struct relay_stream *stream)
+{
+	struct lttng_ht_iter iter;
+	struct relay_index *index;
+
+	rcu_read_lock();
+	cds_lfht_for_each_entry(stream->indexes_ht->ht, &iter.iter, index,
+			index_n.node) {
+		DBG("index %p net_seq_num %" PRIu64 " refcount %ld"
+				" stream %" PRIu64 " trace %" PRIu64
+				" session %" PRIu64,
+				index,
+				index->index_n.key,
+				stream->ref.refcount,
+				index->stream->stream_handle,
+				index->stream->trace->id,
+				index->stream->trace->session->id);
+	}
+	rcu_read_unlock();
+}
+
 void print_relay_streams(void)
 {
 	struct lttng_ht_iter iter;
 	struct relay_stream *stream;
 
+	if (!relay_streams_ht) {
+		return;
+	}
+
 	rcu_read_lock();
 	cds_lfht_for_each_entry(relay_streams_ht->ht, &iter.iter, stream,
 			node.node) {
@@ -355,6 +456,7 @@ void print_relay_streams(void)
 			stream->stream_handle,
 			stream->trace->id,
 			stream->trace->session->id);
+		print_stream_indexes(stream);
 		stream_put(stream);
 	}
 	rcu_read_unlock();
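
A note on the stream_get() change at the top of this diff: the per-stream
reflock could be dropped because urcu_ref_get_unless_zero() performs the
"test refcount != 0, then increment" step as a single atomic operation.
The sketch below illustrates the semantics with a compare-and-swap loop;
it shows the technique, not necessarily liburcu's exact implementation
(see urcu/ref.h):

	#include <stdbool.h>
	#include <urcu/ref.h>
	#include <urcu/uatomic.h>

	/*
	 * Atomically take a reference unless the refcount has already
	 * reached zero (i.e. release has begun).  This is exactly the
	 * guarantee the removed reflock-protected block used to provide.
	 */
	static bool ref_get_unless_zero_sketch(struct urcu_ref *ref)
	{
		long old = uatomic_read(&ref->refcount);

		for (;;) {
			long res;

			if (old == 0) {
				return false;	/* Object is being released. */
			}
			res = uatomic_cmpxchg(&ref->refcount, old, old + 1);
			if (res == old) {
				return true;	/* Increment succeeded. */
			}
			old = res;	/* Raced with another update; retry. */
		}
	}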
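The comment added in stream_destroy() deserves a word: lttng_ht_destroy()
must not run inside an RCU read-side critical section, yet the final
stream_put() may happen under rcu_read_lock(). The release callback
therefore only *schedules* destruction, and the actual teardown runs
later from the call_rcu worker thread. A minimal self-contained sketch of
that pattern, using a stand-in struct (the real relay_stream lives in
stream.h; the rcu_head field name "rcu_node" is an assumption here):

	#include <stdlib.h>
	#include <urcu.h>
	#include <urcu/compiler.h>	/* caa_container_of */
	#include <urcu/ref.h>

	/* Stand-in for the real relayd stream object (hypothetical). */
	struct stream_sketch {
		struct urcu_ref ref;
		struct rcu_head rcu_node;	/* assumed field name */
	};

	/*
	 * Runs from the call_rcu worker thread, outside any read-side
	 * critical section; destroying hash tables and freeing memory
	 * is safe here.
	 */
	static void stream_sketch_destroy_rcu(struct rcu_head *rcu_head)
	{
		struct stream_sketch *stream = caa_container_of(rcu_head,
				struct stream_sketch, rcu_node);

		free(stream);
	}

	/*
	 * Refcount release callback: may run under rcu_read_lock(), so
	 * it only schedules the destruction.
	 */
	static void stream_sketch_release(struct urcu_ref *ref)
	{
		struct stream_sketch *stream = caa_container_of(ref,
				struct stream_sketch, ref);

		call_rcu(&stream->rcu_node, stream_sketch_destroy_rcu);
	}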
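try_stream_close() tests for pending data with
((int64_t) (stream->prev_data_seq - stream->last_net_seq_num)) < 0 rather
than a plain prev_data_seq < last_net_seq_num. The unsigned subtraction
followed by a cast to a signed type is the classic modular "is a before b"
test for sequence counters, and it stays correct even if the counter
wraps. A small standalone illustration (function and variable names are
mine, not from the patch):

	#include <stdbool.h>
	#include <stdint.h>
	#include <stdio.h>

	/*
	 * True when sequence number a is logically "before" b, even if
	 * the 64-bit counter wrapped between the two values.
	 */
	static bool seq_before(uint64_t a, uint64_t b)
	{
		return ((int64_t) (a - b)) < 0;
	}

	int main(void)
	{
		/* Plain and modular comparison agree in the common case: */
		printf("%d\n", seq_before(5, 10));	/* 1: data pending */

		/* Only the modular form survives a counter wrap: */
		printf("%d\n", seq_before(UINT64_MAX - 1, 2));	/* 1: diff is -4 */
		printf("%d\n", seq_before(2, UINT64_MAX - 1));	/* 0: diff is +4 */
		return 0;
	}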
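Finally, the debugging helpers at the bottom of the diff follow the
standard walk pattern for an RCU hash table of refcounted objects: the
read lock keeps the node memory valid during iteration, but a reference
(stream_get()) is still required before following pointers such as
stream->trace->session, which the release path tears down. In outline,
reusing the identifiers from this file (use_stream() is a hypothetical
placeholder for the DBG statements):

	static void walk_streams_sketch(void)
	{
		struct lttng_ht_iter iter;
		struct relay_stream *stream;

		if (!relay_streams_ht) {
			return;	/* Guard added by this patch. */
		}
		rcu_read_lock();
		cds_lfht_for_each_entry(relay_streams_ht->ht, &iter.iter,
				stream, node.node) {
			if (!stream_get(stream)) {
				continue;	/* Racing with the final put. */
			}
			use_stream(stream);	/* hypothetical */
			stream_put(stream);
		}
		rcu_read_unlock();
	}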