X-Git-Url: http://git.efficios.com/?a=blobdiff_plain;f=src%2Fcommon%2Fconsumer%2Fconsumer.c;h=b449c1637875ec3d9fd480e49982975ca17ab193;hb=a1ae2ea59428174575b7328b1062a6248d636b72;hp=a8c369904ec35688a3467ab383c9c8f05cc23e7e;hpb=2527bf859786192dd530b408717960495f48d208;p=lttng-tools.git

diff --git a/src/common/consumer/consumer.c b/src/common/consumer/consumer.c
index a8c369904..b449c1637 100644
--- a/src/common/consumer/consumer.c
+++ b/src/common/consumer/consumer.c
@@ -630,10 +630,9 @@ end:
 /*
  * Add a stream to the global list protected by a mutex.
  */
-int consumer_add_data_stream(struct lttng_consumer_stream *stream)
+void consumer_add_data_stream(struct lttng_consumer_stream *stream)
 {
 	struct lttng_ht *ht = data_ht;
-	int ret = 0;
 
 	assert(stream);
 	assert(ht);
@@ -683,8 +682,6 @@ int consumer_add_data_stream(struct lttng_consumer_stream *stream)
 	pthread_mutex_unlock(&stream->chan->timer_lock);
 	pthread_mutex_unlock(&stream->chan->lock);
 	pthread_mutex_unlock(&consumer_data.lock);
-
-	return ret;
 }
 
 void consumer_del_data_stream(struct lttng_consumer_stream *stream)
@@ -1077,7 +1074,7 @@ int consumer_add_channel(struct lttng_consumer_channel *channel,
  */
 static int update_poll_array(struct lttng_consumer_local_data *ctx,
 		struct pollfd **pollfd, struct lttng_consumer_stream **local_stream,
-		struct lttng_ht *ht)
+		struct lttng_ht *ht, int *nb_inactive_fd)
 {
 	int i = 0;
 	struct lttng_ht_iter iter;
@@ -1089,6 +1086,7 @@ static int update_poll_array(struct lttng_consumer_local_data *ctx,
 	assert(local_stream);
 
 	DBG("Updating poll fd array");
+	*nb_inactive_fd = 0;
 	rcu_read_lock();
 	cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) {
 		/*
@@ -1099,9 +1097,14 @@ static int update_poll_array(struct lttng_consumer_local_data *ctx,
 		 * just after the check. However, this is OK since the stream(s) will
 		 * be deleted once the thread is notified that the end point state has
 		 * changed where this function will be called back again.
+		 *
+		 * We track the number of inactive FDs because they still need to be
+		 * closed by the polling thread after a wakeup on the data_pipe or
+		 * metadata_pipe.
 		 */
 		if (stream->state != LTTNG_CONSUMER_ACTIVE_STREAM ||
 				stream->endpoint_status == CONSUMER_ENDPOINT_INACTIVE) {
+			(*nb_inactive_fd)++;
 			continue;
 		}
 		/*
@@ -2106,10 +2109,9 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream,
  * Action done with the metadata stream when adding it to the consumer internal
  * data structures to handle it.
  */
-int consumer_add_metadata_stream(struct lttng_consumer_stream *stream)
+void consumer_add_metadata_stream(struct lttng_consumer_stream *stream)
 {
 	struct lttng_ht *ht = metadata_ht;
-	int ret = 0;
 	struct lttng_ht_iter iter;
 	struct lttng_ht_node_u64 *node;
 
@@ -2153,7 +2155,7 @@ int consumer_add_metadata_stream(struct lttng_consumer_stream *stream)
 
 	lttng_ht_add_unique_u64(ht, &stream->node);
 
-	lttng_ht_add_unique_u64(consumer_data.stream_per_chan_id_ht,
+	lttng_ht_add_u64(consumer_data.stream_per_chan_id_ht,
 			&stream->node_channel_id);
 
 	/*
@@ -2169,7 +2171,6 @@ int consumer_add_metadata_stream(struct lttng_consumer_stream *stream)
 	pthread_mutex_unlock(&stream->chan->lock);
 	pthread_mutex_unlock(&stream->chan->timer_lock);
 	pthread_mutex_unlock(&consumer_data.lock);
-	return ret;
 }
 
 /*
@@ -2457,6 +2458,8 @@ void *consumer_thread_data_poll(void *data)
 	struct lttng_consumer_stream **local_stream = NULL, *new_stream = NULL;
 	/* local view of consumer_data.fds_count */
 	int nb_fd = 0;
+	/* Number of FDs with CONSUMER_ENDPOINT_INACTIVE but still open. */
+	int nb_inactive_fd = 0;
 	struct lttng_consumer_local_data *ctx = data;
 	ssize_t len;
 
@@ -2513,7 +2516,7 @@ void *consumer_thread_data_poll(void *data)
 			goto end;
 		}
 		ret = update_poll_array(ctx, &pollfd, local_stream,
-				data_ht);
+				data_ht, &nb_inactive_fd);
 		if (ret < 0) {
 			ERR("Error in allocating pollfd or local_outfds");
 			lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_POLL_ERROR);
@@ -2526,7 +2529,8 @@ void *consumer_thread_data_poll(void *data)
 		pthread_mutex_unlock(&consumer_data.lock);
 
 		/* No FDs and consumer_quit, consumer_cleanup the thread */
-		if (nb_fd == 0 && CMM_LOAD_SHARED(consumer_quit) == 1) {
+		if (nb_fd == 0 && nb_inactive_fd == 0 &&
+				CMM_LOAD_SHARED(consumer_quit) == 1) {
 			err = 0;	/* All is OK */
 			goto end;
 		}
@@ -3779,3 +3783,51 @@ unsigned long consumer_get_consume_start_pos(unsigned long consumed_pos,
 	}
 	return start_pos;
 }
+
+static
+int mkdir_local(const char *path, uid_t uid, gid_t gid)
+{
+	int ret;
+
+	ret = utils_mkdir_recursive(path, S_IRWXU | S_IRWXG, uid, gid);
+	if (ret < 0) {
+		/* utils_mkdir_recursive logs an error. */
+		goto end;
+	}
+
+	ret = 0;
+end:
+	return ret;
+}
+
+static
+int mkdir_relay(const char *path, uint64_t relayd_id)
+{
+	int ret;
+	struct consumer_relayd_sock_pair *relayd;
+
+	relayd = consumer_find_relayd(relayd_id);
+	if (!relayd) {
+		ERR("Failed to find relayd");
+		ret = -1;
+		goto end;
+	}
+
+	pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+	ret = relayd_mkdir(&relayd->control_sock, path);
+	pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+
+end:
+	return ret;
+
+}
+
+int lttng_consumer_mkdir(const char *path, uid_t uid, gid_t gid,
+		uint64_t relayd_id)
+{
+	if (relayd_id != -1ULL) {
+		return mkdir_relay(path, relayd_id);
+	} else {
+		return mkdir_local(path, uid, gid);
+	}
+}
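
The last hunk adds a small mkdir facade: lttng_consumer_mkdir() creates the directory locally through utils_mkdir_recursive() when no relay daemon is in use, and otherwise asks the relay daemon to create it via relayd_mkdir() over the control socket (serialized by ctrl_sock_mutex), with -1ULL acting as the "no relayd" sentinel. The stand-alone sketch below illustrates only that dispatch shape; the *_sketch names are hypothetical, the relayd command is stubbed out with a printf, and the uid/gid ownership handling done by utils_mkdir_recursive() is omitted.

#include <inttypes.h>
#include <stdint.h>
#include <stdio.h>
#include <sys/stat.h>
#include <sys/types.h>

/* Stand-in for utils_mkdir_recursive(); creates a single directory level. */
static int mkdir_local_sketch(const char *path, mode_t mode)
{
	return mkdir(path, mode);
}

/* Stand-in for the relayd_mkdir() control-socket command. */
static int mkdir_relay_sketch(const char *path, uint64_t relayd_id)
{
	printf("would ask relayd %" PRIu64 " to create %s\n", relayd_id, path);
	return 0;
}

/* Same dispatch shape as lttng_consumer_mkdir(): -1ULL means "no relayd". */
static int mkdir_dispatch_sketch(const char *path, uint64_t relayd_id)
{
	if (relayd_id != -1ULL) {
		return mkdir_relay_sketch(path, relayd_id);
	}
	return mkdir_local_sketch(path, S_IRWXU | S_IRWXG);
}

int main(void)
{
	/* Local session: the consumer creates the directory itself. */
	(void) mkdir_dispatch_sketch("/tmp/sketch-trace", -1ULL);
	/* Network session: the request would travel over the control socket. */
	return mkdir_dispatch_sketch("sketch-trace", 0);
}

Built with any C99 compiler, the first call creates /tmp/sketch-trace with mode 0770 (matching the S_IRWXU | S_IRWXG used by the patch) and the second only prints what would be sent to relayd 0.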