X-Git-Url: http://git.efficios.com/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fkernel-consumer%2Fkernel-consumer.c;h=29ef3a4ce426d1db8755796fdafffceed43525b6;hp=86428f0663141fa250905e269d7ceea22c5cb43d;hb=07b86b528dc279d59cdf16e6cb946c144fe773f2;hpb=2bba9e532ca1910822005ff7f67400a2e871467c diff --git a/src/common/kernel-consumer/kernel-consumer.c b/src/common/kernel-consumer/kernel-consumer.c index 86428f066..29ef3a4ce 100644 --- a/src/common/kernel-consumer/kernel-consumer.c +++ b/src/common/kernel-consumer/kernel-consumer.c @@ -37,6 +37,7 @@ #include #include #include +#include #include "kernel-consumer.h" @@ -83,6 +84,321 @@ int lttng_kconsumer_get_produced_snapshot(struct lttng_consumer_stream *stream, return ret; } +/* + * Get the consumerd position + * + * Returns 0 on success, < 0 on error + */ +int lttng_kconsumer_get_consumed_snapshot(struct lttng_consumer_stream *stream, + unsigned long *pos) +{ + int ret; + int infd = stream->wait_fd; + + ret = kernctl_snapshot_get_consumed(infd, pos); + if (ret != 0) { + errno = -ret; + perror("kernctl_snapshot_get_consumed"); + } + + return ret; +} + +/* + * Find a relayd and send the stream + * + * Returns 0 on success, < 0 on error + */ +static +int send_relayd_stream(struct lttng_consumer_stream *stream, char *path) +{ + struct consumer_relayd_sock_pair *relayd; + int ret = 0; + char *stream_path; + + if (path != NULL) { + stream_path = path; + } else { + stream_path = stream->chan->pathname; + } + /* The stream is not metadata. Get relayd reference if exists. */ + rcu_read_lock(); + relayd = consumer_find_relayd(stream->net_seq_idx); + if (relayd != NULL) { + /* Add stream on the relayd */ + pthread_mutex_lock(&relayd->ctrl_sock_mutex); + ret = relayd_add_stream(&relayd->control_sock, + stream->name, stream_path, + &stream->relayd_stream_id, + stream->chan->tracefile_size, + stream->chan->tracefile_count); + pthread_mutex_unlock(&relayd->ctrl_sock_mutex); + if (ret < 0) { + goto end; + } + uatomic_inc(&relayd->refcount); + } else if (stream->net_seq_idx != (uint64_t) -1ULL) { + ERR("Network sequence index %" PRIu64 " unknown. Not adding stream.", + stream->net_seq_idx); + ret = -1; + goto end; + } + +end: + rcu_read_unlock(); + return ret; +} + +/* + * Find a relayd and close the stream + */ +static +void close_relayd_stream(struct lttng_consumer_stream *stream) +{ + struct consumer_relayd_sock_pair *relayd; + + /* The stream is not metadata. Get relayd reference if exists. */ + rcu_read_lock(); + relayd = consumer_find_relayd(stream->net_seq_idx); + if (relayd != NULL) { + consumer_stream_relayd_close(stream, relayd); + } + rcu_read_unlock(); +} + +/* + * Take a snapshot of all the stream of a channel + * + * Returns 0 on success, < 0 on error + */ +int lttng_kconsumer_snapshot_channel(uint64_t key, char *path, + uint64_t relayd_id, struct lttng_consumer_local_data *ctx) +{ + int ret; + unsigned long consumed_pos, produced_pos; + struct lttng_consumer_channel *channel; + struct lttng_consumer_stream *stream; + + DBG("Kernel consumer snapshot channel %lu", key); + + rcu_read_lock(); + + channel = consumer_find_channel(key); + if (!channel) { + ERR("No channel found for key %lu", key); + ret = -1; + goto end; + } + + /* Splice is not supported yet for channel snapshot. */ + if (channel->output != CONSUMER_CHANNEL_MMAP) { + ERR("Unsupported output %d", channel->output); + ret = -1; + goto end; + } + + cds_list_for_each_entry(stream, &channel->stream_no_monitor_list.head, + no_monitor_node) { + /* + * Lock stream because we are about to change its state. + */ + pthread_mutex_lock(&stream->lock); + + stream->net_seq_idx = relayd_id; + channel->relayd_id = relayd_id; + if (relayd_id != (uint64_t) -1ULL) { + ret = send_relayd_stream(stream, path); + if (ret < 0) { + ERR("sending stream to relayd"); + goto end_unlock; + } + DBG("Stream %s sent to the relayd", stream->name); + } else { + ret = utils_create_stream_file(path, stream->name, + stream->chan->tracefile_size, stream->tracefile_count_current, + stream->uid, stream->gid); + if (ret < 0) { + ERR("utils_create_stream_file"); + goto end_unlock; + } + + stream->out_fd = ret; + stream->tracefile_size_current = 0; + + DBG("Kernel consumer snapshot stream %s/%s (%lu)", path, + stream->name, stream->key); + } + + ret = kernctl_buffer_flush(stream->wait_fd); + if (ret < 0) { + ERR("Failed to flush kernel metadata stream"); + goto end_unlock; + } + + ret = lttng_kconsumer_take_snapshot(stream); + if (ret < 0) { + ERR("Taking kernel snapshot"); + goto end_unlock; + } + + ret = lttng_kconsumer_get_produced_snapshot(stream, &produced_pos); + if (ret < 0) { + ERR("Produced kernel snapshot position"); + goto end_unlock; + } + + ret = lttng_kconsumer_get_consumed_snapshot(stream, &consumed_pos); + if (ret < 0) { + ERR("Consumerd kernel snapshot position"); + goto end_unlock; + } + + if (stream->max_sb_size == 0) { + ret = kernctl_get_max_subbuf_size(stream->wait_fd, + &stream->max_sb_size); + if (ret < 0) { + ERR("Getting kernel max_sb_size"); + goto end_unlock; + } + } + + while (consumed_pos < produced_pos) { + ssize_t read_len; + unsigned long len, padded_len; + + DBG("Kernel consumer taking snapshot at pos %lu", consumed_pos); + + ret = kernctl_get_subbuf(stream->wait_fd, &consumed_pos); + if (ret < 0) { + if (errno != EAGAIN) { + PERROR("kernctl_get_subbuf snapshot"); + goto end_unlock; + } + DBG("Kernel consumer get subbuf failed. Skipping it."); + consumed_pos += stream->max_sb_size; + continue; + } + + ret = kernctl_get_subbuf_size(stream->wait_fd, &len); + if (ret < 0) { + ERR("Snapshot kernctl_get_subbuf_size"); + goto end_unlock; + } + + ret = kernctl_get_padded_subbuf_size(stream->wait_fd, &padded_len); + if (ret < 0) { + ERR("Snapshot kernctl_get_padded_subbuf_size"); + goto end_unlock; + } + + read_len = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, len, + padded_len - len); + /* + * We write the padded len in local tracefiles but the + * data len when using a relay. + * Display the error but continue processing to try to + * release the subbuffer. + */ + if (relayd_id != (uint64_t) -1ULL) { + if (read_len != len) { + ERR("Error sending to the relay (ret: %zd != len: %lu)", + read_len, len); + } + } else { + if (read_len != padded_len) { + ERR("Error writing to tracefile (ret: %zd != len: %lu)", + read_len, padded_len); + } + } + + ret = kernctl_put_subbuf(stream->wait_fd); + if (ret < 0) { + ERR("Snapshot kernctl_put_subbuf"); + goto end_unlock; + } + consumed_pos += stream->max_sb_size; + } + + if (relayd_id == (uint64_t) -1ULL) { + ret = close(stream->out_fd); + if (ret < 0) { + PERROR("Kernel consumer snapshot close out_fd"); + goto end_unlock; + } + stream->out_fd = -1; + } else { + close_relayd_stream(stream); + stream->net_seq_idx = (uint64_t) -1ULL; + } + pthread_mutex_unlock(&stream->lock); + } + + /* All good! */ + ret = 0; + goto end; + +end_unlock: + pthread_mutex_unlock(&stream->lock); +end: + rcu_read_unlock(); + return ret; +} + +/* + * Read the whole metadata available for a snapshot. + * + * Returns 0 on success, < 0 on error + */ +int lttng_kconsumer_snapshot_metadata(uint64_t key, char *path, + struct lttng_consumer_local_data *ctx) +{ + struct lttng_consumer_channel *metadata_channel; + struct lttng_consumer_stream *metadata_stream; + int ret; + + DBG("Kernel consumer snapshot metadata with key %" PRIu64 " at path %s", + key, path); + + rcu_read_lock(); + + metadata_channel = consumer_find_channel(key); + if (!metadata_channel) { + ERR("Snapshot kernel metadata channel not found for key %lu", key); + ret = -1; + goto end; + } + + metadata_stream = metadata_channel->metadata_stream; + assert(metadata_stream); + + ret = utils_create_stream_file(path, metadata_stream->name, + metadata_stream->chan->tracefile_size, + metadata_stream->tracefile_count_current, + metadata_stream->uid, metadata_stream->gid); + if (ret < 0) { + goto end; + } + metadata_stream->out_fd = ret; + + ret = 0; + while (ret >= 0) { + ret = lttng_kconsumer_read_subbuffer(metadata_stream, ctx); + if (ret < 0) { + if (ret != -EPERM) { + ERR("Kernel snapshot reading subbuffer"); + goto end; + } + /* "ret" is negative at this point so we will exit the loop. */ + continue; + } + } + + ret = 0; +end: + rcu_read_unlock(); + return ret; +} + /* * Receive command from session daemon and process it. * @@ -189,7 +505,6 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, { int fd; struct lttng_pipe *stream_pipe; - struct consumer_relayd_sock_pair *relayd = NULL; struct lttng_consumer_stream *new_stream; struct lttng_consumer_channel *channel; int alloc_ret = 0; @@ -272,12 +587,28 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, } new_stream->chan = channel; new_stream->wait_fd = fd; + switch (channel->output) { + case CONSUMER_CHANNEL_SPLICE: + new_stream->output = LTTNG_EVENT_SPLICE; + break; + case CONSUMER_CHANNEL_MMAP: + new_stream->output = LTTNG_EVENT_MMAP; + break; + default: + ERR("Stream output unknown %d", channel->output); + goto end_nosignal; + } /* * We've just assigned the channel to the stream so increment the - * refcount right now. + * refcount right now. We don't need to increment the refcount for + * streams in no monitor because we handle manually the cleanup of + * those. It is very important to make sure there is NO prior + * consumer_del_stream() calls or else the refcount will be unbalanced. */ - uatomic_inc(&new_stream->chan->refcount); + if (channel->monitor) { + uatomic_inc(&new_stream->chan->refcount); + } /* * The buffer flush is done on the session daemon side for the kernel @@ -288,24 +619,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, */ new_stream->hangup_flush_done = 0; - /* The stream is not metadata. Get relayd reference if exists. */ - relayd = consumer_find_relayd(new_stream->net_seq_idx); - if (relayd != NULL) { - /* Add stream on the relayd */ - pthread_mutex_lock(&relayd->ctrl_sock_mutex); - ret = relayd_add_stream(&relayd->control_sock, - new_stream->name, new_stream->chan->pathname, - &new_stream->relayd_stream_id, - new_stream->chan->tracefile_size, - new_stream->chan->tracefile_count); - pthread_mutex_unlock(&relayd->ctrl_sock_mutex); - if (ret < 0) { - consumer_del_stream(new_stream, NULL); - goto end_nosignal; - } - } else if (new_stream->net_seq_idx != (uint64_t) -1ULL) { - ERR("Network sequence index %" PRIu64 " unknown. Not adding stream.", - new_stream->net_seq_idx); + ret = send_relayd_stream(new_stream, NULL); + if (ret < 0) { consumer_del_stream(new_stream, NULL); goto end_nosignal; } @@ -318,11 +633,17 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, } } + if (new_stream->metadata_flag) { + channel->metadata_stream = new_stream; + } + /* Do not monitor this stream. */ if (!channel->monitor) { DBG("Kernel consumer add stream %s in no monitor mode with" "relayd id %" PRIu64, new_stream->name, new_stream->relayd_stream_id); + cds_list_add(&new_stream->no_monitor_node, + &channel->stream_no_monitor_list.head); break; } @@ -411,6 +732,23 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, } case LTTNG_CONSUMER_SNAPSHOT_CHANNEL: { + if (msg.u.snapshot_channel.metadata == 1) { + ret = lttng_kconsumer_snapshot_metadata(msg.u.snapshot_channel.key, + msg.u.snapshot_channel.pathname, ctx); + if (ret < 0) { + ERR("Snapshot metadata failed"); + ret_code = LTTNG_ERR_KERN_META_FAIL; + } + } else { + ret = lttng_kconsumer_snapshot_channel(msg.u.snapshot_channel.key, + msg.u.snapshot_channel.pathname, + msg.u.snapshot_channel.relayd_id, ctx); + if (ret < 0) { + ERR("Snapshot channel failed"); + ret_code = LTTNG_ERR_KERN_CHAN_FAIL; + } + } + ret = consumer_send_status_msg(sock, ret_code); if (ret < 0) { /* Somehow, the session daemon is not responding anymore. */ @@ -418,6 +756,39 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, } break; } + case LTTNG_CONSUMER_DESTROY_CHANNEL: + { + uint64_t key = msg.u.destroy_channel.key; + struct lttng_consumer_channel *channel; + + channel = consumer_find_channel(key); + if (!channel) { + ERR("Kernel consumer destroy channel %" PRIu64 " not found", key); + ret_code = LTTNG_ERR_KERN_CHAN_NOT_FOUND; + } + + ret = consumer_send_status_msg(sock, ret_code); + if (ret < 0) { + /* Somehow, the session daemon is not responding anymore. */ + goto end_nosignal; + } + + /* + * This command should ONLY be issued for channel with streams set in + * no monitor mode. + */ + assert(!channel->monitor); + + /* + * The refcount should ALWAYS be 0 in the case of a channel in no + * monitor mode. + */ + assert(!uatomic_sub_return(&channel->refcount, 1)); + + consumer_del_channel(channel); + + goto end_nosignal; + } default: goto end_nosignal; } @@ -474,8 +845,7 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, } switch (stream->chan->output) { - case LTTNG_EVENT_SPLICE: - + case CONSUMER_CHANNEL_SPLICE: /* * XXX: The lttng-modules splice "actor" does not handle copying * partial pages hence only using the subbuffer size without the @@ -500,7 +870,7 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, ret, subbuf_size); } break; - case LTTNG_EVENT_MMAP: + case CONSUMER_CHANNEL_MMAP: /* Get subbuffer size without padding */ err = kernctl_get_subbuf_size(infd, &subbuf_size); if (err != 0) {