X-Git-Url: http://git.efficios.com/?a=blobdiff_plain;f=src%2Fcommon%2Fust-consumer%2Fust-consumer.c;h=0955e66fa4e384fdde2f70703a96e626b8ebfc8c;hb=84382d49b1e8a17037d0415d84f1c9e250163494;hp=83aa5986f7f663e122a92f58dd7895836d807149;hpb=50adc26400482c07210afcda8ef1d3322f75871d;p=lttng-tools.git diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c index 83aa5986f..0955e66fa 100644 --- a/src/common/ust-consumer/ust-consumer.c +++ b/src/common/ust-consumer/ust-consumer.c @@ -409,6 +409,7 @@ static int send_sessiond_channel(int sock, { int ret, ret_code = LTTCOMM_CONSUMERD_SUCCESS; struct lttng_consumer_stream *stream; + uint64_t net_seq_idx = -1ULL; assert(channel); assert(ctx); @@ -433,6 +434,9 @@ static int send_sessiond_channel(int sock, } ret_code = LTTNG_ERR_RELAYD_CONNECT_FAIL; } + if (net_seq_idx == -1ULL) { + net_seq_idx = stream->net_seq_idx; + } } } @@ -625,45 +629,6 @@ error: rcu_read_unlock(); return ret; } -/* - * Close metadata stream wakeup_fd using the given key to retrieve the channel. - * RCU read side lock MUST be acquired before calling this function. - * - * NOTE: This function does NOT take any channel nor stream lock. - * - * Return 0 on success else LTTng error code. - */ -static int _close_metadata(struct lttng_consumer_channel *channel) -{ - int ret = LTTCOMM_CONSUMERD_SUCCESS; - - assert(channel); - assert(channel->type == CONSUMER_CHANNEL_TYPE_METADATA); - - if (channel->switch_timer_enabled == 1) { - DBG("Deleting timer on metadata channel"); - consumer_timer_switch_stop(channel); - } - - if (channel->metadata_stream) { - ret = ustctl_stream_close_wakeup_fd(channel->metadata_stream->ustream); - if (ret < 0) { - ERR("UST consumer unable to close fd of metadata (ret: %d)", ret); - ret = LTTCOMM_CONSUMERD_ERROR_METADATA; - } - - if (channel->monitor) { - /* Close the read-side in consumer_del_metadata_stream */ - ret = close(channel->metadata_stream->ust_metadata_poll_pipe[1]); - if (ret < 0) { - PERROR("Close UST metadata write-side poll pipe"); - ret = LTTCOMM_CONSUMERD_ERROR_METADATA; - } - } - } - - return ret; -} /* * Close metadata stream wakeup_fd using the given key to retrieve the channel. @@ -698,7 +663,7 @@ static int close_metadata(uint64_t chan_key) goto error_unlock; } - ret = _close_metadata(channel); + lttng_ustconsumer_close_metadata(channel); error_unlock: pthread_mutex_unlock(&channel->lock); @@ -754,6 +719,12 @@ static int setup_metadata(struct lttng_consumer_local_data *ctx, uint64_t key) ret = LTTCOMM_CONSUMERD_ERROR_METADATA; goto error; } + ret = consumer_send_relayd_streams_sent( + metadata->metadata_stream->net_seq_idx); + if (ret < 0) { + ret = LTTCOMM_CONSUMERD_RELAYD_FAIL; + goto error; + } } ret = send_streams_to_thread(metadata, ctx); @@ -778,8 +749,9 @@ error: * the stream is still in the local stream list of the channel. This call * will make sure to clean that list. */ - cds_list_del(&metadata->metadata_stream->send_node); consumer_stream_destroy(metadata->metadata_stream, NULL); + cds_list_del(&metadata->metadata_stream->send_node); + metadata->metadata_stream = NULL; error_no_stream: end: return ret; @@ -871,8 +843,8 @@ error_stream: * Clean up the stream completly because the next snapshot will use a new * metadata stream. */ - cds_list_del(&metadata_stream->send_node); consumer_stream_destroy(metadata_stream, NULL); + cds_list_del(&metadata_stream->send_node); metadata_channel->metadata_stream = NULL; error: @@ -939,6 +911,12 @@ static int snapshot_channel(uint64_t key, char *path, uint64_t relayd_id, DBG("UST consumer snapshot stream %s/%s (%" PRIu64 ")", path, stream->name, stream->key); } + if (relayd_id != -1ULL) { + ret = consumer_send_relayd_streams_sent(relayd_id); + if (ret < 0) { + goto error_unlock; + } + } ustctl_flush_buffer(stream->ustream, 1); @@ -1138,17 +1116,8 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, health_code_update(); - if (msg.cmd_type == LTTNG_CONSUMER_STOP) { - /* - * Notify the session daemon that the command is completed. - * - * On transport layer error, the function call will print an error - * message so handling the returned code is a bit useless since we - * return an error code anyway. - */ - (void) consumer_send_status_msg(sock, ret_code); - return -ENOENT; - } + /* deprecated */ + assert(msg.cmd_type != LTTNG_CONSUMER_STOP); health_code_update(); @@ -1451,8 +1420,15 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, channel = consumer_find_channel(key); if (!channel) { - ERR("UST consumer push metadata %" PRIu64 " not found", key); - ret_code = LTTNG_ERR_UST_CHAN_NOT_FOUND; + /* + * This is possible if the metadata creation on the consumer side + * is in flight vis-a-vis a concurrent push metadata from the + * session daemon. Simply return that the channel failed and the + * session daemon will handle that message correctly considering + * that this race is acceptable thus the DBG() statement here. + */ + DBG("UST consumer push metadata %" PRIu64 " not found", key); + ret_code = LTTCOMM_CONSUMERD_CHANNEL_FAIL; goto end_msg_sessiond; } @@ -1471,7 +1447,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, health_poll_entry(); ret = lttng_consumer_poll_socket(consumer_sockpoll); health_poll_exit(); - if (ret < 0) { + if (ret) { goto error_fatal; } @@ -1706,6 +1682,22 @@ void lttng_ustconsumer_del_stream(struct lttng_consumer_stream *stream) ustctl_destroy_stream(stream->ustream); } +int lttng_ustconsumer_get_wakeup_fd(struct lttng_consumer_stream *stream) +{ + assert(stream); + assert(stream->ustream); + + return ustctl_stream_get_wakeup_fd(stream->ustream); +} + +int lttng_ustconsumer_close_wakeup_fd(struct lttng_consumer_stream *stream) +{ + assert(stream); + assert(stream->ustream); + + return ustctl_stream_close_wakeup_fd(stream->ustream); +} + /* * Populate index values of a UST stream. Values are set in big endian order. * @@ -2111,6 +2103,45 @@ end: return ret; } +/* + * Stop a given metadata channel timer if enabled and close the wait fd which + * is the poll pipe of the metadata stream. + * + * This MUST be called with the metadata channel acquired. + */ +void lttng_ustconsumer_close_metadata(struct lttng_consumer_channel *metadata) +{ + int ret; + + assert(metadata); + assert(metadata->type == CONSUMER_CHANNEL_TYPE_METADATA); + + DBG("Closing metadata channel key %" PRIu64, metadata->key); + + if (metadata->switch_timer_enabled == 1) { + consumer_timer_switch_stop(metadata); + } + + if (!metadata->metadata_stream) { + goto end; + } + + /* + * Closing write side so the thread monitoring the stream wakes up if any + * and clean the metadata stream. + */ + if (metadata->metadata_stream->ust_metadata_poll_pipe[1] >= 0) { + ret = close(metadata->metadata_stream->ust_metadata_poll_pipe[1]); + if (ret < 0) { + PERROR("closing metadata pipe write side"); + } + metadata->metadata_stream->ust_metadata_poll_pipe[1] = -1; + } + +end: + return; +} + /* * Close every metadata stream wait fd of the metadata hash table. This * function MUST be used very carefully so not to run into a race between the @@ -2120,7 +2151,7 @@ end: * producer so calling this is safe because we are assured that no state change * can occur in the metadata thread for the streams in the hash table. */ -void lttng_ustconsumer_close_metadata(struct lttng_ht *metadata_ht) +void lttng_ustconsumer_close_all_metadata(struct lttng_ht *metadata_ht) { struct lttng_ht_iter iter; struct lttng_consumer_stream *stream; @@ -2137,13 +2168,7 @@ void lttng_ustconsumer_close_metadata(struct lttng_ht *metadata_ht) health_code_update(); pthread_mutex_lock(&stream->chan->lock); - /* - * Whatever returned value, we must continue to try to close everything - * so ignore it. - */ - (void) _close_metadata(stream->chan); - DBG("Metadata wait fd %d and poll pipe fd %d closed", stream->wait_fd, - stream->ust_metadata_poll_pipe[1]); + lttng_ustconsumer_close_metadata(stream->chan); pthread_mutex_unlock(&stream->chan->lock); } @@ -2178,6 +2203,8 @@ int lttng_ustconsumer_request_metadata(struct lttng_consumer_local_data *ctx, assert(channel); assert(channel->metadata_cache); + memset(&request, 0, sizeof(request)); + /* send the metadata request to sessiond */ switch (consumer_data.type) { case LTTNG_CONSUMER64_UST: