X-Git-Url: http://git.efficios.com/?a=blobdiff_plain;f=src%2Fcommon%2Fconsumer%2Fconsumer.c;h=3755932ab59cf8ad177607b5ad64c95a9c76bcae;hb=9a2fcf78ea85b9ad74405db4605338896d70b46f;hp=ccf9b40386ab306b8130ea073b62d8a07bd343e7;hpb=fee30cf85f3b58b329d669db781cd7aac22f7785;p=lttng-tools.git diff --git a/src/common/consumer/consumer.c b/src/common/consumer/consumer.c index ccf9b4038..3755932ab 100644 --- a/src/common/consumer/consumer.c +++ b/src/common/consumer/consumer.c @@ -630,10 +630,9 @@ end: /* * Add a stream to the global list protected by a mutex. */ -int consumer_add_data_stream(struct lttng_consumer_stream *stream) +void consumer_add_data_stream(struct lttng_consumer_stream *stream) { struct lttng_ht *ht = data_ht; - int ret = 0; assert(stream); assert(ht); @@ -683,8 +682,6 @@ int consumer_add_data_stream(struct lttng_consumer_stream *stream) pthread_mutex_unlock(&stream->chan->timer_lock); pthread_mutex_unlock(&stream->chan->lock); pthread_mutex_unlock(&consumer_data.lock); - - return ret; } void consumer_del_data_stream(struct lttng_consumer_stream *stream) @@ -1077,7 +1074,7 @@ int consumer_add_channel(struct lttng_consumer_channel *channel, */ static int update_poll_array(struct lttng_consumer_local_data *ctx, struct pollfd **pollfd, struct lttng_consumer_stream **local_stream, - struct lttng_ht *ht) + struct lttng_ht *ht, int *nb_inactive_fd) { int i = 0; struct lttng_ht_iter iter; @@ -1089,6 +1086,7 @@ static int update_poll_array(struct lttng_consumer_local_data *ctx, assert(local_stream); DBG("Updating poll fd array"); + *nb_inactive_fd = 0; rcu_read_lock(); cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) { /* @@ -1099,9 +1097,14 @@ static int update_poll_array(struct lttng_consumer_local_data *ctx, * just after the check. However, this is OK since the stream(s) will * be deleted once the thread is notified that the end point state has * changed where this function will be called back again. + * + * We track the number of inactive FDs because they still need to be + * closed by the polling thread after a wakeup on the data_pipe or + * metadata_pipe. */ if (stream->state != LTTNG_CONSUMER_ACTIVE_STREAM || stream->endpoint_status == CONSUMER_ENDPOINT_INACTIVE) { + (*nb_inactive_fd)++; continue; } /* @@ -2106,10 +2109,9 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream, * Action done with the metadata stream when adding it to the consumer internal * data structures to handle it. */ -int consumer_add_metadata_stream(struct lttng_consumer_stream *stream) +void consumer_add_metadata_stream(struct lttng_consumer_stream *stream) { struct lttng_ht *ht = metadata_ht; - int ret = 0; struct lttng_ht_iter iter; struct lttng_ht_node_u64 *node; @@ -2169,7 +2171,6 @@ int consumer_add_metadata_stream(struct lttng_consumer_stream *stream) pthread_mutex_unlock(&stream->chan->lock); pthread_mutex_unlock(&stream->chan->timer_lock); pthread_mutex_unlock(&consumer_data.lock); - return ret; } /* @@ -2457,6 +2458,8 @@ void *consumer_thread_data_poll(void *data) struct lttng_consumer_stream **local_stream = NULL, *new_stream = NULL; /* local view of consumer_data.fds_count */ int nb_fd = 0; + /* Number of FDs with CONSUMER_ENDPOINT_INACTIVE but still open. */ + int nb_inactive_fd = 0; struct lttng_consumer_local_data *ctx = data; ssize_t len; @@ -2513,7 +2516,7 @@ void *consumer_thread_data_poll(void *data) goto end; } ret = update_poll_array(ctx, &pollfd, local_stream, - data_ht); + data_ht, &nb_inactive_fd); if (ret < 0) { ERR("Error in allocating pollfd or local_outfds"); lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_POLL_ERROR); @@ -2526,7 +2529,8 @@ void *consumer_thread_data_poll(void *data) pthread_mutex_unlock(&consumer_data.lock); /* No FDs and consumer_quit, consumer_cleanup the thread */ - if (nb_fd == 0 && CMM_LOAD_SHARED(consumer_quit) == 1) { + if (nb_fd == 0 && nb_inactive_fd == 0 && + CMM_LOAD_SHARED(consumer_quit) == 1) { err = 0; /* All is OK */ goto end; } @@ -3360,7 +3364,7 @@ error: * This will create a relayd socket pair and add it to the relayd hash table. * The caller MUST acquire a RCU read side lock before calling it. */ -int consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type, + void consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type, struct lttng_consumer_local_data *ctx, int sock, struct pollfd *consumer_sockpoll, struct lttcomm_relayd_sock *relayd_sock, uint64_t sessiond_id, @@ -3516,7 +3520,7 @@ int consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type, add_relayd(relayd); /* All good! */ - return 0; + return; error: if (consumer_send_status_msg(sock, ret_code) < 0) { @@ -3534,8 +3538,6 @@ error_nosignal: if (relayd_created) { free(relayd); } - - return ret; } /*