X-Git-Url: http://git.efficios.com/?a=blobdiff_plain;f=src%2Fcommon%2Fust-consumer%2Fust-consumer.c;h=4c93aa7c2849d7771a8d9ab29069ac2fcf42684c;hb=ca6b395f1a6a30bc855fa566cae98ecde5950dfe;hp=f318af153ca3aca1dcdd1573590d7447de61490c;hpb=10a5031171c7bca5b4498c871b119e5a88b6a3fb;p=lttng-tools.git diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c index f318af153..4c93aa7c2 100644 --- a/src/common/ust-consumer/ust-consumer.c +++ b/src/common/ust-consumer/ust-consumer.c @@ -152,7 +152,8 @@ static struct lttng_consumer_stream *allocate_stream(int cpu, int key, channel->session_id, cpu, &alloc_ret, - channel->type); + channel->type, + channel->monitor); if (stream == NULL) { switch (alloc_ret) { case -ENOENT: @@ -208,42 +209,6 @@ static int send_stream_to_thread(struct lttng_consumer_stream *stream, return ret; } -/* - * Search for a relayd object related to the stream. If found, send the stream - * to the relayd. - * - * On success, returns 0 else a negative value. - */ -static int send_stream_to_relayd(struct lttng_consumer_stream *stream) -{ - int ret = 0; - struct consumer_relayd_sock_pair *relayd; - - assert(stream); - - relayd = consumer_find_relayd(stream->net_seq_idx); - if (relayd != NULL) { - pthread_mutex_lock(&relayd->ctrl_sock_mutex); - /* Add stream on the relayd */ - ret = relayd_add_stream(&relayd->control_sock, stream->name, - stream->chan->pathname, &stream->relayd_stream_id, - stream->chan->tracefile_size, - stream->chan->tracefile_count); - pthread_mutex_unlock(&relayd->ctrl_sock_mutex); - if (ret < 0) { - goto error; - } - } else if (stream->net_seq_idx != (uint64_t) -1ULL) { - ERR("Network sequence index %" PRIu64 " unknown. Not adding stream.", - stream->net_seq_idx); - ret = -1; - goto error; - } - -error: - return ret; -} - /* * Create streams for the given channel using liblttng-ust-ctl. * @@ -408,18 +373,20 @@ static int send_sessiond_channel(int sock, DBG("UST consumer sending channel %s to sessiond", channel->name); - cds_list_for_each_entry(stream, &channel->streams.head, send_node) { - /* Try to send the stream to the relayd if one is available. */ - ret = send_stream_to_relayd(stream); - if (ret < 0) { - /* - * Flag that the relayd was the problem here probably due to a - * communicaton error on the socket. - */ - if (relayd_error) { - *relayd_error = 1; + if (channel->relayd_id != (uint64_t) -1ULL) { + cds_list_for_each_entry(stream, &channel->streams.head, send_node) { + /* Try to send the stream to the relayd if one is available. */ + ret = consumer_send_relayd_stream(stream, stream->chan->pathname); + if (ret < 0) { + /* + * Flag that the relayd was the problem here probably due to a + * communicaton error on the socket. + */ + if (relayd_error) { + *relayd_error = 1; + } + ret_code = LTTNG_ERR_RELAYD_CONNECT_FAIL; } - ret_code = LTTNG_ERR_RELAYD_CONNECT_FAIL; } } @@ -559,6 +526,12 @@ static int send_streams_to_thread(struct lttng_consumer_channel *channel, /* Remove node from the channel stream list. */ cds_list_del(&stream->send_node); + + /* + * From this point on, the stream's ownership has been moved away from + * the channel and becomes globally visible. + */ + stream->globally_visible = 1; } error: @@ -665,6 +638,7 @@ static int close_metadata(uint64_t chan_key) } pthread_mutex_lock(&consumer_data.lock); + pthread_mutex_lock(&channel->lock); if (cds_lfht_is_node_deleted(&channel->node.node)) { goto error_unlock; @@ -685,6 +659,7 @@ static int close_metadata(uint64_t chan_key) } error_unlock: + pthread_mutex_unlock(&channel->lock); pthread_mutex_unlock(&consumer_data.lock); error: return ret; @@ -726,14 +701,17 @@ static int setup_metadata(struct lttng_consumer_local_data *ctx, uint64_t key) if (cds_list_empty(&metadata->streams.head)) { ERR("Metadata channel key %" PRIu64 ", no stream available.", key); ret = LTTCOMM_CONSUMERD_ERROR_METADATA; - goto error; + goto error_no_stream; } /* Send metadata stream to relayd if needed. */ - ret = send_stream_to_relayd(metadata->metadata_stream); - if (ret < 0) { - ret = LTTCOMM_CONSUMERD_ERROR_METADATA; - goto error; + if (metadata->metadata_stream->net_seq_idx != (uint64_t) -1ULL) { + ret = consumer_send_relayd_stream(metadata->metadata_stream, + metadata->pathname); + if (ret < 0) { + ret = LTTCOMM_CONSUMERD_ERROR_METADATA; + goto error; + } } ret = send_streams_to_thread(metadata, ctx); @@ -758,7 +736,9 @@ error: * the stream is still in the local stream list of the channel. This call * will make sure to clean that list. */ - consumer_del_channel(metadata); + cds_list_del(&metadata->metadata_stream->send_node); + consumer_stream_destroy(metadata->metadata_stream, NULL); +error_no_stream: end: return ret; } @@ -881,7 +861,7 @@ error: * Returns 0 on success, < 0 on error */ static int snapshot_channel(uint64_t key, char *path, uint64_t relayd_id, - struct lttng_consumer_local_data *ctx) + uint64_t max_stream_size, struct lttng_consumer_local_data *ctx) { int ret; unsigned use_relayd = 0; @@ -952,6 +932,15 @@ static int snapshot_channel(uint64_t key, char *path, uint64_t relayd_id, goto error_unlock; } + /* + * The original value is sent back if max stream size is larger than + * the possible size of the snapshot. Also, we asume that the session + * daemon should never send a maximum stream size that is lower than + * subbuffer size. + */ + consumed_pos = consumer_get_consumed_maxsize(consumed_pos, + produced_pos, max_stream_size); + while (consumed_pos < produced_pos) { ssize_t read_len; unsigned long len, padded_len; @@ -1060,7 +1049,7 @@ int lttng_ustconsumer_recv_metadata(int sock, uint64_t key, uint64_t offset, * and ultimately try to get rid of this global consumer data lock. */ pthread_mutex_lock(&consumer_data.lock); - + pthread_mutex_lock(&channel->lock); pthread_mutex_lock(&channel->metadata_cache->lock); ret = consumer_metadata_cache_write(channel, offset, len, metadata_str); if (ret < 0) { @@ -1072,10 +1061,12 @@ int lttng_ustconsumer_recv_metadata(int sock, uint64_t key, uint64_t offset, * waiting for the metadata cache to be flushed. */ pthread_mutex_unlock(&channel->metadata_cache->lock); + pthread_mutex_unlock(&channel->lock); pthread_mutex_unlock(&consumer_data.lock); goto end_free; } pthread_mutex_unlock(&channel->metadata_cache->lock); + pthread_mutex_unlock(&channel->lock); pthread_mutex_unlock(&consumer_data.lock); while (consumer_metadata_cache_flushed(channel, offset + len)) { @@ -1106,12 +1097,12 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, if (ret != sizeof(msg)) { DBG("Consumer received unexpected message size %zd (expects %zu)", ret, sizeof(msg)); - lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_CMD); /* * The ret value might 0 meaning an orderly shutdown but this is ok * since the caller handles this. */ if (ret > 0) { + lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_CMD); ret = -1; } return ret; @@ -1447,6 +1438,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, ret = snapshot_channel(msg.u.snapshot_channel.key, msg.u.snapshot_channel.pathname, msg.u.snapshot_channel.relayd_id, + msg.u.snapshot_channel.max_stream_size, ctx); if (ret < 0) { ERR("Snapshot channel failed"); @@ -1742,6 +1734,11 @@ int lttng_ustconsumer_data_pending(struct lttng_consumer_stream *stream) DBG("UST consumer checking data pending"); + if (stream->endpoint_status != CONSUMER_ENDPOINT_ACTIVE) { + ret = 0; + goto end; + } + ret = ustctl_get_next_subbuf(stream->ustream); if (ret == 0) { /* There is still data so let's put back this subbuffer. */