X-Git-Url: http://git.efficios.com/?a=blobdiff_plain;f=src%2Fcommon%2Fconsumer%2Fconsumer.c;h=0ed22ac313731c2ad4031a51db153942bf9745b6;hb=2ec8403bb9647a2ad22dcfc8bbc79db15b179783;hp=234944dad0abf6d6c6594fa68ea64721f1488d87;hpb=a23ca18428f7e2b8bece8d3f47a9c274cf030ec4;p=lttng-tools.git diff --git a/src/common/consumer/consumer.c b/src/common/consumer/consumer.c index 234944dad..0ed22ac31 100644 --- a/src/common/consumer/consumer.c +++ b/src/common/consumer/consumer.c @@ -320,6 +320,7 @@ static void free_relayd_rcu(struct rcu_head *head) (void) relayd_close(&relayd->control_sock); (void) relayd_close(&relayd->data_sock); + pthread_mutex_destroy(&relayd->ctrl_sock_mutex); free(relayd); } @@ -416,25 +417,24 @@ static void cleanup_relayd_ht(void) } /* - * Update the end point status of all streams having the given network sequence - * index (relayd index). + * Update the end point status of all streams having the given relayd id. * * It's atomically set without having the stream mutex locked which is fine * because we handle the write/read race with a pipe wakeup for each thread. */ -static void update_endpoint_status_by_netidx(uint64_t net_seq_idx, +static void update_endpoint_status_by_netidx(uint64_t relayd_id, enum consumer_endpoint_status status) { struct lttng_ht_iter iter; struct lttng_consumer_stream *stream; - DBG("Consumer set delete flag on stream by idx %" PRIu64, net_seq_idx); + DBG("Consumer set delete flag on stream by idx %" PRIu64, relayd_id); rcu_read_lock(); /* Let's begin with metadata */ cds_lfht_for_each_entry(metadata_ht->ht, &iter.iter, stream, node.node) { - if (stream->net_seq_idx == net_seq_idx) { + if (stream->relayd_id == relayd_id) { uatomic_set(&stream->endpoint_status, status); DBG("Delete flag set to metadata stream %d", stream->wait_fd); } @@ -442,7 +442,7 @@ static void update_endpoint_status_by_netidx(uint64_t net_seq_idx, /* Follow up by the data streams */ cds_lfht_for_each_entry(data_ht->ht, &iter.iter, stream, node.node) { - if (stream->net_seq_idx == net_seq_idx) { + if (stream->relayd_id == relayd_id) { uatomic_set(&stream->endpoint_status, status); DBG("Delete flag set to data stream %d", stream->wait_fd); } @@ -458,17 +458,16 @@ static void update_endpoint_status_by_netidx(uint64_t net_seq_idx, * If a local data context is available, notify the threads that the streams' * state have changed. */ -static void cleanup_relayd(struct consumer_relayd_sock_pair *relayd, - struct lttng_consumer_local_data *ctx) +void lttng_consumer_cleanup_relayd(struct consumer_relayd_sock_pair *relayd) { uint64_t netidx; assert(relayd); - DBG("Cleaning up relayd sockets"); + DBG("Cleaning up relayd object ID %"PRIu64, relayd->id); /* Save the net sequence index before destroying the object */ - netidx = relayd->net_seq_idx; + netidx = relayd->id; /* * Delete the relayd from the relayd hash table, close the sockets and free @@ -485,10 +484,8 @@ static void cleanup_relayd(struct consumer_relayd_sock_pair *relayd, * memory barrier ordering the updates of the end point status from the * read of this status which happens AFTER receiving this notify. */ - if (ctx) { - notify_thread_lttng_pipe(ctx->consumer_data_pipe); - notify_thread_lttng_pipe(ctx->consumer_metadata_pipe); - } + notify_thread_lttng_pipe(relayd->ctx->consumer_data_pipe); + notify_thread_lttng_pipe(relayd->ctx->consumer_metadata_pipe); } /* @@ -567,7 +564,7 @@ struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key, stream->state = state; stream->uid = uid; stream->gid = gid; - stream->net_seq_idx = relayd_id; + stream->relayd_id = relayd_id; stream->session_id = session_id; stream->monitor = monitor; stream->endpoint_status = CONSUMER_ENDPOINT_ACTIVE; @@ -606,7 +603,7 @@ struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key, DBG3("Allocated stream %s (key %" PRIu64 ", chan_key %" PRIu64 " relayd_id %" PRIu64 ", session_id %" PRIu64, stream->name, stream->key, channel_key, - stream->net_seq_idx, stream->session_id); + stream->relayd_id, stream->session_id); rcu_read_unlock(); return stream; @@ -699,7 +696,7 @@ static int add_relayd(struct consumer_relayd_sock_pair *relayd) assert(relayd); lttng_ht_lookup(consumer_data.relayd_ht, - &relayd->net_seq_idx, &iter); + &relayd->id, &iter); node = lttng_ht_iter_get_node_u64(&iter); if (node != NULL) { goto end; @@ -714,12 +711,12 @@ end: * Allocate and return a consumer relayd socket. */ static struct consumer_relayd_sock_pair *consumer_allocate_relayd_sock_pair( - uint64_t net_seq_idx) + uint64_t relayd_id) { struct consumer_relayd_sock_pair *obj = NULL; /* net sequence index of -1 is a failure */ - if (net_seq_idx == (uint64_t) -1ULL) { + if (relayd_id == (uint64_t) -1ULL) { goto error; } @@ -729,12 +726,12 @@ static struct consumer_relayd_sock_pair *consumer_allocate_relayd_sock_pair( goto error; } - obj->net_seq_idx = net_seq_idx; + obj->id = relayd_id; obj->refcount = 0; obj->destroy_flag = 0; obj->control_sock.sock.fd = -1; obj->data_sock.sock.fd = -1; - lttng_ht_node_init_u64(&obj->node, obj->net_seq_idx); + lttng_ht_node_init_u64(&obj->node, obj->id); pthread_mutex_init(&obj->ctrl_sock_mutex, NULL); error: @@ -782,12 +779,12 @@ int consumer_send_relayd_stream(struct lttng_consumer_stream *stream, struct consumer_relayd_sock_pair *relayd; assert(stream); - assert(stream->net_seq_idx != -1ULL); + assert(stream->relayd_id != -1ULL); assert(path); /* The stream is not metadata. Get relayd reference if exists. */ rcu_read_lock(); - relayd = consumer_find_relayd(stream->net_seq_idx); + relayd = consumer_find_relayd(stream->relayd_id); if (relayd != NULL) { /* Add stream on the relayd */ pthread_mutex_lock(&relayd->ctrl_sock_mutex); @@ -796,6 +793,8 @@ int consumer_send_relayd_stream(struct lttng_consumer_stream *stream, stream->chan->tracefile_size, stream->chan->tracefile_count); pthread_mutex_unlock(&relayd->ctrl_sock_mutex); if (ret < 0) { + ERR("Relayd add stream failed. Cleaning up relayd %" PRIu64".", relayd->id); + lttng_consumer_cleanup_relayd(relayd); goto end; } @@ -803,13 +802,13 @@ int consumer_send_relayd_stream(struct lttng_consumer_stream *stream, stream->sent_to_relayd = 1; } else { ERR("Stream %" PRIu64 " relayd ID %" PRIu64 " unknown. Can't send it.", - stream->key, stream->net_seq_idx); + stream->key, stream->relayd_id); ret = -1; goto end; } DBG("Stream %s with key %" PRIu64 " sent to relayd id %" PRIu64, - stream->name, stream->key, stream->net_seq_idx); + stream->name, stream->key, stream->relayd_id); end: rcu_read_unlock(); @@ -821,33 +820,35 @@ end: * * Returns 0 on success, < 0 on error */ -int consumer_send_relayd_streams_sent(uint64_t net_seq_idx) +int consumer_send_relayd_streams_sent(uint64_t relayd_id) { int ret = 0; struct consumer_relayd_sock_pair *relayd; - assert(net_seq_idx != -1ULL); + assert(relayd_id != -1ULL); /* The stream is not metadata. Get relayd reference if exists. */ rcu_read_lock(); - relayd = consumer_find_relayd(net_seq_idx); + relayd = consumer_find_relayd(relayd_id); if (relayd != NULL) { /* Add stream on the relayd */ pthread_mutex_lock(&relayd->ctrl_sock_mutex); ret = relayd_streams_sent(&relayd->control_sock); pthread_mutex_unlock(&relayd->ctrl_sock_mutex); if (ret < 0) { + ERR("Relayd streams sent failed. Cleaning up relayd %" PRIu64".", relayd->id); + lttng_consumer_cleanup_relayd(relayd); goto end; } } else { ERR("Relayd ID %" PRIu64 " unknown. Can't send streams_sent.", - net_seq_idx); + relayd_id); ret = -1; goto end; } ret = 0; - DBG("All streams sent relayd id %" PRIu64, net_seq_idx); + DBG("All streams sent relayd id %" PRIu64, relayd_id); end: rcu_read_unlock(); @@ -863,7 +864,7 @@ void close_relayd_stream(struct lttng_consumer_stream *stream) /* The stream is not metadata. Get relayd reference if exists. */ rcu_read_lock(); - relayd = consumer_find_relayd(stream->net_seq_idx); + relayd = consumer_find_relayd(stream->relayd_id); if (relayd) { consumer_stream_relayd_close(stream, relayd); } @@ -1071,7 +1072,7 @@ int consumer_add_channel(struct lttng_consumer_channel *channel, */ static int update_poll_array(struct lttng_consumer_local_data *ctx, struct pollfd **pollfd, struct lttng_consumer_stream **local_stream, - struct lttng_ht *ht) + struct lttng_ht *ht, int *nb_inactive_fd) { int i = 0; struct lttng_ht_iter iter; @@ -1083,6 +1084,7 @@ static int update_poll_array(struct lttng_consumer_local_data *ctx, assert(local_stream); DBG("Updating poll fd array"); + *nb_inactive_fd = 0; rcu_read_lock(); cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) { /* @@ -1093,9 +1095,14 @@ static int update_poll_array(struct lttng_consumer_local_data *ctx, * just after the check. However, this is OK since the stream(s) will * be deleted once the thread is notified that the end point state has * changed where this function will be called back again. + * + * We track the number of inactive FDs because they still need to be + * closed by the polling thread after a wakeup on the data_pipe or + * metadata_pipe. */ if (stream->state != LTTNG_CONSUMER_ACTIVE_STREAM || stream->endpoint_status == CONSUMER_ENDPOINT_INACTIVE) { + (*nb_inactive_fd)++; continue; } /* @@ -1517,8 +1524,8 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap( rcu_read_lock(); /* Flag that the current stream if set for network streaming. */ - if (stream->net_seq_idx != (uint64_t) -1ULL) { - relayd = consumer_find_relayd(stream->net_seq_idx); + if (stream->relayd_id != (uint64_t) -1ULL) { + relayd = consumer_find_relayd(stream->relayd_id); if (relayd == NULL) { ret = -EPIPE; goto end; @@ -1696,7 +1703,8 @@ write_error: * cleanup the relayd object and all associated streams. */ if (relayd && relayd_hang_up) { - cleanup_relayd(relayd, ctx); + ERR("Relayd hangup. Cleaning up relayd %" PRIu64".", relayd->id); + lttng_consumer_cleanup_relayd(relayd); } end: @@ -1748,8 +1756,8 @@ ssize_t lttng_consumer_on_read_subbuffer_splice( rcu_read_lock(); /* Flag that the current stream if set for network streaming. */ - if (stream->net_seq_idx != (uint64_t) -1ULL) { - relayd = consumer_find_relayd(stream->net_seq_idx); + if (stream->relayd_id != (uint64_t) -1ULL) { + relayd = consumer_find_relayd(stream->relayd_id); if (relayd == NULL) { written = -ret; goto end; @@ -1922,7 +1930,8 @@ write_error: * cleanup the relayd object and all associated streams. */ if (relayd && relayd_hang_up) { - cleanup_relayd(relayd, ctx); + ERR("Relayd hangup. Cleaning up relayd %" PRIu64".", relayd->id); + lttng_consumer_cleanup_relayd(relayd); /* Skip splice error so the consumer does not fail */ goto end; } @@ -2051,9 +2060,12 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream, DBG3("Consumer delete metadata stream %d", stream->wait_fd); pthread_mutex_lock(&consumer_data.lock); - pthread_mutex_lock(&stream->chan->metadata_cache->lock); pthread_mutex_lock(&stream->chan->lock); pthread_mutex_lock(&stream->lock); + if (stream->chan->metadata_cache) { + /* Only applicable to userspace consumers. */ + pthread_mutex_lock(&stream->chan->metadata_cache->lock); + } /* Remove any reference to that stream. */ consumer_stream_delete(stream, ht); @@ -2077,9 +2089,11 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream, */ stream->chan->metadata_stream = NULL; + if (stream->chan->metadata_cache) { + pthread_mutex_unlock(&stream->chan->metadata_cache->lock); + } pthread_mutex_unlock(&stream->lock); pthread_mutex_unlock(&stream->chan->lock); - pthread_mutex_unlock(&stream->chan->metadata_cache->lock); pthread_mutex_unlock(&consumer_data.lock); if (free_chan) { @@ -2140,7 +2154,7 @@ int consumer_add_metadata_stream(struct lttng_consumer_stream *stream) lttng_ht_add_unique_u64(ht, &stream->node); - lttng_ht_add_unique_u64(consumer_data.stream_per_chan_id_ht, + lttng_ht_add_u64(consumer_data.stream_per_chan_id_ht, &stream->node_channel_id); /* @@ -2444,6 +2458,8 @@ void *consumer_thread_data_poll(void *data) struct lttng_consumer_stream **local_stream = NULL, *new_stream = NULL; /* local view of consumer_data.fds_count */ int nb_fd = 0; + /* Number of FDs with CONSUMER_ENDPOINT_INACTIVE but still open. */ + int nb_inactive_fd = 0; struct lttng_consumer_local_data *ctx = data; ssize_t len; @@ -2500,7 +2516,7 @@ void *consumer_thread_data_poll(void *data) goto end; } ret = update_poll_array(ctx, &pollfd, local_stream, - data_ht); + data_ht, &nb_inactive_fd); if (ret < 0) { ERR("Error in allocating pollfd or local_outfds"); lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_POLL_ERROR); @@ -2513,7 +2529,7 @@ void *consumer_thread_data_poll(void *data) pthread_mutex_unlock(&consumer_data.lock); /* No FDs and consumer_quit, consumer_cleanup the thread */ - if (nb_fd == 0 && consumer_quit == 1) { + if (nb_fd == 0 && consumer_quit == 1 && nb_inactive_fd == 0) { err = 0; /* All is OK */ goto end; } @@ -3338,7 +3354,7 @@ error: * This will create a relayd socket pair and add it to the relayd hash table. * The caller MUST acquire a RCU read side lock before calling it. */ -int consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type, + void consumer_add_relayd_socket(uint64_t relayd_id, int sock_type, struct lttng_consumer_local_data *ctx, int sock, struct pollfd *consumer_sockpoll, struct lttcomm_relayd_sock *relayd_sock, uint64_t sessiond_id, @@ -3351,14 +3367,14 @@ int consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type, assert(ctx); assert(relayd_sock); - DBG("Consumer adding relayd socket (idx: %" PRIu64 ")", net_seq_idx); + DBG("Consumer adding relayd socket (idx: %" PRIu64 ")", relayd_id); /* Get relayd reference if exists. */ - relayd = consumer_find_relayd(net_seq_idx); + relayd = consumer_find_relayd(relayd_id); if (relayd == NULL) { assert(sock_type == LTTNG_STREAM_CONTROL); /* Not found. Allocate one. */ - relayd = consumer_allocate_relayd_sock_pair(net_seq_idx); + relayd = consumer_allocate_relayd_sock_pair(relayd_id); if (relayd == NULL) { ret = -ENOMEM; ret_code = LTTCOMM_CONSUMERD_ENOMEM; @@ -3481,7 +3497,7 @@ int consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type, DBG("Consumer %s socket created successfully with net idx %" PRIu64 " (fd: %d)", sock_type == LTTNG_STREAM_CONTROL ? "control" : "data", - relayd->net_seq_idx, fd); + relayd->id, fd); /* We successfully added the socket. Send status back. */ ret = consumer_send_status_msg(sock, ret_code); @@ -3495,10 +3511,11 @@ int consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type, * Add relayd socket pair to consumer data hashtable. If object already * exists or on error, the function gracefully returns. */ + relayd->ctx = ctx; add_relayd(relayd); /* All good! */ - return 0; + return; error: if (consumer_send_status_msg(sock, ret_code) < 0) { @@ -3516,36 +3533,6 @@ error_nosignal: if (relayd_created) { free(relayd); } - - return ret; -} - -/* - * Try to lock the stream mutex. - * - * On success, 1 is returned else 0 indicating that the mutex is NOT lock. - */ -static int stream_try_lock(struct lttng_consumer_stream *stream) -{ - int ret; - - assert(stream); - - /* - * Try to lock the stream mutex. On failure, we know that the stream is - * being used else where hence there is data still being extracted. - */ - ret = pthread_mutex_trylock(&stream->lock); - if (ret) { - /* For both EBUSY and EINVAL error, the mutex is NOT locked. */ - ret = 0; - goto end; - } - - ret = 1; - -end: - return ret; } /* @@ -3559,7 +3546,7 @@ static struct consumer_relayd_sock_pair *find_relayd_by_session_id(uint64_t id) struct lttng_ht_iter iter; struct consumer_relayd_sock_pair *relayd = NULL; - /* Iterate over all relayd since they are indexed by net_seq_idx. */ + /* Iterate over all relayd since they are indexed by relayd_id. */ cds_lfht_for_each_entry(consumer_data.relayd_ht->ht, &iter.iter, relayd, node.node) { /* @@ -3614,28 +3601,11 @@ int consumer_data_pending(uint64_t id) /* Ease our life a bit */ ht = consumer_data.stream_list_ht; - relayd = find_relayd_by_session_id(id); - if (relayd) { - /* Send init command for data pending. */ - pthread_mutex_lock(&relayd->ctrl_sock_mutex); - ret = relayd_begin_data_pending(&relayd->control_sock, - relayd->relayd_session_id); - pthread_mutex_unlock(&relayd->ctrl_sock_mutex); - if (ret < 0) { - /* Communication error thus the relayd so no data pending. */ - goto data_not_pending; - } - } - cds_lfht_for_each_entry_duplicate(ht->ht, ht->hash_fct(&id, lttng_ht_seed), ht->match_fct, &id, &iter.iter, stream, node_session_id.node) { - /* If this call fails, the stream is being used hence data pending. */ - ret = stream_try_lock(stream); - if (!ret) { - goto data_pending; - } + pthread_mutex_lock(&stream->lock); /* * A removed node from the hash table indicates that the stream has @@ -3648,14 +3618,35 @@ int consumer_data_pending(uint64_t id) /* Check the stream if there is data in the buffers. */ ret = data_pending(stream); if (ret == 1) { + DBG("Data is pending locally on stream %" PRIu64, stream->key); pthread_mutex_unlock(&stream->lock); goto data_pending; } } - /* Relayd check */ - if (relayd) { - pthread_mutex_lock(&relayd->ctrl_sock_mutex); + pthread_mutex_unlock(&stream->lock); + } + + relayd = find_relayd_by_session_id(id); + if (relayd) { + unsigned int is_data_inflight = 0; + + /* Send init command for data pending. */ + pthread_mutex_lock(&relayd->ctrl_sock_mutex); + ret = relayd_begin_data_pending(&relayd->control_sock, + relayd->relayd_session_id); + if (ret < 0) { + /* Communication error thus the relayd so no data pending. */ + pthread_mutex_unlock(&relayd->ctrl_sock_mutex); + ERR("Relayd begin data pending failed. Cleaning up relayd %" PRIu64".", relayd->id); + lttng_consumer_cleanup_relayd(relayd); + goto data_not_pending; + } + + cds_lfht_for_each_entry_duplicate(ht->ht, + ht->hash_fct(&id, lttng_ht_seed), + ht->match_fct, &id, + &iter.iter, stream, node_session_id.node) { if (stream->metadata_flag) { ret = relayd_quiescent_control(&relayd->control_sock, stream->relayd_stream_id); @@ -3664,27 +3655,29 @@ int consumer_data_pending(uint64_t id) stream->relayd_stream_id, stream->next_net_seq_num - 1); } - pthread_mutex_unlock(&relayd->ctrl_sock_mutex); if (ret == 1) { - pthread_mutex_unlock(&stream->lock); + pthread_mutex_unlock(&relayd->ctrl_sock_mutex); goto data_pending; } + if (ret < 0) { + ERR("Relayd data pending failed. Cleaning up relayd %" PRIu64".", relayd->id); + lttng_consumer_cleanup_relayd(relayd); + pthread_mutex_unlock(&relayd->ctrl_sock_mutex); + goto data_not_pending; + } } - pthread_mutex_unlock(&stream->lock); - } - if (relayd) { - unsigned int is_data_inflight = 0; - - /* Send init command for data pending. */ - pthread_mutex_lock(&relayd->ctrl_sock_mutex); + /* Send end command for data pending. */ ret = relayd_end_data_pending(&relayd->control_sock, relayd->relayd_session_id, &is_data_inflight); pthread_mutex_unlock(&relayd->ctrl_sock_mutex); if (ret < 0) { + ERR("Relayd end data pending failed. Cleaning up relayd %" PRIu64".", relayd->id); + lttng_consumer_cleanup_relayd(relayd); goto data_not_pending; } if (is_data_inflight) { + DBG("Data is in flight on relayd %" PRIu64, relayd->id); goto data_pending; } }