X-Git-Url: http://git.efficios.com/?a=blobdiff_plain;f=src%2Fcommon%2Fust-consumer%2Fust-consumer.c;h=b2434eb5a78948da66555e4ce48951c1b776fede;hb=5c786dedd0156b93984f89ba47ec841277ed7dae;hp=ff4ba25b9730f0a2896ef798e8869ca7c09aa1cc;hpb=f5a0c9cf497ccd3f438832072ff9c4c06264c16c;p=lttng-tools.git diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c index ff4ba25b9..b2434eb5a 100644 --- a/src/common/ust-consumer/ust-consumer.c +++ b/src/common/ust-consumer/ust-consumer.c @@ -152,7 +152,8 @@ static struct lttng_consumer_stream *allocate_stream(int cpu, int key, channel->session_id, cpu, &alloc_ret, - channel->type); + channel->type, + channel->monitor); if (stream == NULL) { switch (alloc_ret) { case -ENOENT: @@ -208,42 +209,6 @@ static int send_stream_to_thread(struct lttng_consumer_stream *stream, return ret; } -/* - * Search for a relayd object related to the stream. If found, send the stream - * to the relayd. - * - * On success, returns 0 else a negative value. - */ -static int send_stream_to_relayd(struct lttng_consumer_stream *stream) -{ - int ret = 0; - struct consumer_relayd_sock_pair *relayd; - - assert(stream); - - relayd = consumer_find_relayd(stream->net_seq_idx); - if (relayd != NULL) { - pthread_mutex_lock(&relayd->ctrl_sock_mutex); - /* Add stream on the relayd */ - ret = relayd_add_stream(&relayd->control_sock, stream->name, - stream->chan->pathname, &stream->relayd_stream_id, - stream->chan->tracefile_size, - stream->chan->tracefile_count); - pthread_mutex_unlock(&relayd->ctrl_sock_mutex); - if (ret < 0) { - goto error; - } - } else if (stream->net_seq_idx != (uint64_t) -1ULL) { - ERR("Network sequence index %" PRIu64 " unknown. Not adding stream.", - stream->net_seq_idx); - ret = -1; - goto error; - } - -error: - return ret; -} - /* * Create streams for the given channel using liblttng-ust-ctl. * @@ -408,18 +373,20 @@ static int send_sessiond_channel(int sock, DBG("UST consumer sending channel %s to sessiond", channel->name); - cds_list_for_each_entry(stream, &channel->streams.head, send_node) { - /* Try to send the stream to the relayd if one is available. */ - ret = send_stream_to_relayd(stream); - if (ret < 0) { - /* - * Flag that the relayd was the problem here probably due to a - * communicaton error on the socket. - */ - if (relayd_error) { - *relayd_error = 1; + if (channel->relayd_id != (uint64_t) -1ULL) { + cds_list_for_each_entry(stream, &channel->streams.head, send_node) { + /* Try to send the stream to the relayd if one is available. */ + ret = consumer_send_relayd_stream(stream, stream->chan->pathname); + if (ret < 0) { + /* + * Flag that the relayd was the problem here probably due to a + * communicaton error on the socket. + */ + if (relayd_error) { + *relayd_error = 1; + } + ret_code = LTTNG_ERR_RELAYD_CONNECT_FAIL; } - ret_code = LTTNG_ERR_RELAYD_CONNECT_FAIL; } } @@ -559,6 +526,12 @@ static int send_streams_to_thread(struct lttng_consumer_channel *channel, /* Remove node from the channel stream list. */ cds_list_del(&stream->send_node); + + /* + * From this point on, the stream's ownership has been moved away from + * the channel and becomes globally visible. + */ + stream->globally_visible = 1; } error: @@ -730,10 +703,13 @@ static int setup_metadata(struct lttng_consumer_local_data *ctx, uint64_t key) } /* Send metadata stream to relayd if needed. */ - ret = send_stream_to_relayd(metadata->metadata_stream); - if (ret < 0) { - ret = LTTCOMM_CONSUMERD_ERROR_METADATA; - goto error; + if (metadata->metadata_stream->net_seq_idx != (uint64_t) -1ULL) { + ret = consumer_send_relayd_stream(metadata->metadata_stream, + metadata->pathname); + if (ret < 0) { + ret = LTTCOMM_CONSUMERD_ERROR_METADATA; + goto error; + } } ret = send_streams_to_thread(metadata, ctx); @@ -883,7 +859,7 @@ error: * Returns 0 on success, < 0 on error */ static int snapshot_channel(uint64_t key, char *path, uint64_t relayd_id, - struct lttng_consumer_local_data *ctx) + uint64_t max_stream_size, struct lttng_consumer_local_data *ctx) { int ret; unsigned use_relayd = 0; @@ -954,6 +930,15 @@ static int snapshot_channel(uint64_t key, char *path, uint64_t relayd_id, goto error_unlock; } + /* + * The original value is sent back if max stream size is larger than + * the possible size of the snapshot. Also, we asume that the session + * daemon should never send a maximum stream size that is lower than + * subbuffer size. + */ + consumed_pos = consumer_get_consumed_maxsize(consumed_pos, + produced_pos, max_stream_size); + while (consumed_pos < produced_pos) { ssize_t read_len; unsigned long len, padded_len; @@ -1108,12 +1093,12 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, if (ret != sizeof(msg)) { DBG("Consumer received unexpected message size %zd (expects %zu)", ret, sizeof(msg)); - lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_CMD); /* * The ret value might 0 meaning an orderly shutdown but this is ok * since the caller handles this. */ if (ret > 0) { + lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_CMD); ret = -1; } return ret; @@ -1449,6 +1434,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, ret = snapshot_channel(msg.u.snapshot_channel.key, msg.u.snapshot_channel.pathname, msg.u.snapshot_channel.relayd_id, + msg.u.snapshot_channel.max_stream_size, ctx); if (ret < 0) { ERR("Snapshot channel failed");