X-Git-Url: http://git.efficios.com/?a=blobdiff_plain;f=src%2Fcommon%2Fkernel-consumer%2Fkernel-consumer.c;h=111d655881c7f35c915e83da4239d381df9e4970;hb=d6ef77b34f2ba071e280bc8c25936e95bf1094d9;hp=f23fc9c5711f18101e1b7b3fb6cd7245ea378cee;hpb=13886d2d433762b7cf54f8e812f747ec2829de57;p=lttng-tools.git diff --git a/src/common/kernel-consumer/kernel-consumer.c b/src/common/kernel-consumer/kernel-consumer.c index f23fc9c57..111d65588 100644 --- a/src/common/kernel-consumer/kernel-consumer.c +++ b/src/common/kernel-consumer/kernel-consumer.c @@ -16,7 +16,7 @@ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ -#define _GNU_SOURCE +#define _LGPL_SOURCE #include #include #include @@ -28,15 +28,25 @@ #include #include #include +#include +#include #include #include #include #include #include +#include #include #include #include +#include +#include +#include +#include +#include +#include +#include #include "kernel-consumer.h" @@ -55,9 +65,12 @@ int lttng_kconsumer_take_snapshot(struct lttng_consumer_stream *stream) int infd = stream->wait_fd; ret = kernctl_snapshot(infd); - if (ret != 0) { - errno = -ret; - perror("Getting sub-buffer snapshot."); + /* + * -EAGAIN is not an error, it just means that there is no data to + * be read. + */ + if (ret != 0 && ret != -EAGAIN) { + PERROR("Getting sub-buffer snapshot."); } return ret; @@ -76,36 +89,401 @@ int lttng_kconsumer_get_produced_snapshot(struct lttng_consumer_stream *stream, ret = kernctl_snapshot_get_produced(infd, pos); if (ret != 0) { - errno = -ret; - perror("kernctl_snapshot_get_produced"); + PERROR("kernctl_snapshot_get_produced"); + } + + return ret; +} + +/* + * Get the consumerd position + * + * Returns 0 on success, < 0 on error + */ +int lttng_kconsumer_get_consumed_snapshot(struct lttng_consumer_stream *stream, + unsigned long *pos) +{ + int ret; + int infd = stream->wait_fd; + + ret = kernctl_snapshot_get_consumed(infd, pos); + if (ret != 0) { + PERROR("kernctl_snapshot_get_consumed"); + } + + return ret; +} + +static +int get_current_subbuf_addr(struct lttng_consumer_stream *stream, + const char **addr) +{ + int ret; + unsigned long mmap_offset; + const char *mmap_base = stream->mmap_base; + + ret = kernctl_get_mmap_read_offset(stream->wait_fd, &mmap_offset); + if (ret < 0) { + PERROR("Failed to get mmap read offset"); + goto error; + } + + *addr = mmap_base + mmap_offset; +error: + return ret; +} + +/* + * Take a snapshot of all the stream of a channel + * + * Returns 0 on success, < 0 on error + */ +int lttng_kconsumer_snapshot_channel(uint64_t key, char *path, + uint64_t relayd_id, uint64_t nb_packets_per_stream, + struct lttng_consumer_local_data *ctx) +{ + int ret; + struct lttng_consumer_channel *channel; + struct lttng_consumer_stream *stream; + + DBG("Kernel consumer snapshot channel %" PRIu64, key); + + rcu_read_lock(); + + channel = consumer_find_channel(key); + if (!channel) { + ERR("No channel found for key %" PRIu64, key); + ret = -1; + goto end; + } + + /* Splice is not supported yet for channel snapshot. */ + if (channel->output != CONSUMER_CHANNEL_MMAP) { + ERR("Unsupported output %d", channel->output); + ret = -1; + goto end; + } + + cds_list_for_each_entry(stream, &channel->streams.head, send_node) { + unsigned long consumed_pos, produced_pos; + + health_code_update(); + + /* + * Lock stream because we are about to change its state. + */ + pthread_mutex_lock(&stream->lock); + + /* + * Assign the received relayd ID so we can use it for streaming. The streams + * are not visible to anyone so this is OK to change it. + */ + stream->relayd_id = relayd_id; + channel->relayd_id = relayd_id; + if (relayd_id != (uint64_t) -1ULL) { + ret = consumer_send_relayd_stream(stream, path); + if (ret < 0) { + ERR("sending stream to relayd"); + goto end_unlock; + } + } else { + ret = utils_create_stream_file(path, stream->name, + stream->chan->tracefile_size, + stream->tracefile_count_current, + stream->uid, stream->gid, NULL); + if (ret < 0) { + ERR("utils_create_stream_file"); + goto end_unlock; + } + + stream->out_fd = ret; + stream->tracefile_size_current = 0; + + DBG("Kernel consumer snapshot stream %s/%s (%" PRIu64 ")", + path, stream->name, stream->key); + } + if (relayd_id != -1ULL) { + ret = consumer_send_relayd_streams_sent(relayd_id); + if (ret < 0) { + ERR("sending streams sent to relayd"); + goto end_unlock; + } + channel->streams_sent_to_relayd = true; + } + + ret = kernctl_buffer_flush_empty(stream->wait_fd); + if (ret < 0) { + /* + * Doing a buffer flush which does not take into + * account empty packets. This is not perfect + * for stream intersection, but required as a + * fall-back when "flush_empty" is not + * implemented by lttng-modules. + */ + ret = kernctl_buffer_flush(stream->wait_fd); + if (ret < 0) { + ERR("Failed to flush kernel stream"); + goto end_unlock; + } + goto end_unlock; + } + + ret = lttng_kconsumer_take_snapshot(stream); + if (ret < 0) { + ERR("Taking kernel snapshot"); + goto end_unlock; + } + + ret = lttng_kconsumer_get_produced_snapshot(stream, &produced_pos); + if (ret < 0) { + ERR("Produced kernel snapshot position"); + goto end_unlock; + } + + ret = lttng_kconsumer_get_consumed_snapshot(stream, &consumed_pos); + if (ret < 0) { + ERR("Consumerd kernel snapshot position"); + goto end_unlock; + } + + if (stream->max_sb_size == 0) { + ret = kernctl_get_max_subbuf_size(stream->wait_fd, + &stream->max_sb_size); + if (ret < 0) { + ERR("Getting kernel max_sb_size"); + goto end_unlock; + } + } + + consumed_pos = consumer_get_consume_start_pos(consumed_pos, + produced_pos, nb_packets_per_stream, + stream->max_sb_size); + + while (consumed_pos < produced_pos) { + ssize_t read_len; + unsigned long len, padded_len; + const char *subbuf_addr; + struct lttng_buffer_view subbuf_view; + + health_code_update(); + DBG("Kernel consumer taking snapshot at pos %lu", consumed_pos); + + ret = kernctl_get_subbuf(stream->wait_fd, &consumed_pos); + if (ret < 0) { + if (ret != -EAGAIN) { + PERROR("kernctl_get_subbuf snapshot"); + goto end_unlock; + } + DBG("Kernel consumer get subbuf failed. Skipping it."); + consumed_pos += stream->max_sb_size; + stream->chan->lost_packets++; + continue; + } + + ret = kernctl_get_subbuf_size(stream->wait_fd, &len); + if (ret < 0) { + ERR("Snapshot kernctl_get_subbuf_size"); + goto error_put_subbuf; + } + + ret = kernctl_get_padded_subbuf_size(stream->wait_fd, &padded_len); + if (ret < 0) { + ERR("Snapshot kernctl_get_padded_subbuf_size"); + goto error_put_subbuf; + } + + ret = get_current_subbuf_addr(stream, &subbuf_addr); + if (ret) { + goto error_put_subbuf; + } + + subbuf_view = lttng_buffer_view_init( + subbuf_addr, 0, padded_len); + read_len = lttng_consumer_on_read_subbuffer_mmap( + stream, &subbuf_view, + padded_len - len); + /* + * We write the padded len in local tracefiles but the data len + * when using a relay. Display the error but continue processing + * to try to release the subbuffer. + */ + if (relayd_id != (uint64_t) -1ULL) { + if (read_len != len) { + ERR("Error sending to the relay (ret: %zd != len: %lu)", + read_len, len); + } + } else { + if (read_len != padded_len) { + ERR("Error writing to tracefile (ret: %zd != len: %lu)", + read_len, padded_len); + } + } + + ret = kernctl_put_subbuf(stream->wait_fd); + if (ret < 0) { + ERR("Snapshot kernctl_put_subbuf"); + goto end_unlock; + } + consumed_pos += stream->max_sb_size; + } + + if (relayd_id == (uint64_t) -1ULL) { + if (stream->out_fd >= 0) { + ret = close(stream->out_fd); + if (ret < 0) { + PERROR("Kernel consumer snapshot close out_fd"); + goto end_unlock; + } + stream->out_fd = -1; + } + } else { + close_relayd_stream(stream); + stream->relayd_id = (uint64_t) -1ULL; + } + pthread_mutex_unlock(&stream->lock); + } + + /* All good! */ + ret = 0; + goto end; + +error_put_subbuf: + ret = kernctl_put_subbuf(stream->wait_fd); + if (ret < 0) { + ERR("Snapshot kernctl_put_subbuf error path"); + } +end_unlock: + pthread_mutex_unlock(&stream->lock); +end: + rcu_read_unlock(); + return ret; +} + +/* + * Read the whole metadata available for a snapshot. + * + * Returns 0 on success, < 0 on error + */ +static int lttng_kconsumer_snapshot_metadata(uint64_t key, char *path, + uint64_t relayd_id, struct lttng_consumer_local_data *ctx) +{ + int ret, use_relayd = 0; + ssize_t ret_read; + struct lttng_consumer_channel *metadata_channel; + struct lttng_consumer_stream *metadata_stream; + + assert(ctx); + + DBG("Kernel consumer snapshot metadata with key %" PRIu64 " at path %s", + key, path); + + rcu_read_lock(); + + metadata_channel = consumer_find_channel(key); + if (!metadata_channel) { + ERR("Kernel snapshot metadata not found for key %" PRIu64, key); + ret = -1; + goto error_no_channel; + } + + metadata_stream = metadata_channel->metadata_stream; + assert(metadata_stream); + pthread_mutex_lock(&metadata_stream->lock); + + /* Flag once that we have a valid relayd for the stream. */ + if (relayd_id != (uint64_t) -1ULL) { + use_relayd = 1; + } + + if (use_relayd) { + ret = consumer_send_relayd_stream(metadata_stream, path); + if (ret < 0) { + goto error_snapshot; + } + } else { + ret = utils_create_stream_file(path, metadata_stream->name, + metadata_stream->chan->tracefile_size, + metadata_stream->tracefile_count_current, + metadata_stream->uid, metadata_stream->gid, NULL); + if (ret < 0) { + goto error_snapshot; + } + metadata_stream->out_fd = ret; + } + + do { + health_code_update(); + + ret_read = lttng_consumer_read_subbuffer(metadata_stream, ctx, true); + if (ret_read < 0) { + if (ret_read != -EAGAIN) { + ERR("Kernel snapshot reading metadata subbuffer (ret: %zd)", + ret_read); + ret = ret_read; + goto error_snapshot; + } + /* ret_read is negative at this point so we will exit the loop. */ + continue; + } + } while (ret_read >= 0); + + if (use_relayd) { + close_relayd_stream(metadata_stream); + metadata_stream->relayd_id = (uint64_t) -1ULL; + } else { + if (metadata_stream->out_fd >= 0) { + ret = close(metadata_stream->out_fd); + if (ret < 0) { + PERROR("Kernel consumer snapshot metadata close out_fd"); + /* + * Don't go on error here since the snapshot was successful at this + * point but somehow the close failed. + */ + } + metadata_stream->out_fd = -1; + } } + ret = 0; +error_snapshot: + pthread_mutex_unlock(&metadata_stream->lock); + cds_list_del(&metadata_stream->send_node); + consumer_stream_destroy(metadata_stream, NULL); + metadata_channel->metadata_stream = NULL; +error_no_channel: + rcu_read_unlock(); return ret; } +/* + * Receive command from session daemon and process it. + * + * Return 1 on success else a negative value or 0. + */ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, int sock, struct pollfd *consumer_sockpoll) { ssize_t ret; - enum lttng_error_code ret_code = LTTNG_OK; + enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS; struct lttcomm_consumer_msg msg; + health_code_update(); + ret = lttcomm_recv_unix_sock(sock, &msg, sizeof(msg)); if (ret != sizeof(msg)) { - lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_CMD); + if (ret > 0) { + lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_CMD); + ret = -1; + } return ret; } - if (msg.cmd_type == LTTNG_CONSUMER_STOP) { - /* - * Notify the session daemon that the command is completed. - * - * On transport layer error, the function call will print an error - * message so handling the returned code is a bit useless since we - * return an error code anyway. - */ - (void) consumer_send_status_msg(sock, ret_code); - return -ENOENT; - } + + health_code_update(); + + /* Deprecated command */ + assert(msg.cmd_type != LTTNG_CONSUMER_STOP); + + health_code_update(); /* relayd needs RCU read-side protection */ rcu_read_lock(); @@ -114,34 +492,55 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, case LTTNG_CONSUMER_ADD_RELAYD_SOCKET: { /* Session daemon status message are handled in the following call. */ - ret = consumer_add_relayd_socket(msg.u.relayd_sock.net_index, + consumer_add_relayd_socket(msg.u.relayd_sock.net_index, msg.u.relayd_sock.type, ctx, sock, consumer_sockpoll, - &msg.u.relayd_sock.sock, msg.u.relayd_sock.session_id); + &msg.u.relayd_sock.sock, msg.u.relayd_sock.session_id, + msg.u.relayd_sock.relayd_session_id); goto end_nosignal; } case LTTNG_CONSUMER_ADD_CHANNEL: { struct lttng_consumer_channel *new_channel; + int ret_recv; + + health_code_update(); /* First send a status message before receiving the fds. */ ret = consumer_send_status_msg(sock, ret_code); if (ret < 0) { /* Somehow, the session daemon is not responding anymore. */ - goto end_nosignal; + goto error_fatal; } + health_code_update(); + DBG("consumer_add_channel %" PRIu64, msg.u.channel.channel_key); new_channel = consumer_allocate_channel(msg.u.channel.channel_key, msg.u.channel.session_id, msg.u.channel.pathname, msg.u.channel.name, msg.u.channel.uid, msg.u.channel.gid, msg.u.channel.relayd_id, msg.u.channel.output, msg.u.channel.tracefile_size, - msg.u.channel.tracefile_count); + msg.u.channel.tracefile_count, 0, + msg.u.channel.monitor, + msg.u.channel.live_timer_interval, + msg.u.channel.is_live, + NULL, NULL); if (new_channel == NULL) { lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR); goto end_nosignal; } new_channel->nb_init_stream_left = msg.u.channel.nb_init_streams; + switch (msg.u.channel.output) { + case LTTNG_EVENT_SPLICE: + new_channel->output = CONSUMER_CHANNEL_SPLICE; + break; + case LTTNG_EVENT_MMAP: + new_channel->output = CONSUMER_CHANNEL_MMAP; + break; + default: + ERR("Channel output unknown %d", msg.u.channel.output); + goto end_nosignal; + } /* Translate and save channel type. */ switch (msg.u.channel.type) { @@ -154,22 +553,40 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, goto end_nosignal; }; + health_code_update(); + if (ctx->on_recv_channel != NULL) { - ret = ctx->on_recv_channel(new_channel); - if (ret == 0) { - consumer_add_channel(new_channel, ctx); - } else if (ret < 0) { + ret_recv = ctx->on_recv_channel(new_channel); + if (ret_recv == 0) { + ret = consumer_add_channel(new_channel, ctx); + } else if (ret_recv < 0) { goto end_nosignal; } } else { - consumer_add_channel(new_channel, ctx); + ret = consumer_add_channel(new_channel, ctx); + } + if (CONSUMER_CHANNEL_TYPE_DATA) { + consumer_timer_live_start(new_channel, + msg.u.channel.live_timer_interval); + } + + health_code_update(); + + /* If we received an error in add_channel, we need to report it. */ + if (ret < 0) { + ret = consumer_send_status_msg(sock, ret); + if (ret < 0) { + goto error_fatal; + } + goto end_nosignal; } + goto end_nosignal; } case LTTNG_CONSUMER_ADD_STREAM: { - int fd, stream_pipe; - struct consumer_relayd_sock_pair *relayd = NULL; + int fd; + struct lttng_pipe *stream_pipe; struct lttng_consumer_stream *new_stream; struct lttng_consumer_channel *channel; int alloc_ret = 0; @@ -185,25 +602,35 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, * happens while tearing down. */ ERR("Unable to find channel key %" PRIu64, msg.u.stream.channel_key); - ret_code = LTTNG_ERR_KERN_CHAN_NOT_FOUND; + ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND; } + health_code_update(); + /* First send a status message before receiving the fds. */ ret = consumer_send_status_msg(sock, ret_code); - if (ret < 0 || ret_code != LTTNG_OK) { - /* - * Somehow, the session daemon is not responding anymore or the - * channel was not found. - */ + if (ret < 0) { + /* Somehow, the session daemon is not responding anymore. */ + goto error_fatal; + } + + health_code_update(); + + if (ret_code != LTTCOMM_CONSUMERD_SUCCESS) { + /* Channel was not found. */ goto end_nosignal; } - /* block */ - if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) { - rcu_read_unlock(); - return -EINTR; + /* Blocking call */ + health_poll_entry(); + ret = lttng_consumer_poll_socket(consumer_sockpoll); + health_poll_exit(); + if (ret) { + goto error_fatal; } + health_code_update(); + /* Get stream file descriptor from socket */ ret = lttcomm_recv_fds_unix_sock(sock, &fd, 1); if (ret != sizeof(fd)) { @@ -212,6 +639,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, return ret; } + health_code_update(); + /* * Send status code to session daemon only if the recv works. If the * above recv() failed, the session daemon is notified through the @@ -223,7 +652,12 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, goto end_nosignal; } - new_stream = consumer_allocate_stream(channel->key, + health_code_update(); + + pthread_mutex_lock(&channel->lock); + new_stream = consumer_stream_create( + channel, + channel->key, fd, LTTNG_CONSUMER_ACTIVE_STREAM, channel->name, @@ -233,7 +667,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, channel->session_id, msg.u.stream.cpu, &alloc_ret, - channel->type); + channel->type, + channel->monitor); if (new_stream == NULL) { switch (alloc_ret) { case -ENOMEM: @@ -242,10 +677,29 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR); break; } + pthread_mutex_unlock(&channel->lock); goto end_nosignal; } - new_stream->chan = channel; + new_stream->wait_fd = fd; + ret = kernctl_get_max_subbuf_size(new_stream->wait_fd, + &new_stream->max_sb_size); + if (ret < 0) { + pthread_mutex_unlock(&channel->lock); + ERR("Failed to get kernel maximal subbuffer size"); + goto end_nosignal; + } + + /* + * We've just assigned the channel to the stream so increment the + * refcount right now. We don't need to increment the refcount for + * streams in no monitor because we handle manually the cleanup of + * those. It is very important to make sure there is NO prior + * consumer_del_stream() calls or else the refcount will be unbalanced. + */ + if (channel->monitor) { + uatomic_inc(&new_stream->chan->refcount); + } /* * The buffer flush is done on the session daemon side for the kernel @@ -256,51 +710,94 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, */ new_stream->hangup_flush_done = 0; - /* The stream is not metadata. Get relayd reference if exists. */ - relayd = consumer_find_relayd(new_stream->net_seq_idx); - if (relayd != NULL) { - /* Add stream on the relayd */ - pthread_mutex_lock(&relayd->ctrl_sock_mutex); - ret = relayd_add_stream(&relayd->control_sock, - new_stream->name, new_stream->chan->pathname, - &new_stream->relayd_stream_id, - new_stream->chan->tracefile_size, - new_stream->chan->tracefile_count); - pthread_mutex_unlock(&relayd->ctrl_sock_mutex); + health_code_update(); + + if (ctx->on_recv_stream) { + ret = ctx->on_recv_stream(new_stream); if (ret < 0) { - consumer_del_stream(new_stream, NULL); + consumer_stream_free(new_stream); goto end_nosignal; } - } else if (new_stream->net_seq_idx != (uint64_t) -1ULL) { - ERR("Network sequence index %" PRIu64 " unknown. Not adding stream.", - new_stream->net_seq_idx); - consumer_del_stream(new_stream, NULL); - goto end_nosignal; } - if (ctx->on_recv_stream) { - ret = ctx->on_recv_stream(new_stream); + health_code_update(); + + if (new_stream->metadata_flag) { + channel->metadata_stream = new_stream; + } + + /* Do not monitor this stream. */ + if (!channel->monitor) { + DBG("Kernel consumer add stream %s in no monitor mode with " + "relayd id %" PRIu64, new_stream->name, + new_stream->relayd_id); + cds_list_add(&new_stream->send_node, &channel->streams.head); + pthread_mutex_unlock(&channel->lock); + break; + } + + /* Send stream to relayd if the stream has an ID. */ + if (new_stream->relayd_id != (uint64_t) -1ULL) { + ret = consumer_send_relayd_stream(new_stream, + new_stream->chan->pathname); if (ret < 0) { - consumer_del_stream(new_stream, NULL); + pthread_mutex_unlock(&channel->lock); + consumer_stream_free(new_stream); goto end_nosignal; } + + /* + * If adding an extra stream to an already + * existing channel (e.g. cpu hotplug), we need + * to send the "streams_sent" command to relayd. + */ + if (channel->streams_sent_to_relayd) { + ret = consumer_send_relayd_streams_sent( + new_stream->relayd_id); + if (ret < 0) { + pthread_mutex_unlock(&channel->lock); + goto end_nosignal; + } + } } + pthread_mutex_unlock(&channel->lock); /* Get the right pipe where the stream will be sent. */ if (new_stream->metadata_flag) { - stream_pipe = lttng_pipe_get_writefd(ctx->consumer_metadata_pipe); + ret = consumer_add_metadata_stream(new_stream); + if (ret) { + ERR("Consumer add metadata stream %" PRIu64 " failed. Continuing", + new_stream->key); + consumer_stream_free(new_stream); + goto end_nosignal; + } + stream_pipe = ctx->consumer_metadata_pipe; } else { - stream_pipe = lttng_pipe_get_writefd(ctx->consumer_data_pipe); + ret = consumer_add_data_stream(new_stream); + if (ret) { + ERR("Consumer add stream %" PRIu64 " failed. Continuing", + new_stream->key); + consumer_stream_free(new_stream); + goto end_nosignal; + } + stream_pipe = ctx->consumer_data_pipe; } - do { - ret = write(stream_pipe, &new_stream, sizeof(new_stream)); - } while (ret < 0 && errno == EINTR); + /* Vitible to other threads */ + new_stream->globally_visible = 1; + + health_code_update(); + + ret = lttng_pipe_write(stream_pipe, &new_stream, sizeof(new_stream)); if (ret < 0) { - PERROR("Consumer write %s stream to pipe %d", + ERR("Consumer write %s stream to pipe %d", new_stream->metadata_flag ? "metadata" : "data", - stream_pipe); - consumer_del_stream(new_stream, NULL); + lttng_pipe_get_writefd(stream_pipe)); + if (new_stream->metadata_flag) { + consumer_del_stream_for_metadata(new_stream); + } else { + consumer_del_stream_for_data(new_stream); + } goto end_nosignal; } @@ -308,6 +805,58 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, new_stream->name, fd, new_stream->relayd_stream_id); break; } + case LTTNG_CONSUMER_STREAMS_SENT: + { + struct lttng_consumer_channel *channel; + + /* + * Get stream's channel reference. Needed when adding the stream to the + * global hash table. + */ + channel = consumer_find_channel(msg.u.sent_streams.channel_key); + if (!channel) { + /* + * We could not find the channel. Can happen if cpu hotplug + * happens while tearing down. + */ + ERR("Unable to find channel key %" PRIu64, + msg.u.sent_streams.channel_key); + ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND; + } + + health_code_update(); + + /* + * Send status code to session daemon. + */ + ret = consumer_send_status_msg(sock, ret_code); + if (ret < 0 || ret_code != LTTCOMM_CONSUMERD_SUCCESS) { + /* Somehow, the session daemon is not responding anymore. */ + goto end_nosignal; + } + + health_code_update(); + + /* + * We should not send this message if we don't monitor the + * streams in this channel. + */ + if (!channel->monitor) { + break; + } + + health_code_update(); + /* Send stream to relayd if the stream has an ID. */ + if (msg.u.sent_streams.net_seq_idx != (uint64_t) -1ULL) { + ret = consumer_send_relayd_streams_sent( + msg.u.sent_streams.net_seq_idx); + if (ret < 0) { + goto end_nosignal; + } + channel->streams_sent_to_relayd = true; + } + break; + } case LTTNG_CONSUMER_UPDATE_STREAM: { rcu_read_unlock(); @@ -324,7 +873,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, relayd = consumer_find_relayd(index); if (relayd == NULL) { DBG("Unable to find relayd %" PRIu64, index); - ret_code = LTTNG_ERR_NO_CONSUMER; + ret_code = LTTCOMM_CONSUMERD_RELAYD_FAIL; } /* @@ -341,10 +890,12 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, consumer_flag_relayd_for_destroy(relayd); } + health_code_update(); + ret = consumer_send_status_msg(sock, ret_code); if (ret < 0) { /* Somehow, the session daemon is not responding anymore. */ - goto end_nosignal; + goto error_fatal; } goto end_nosignal; @@ -358,10 +909,13 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, ret = consumer_data_pending(id); + health_code_update(); + /* Send back returned value to session daemon */ ret = lttcomm_send_unix_sock(sock, &ret, sizeof(ret)); if (ret < 0) { PERROR("send data pending ret code"); + goto error_fatal; } /* @@ -370,6 +924,139 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, */ break; } + case LTTNG_CONSUMER_SNAPSHOT_CHANNEL: + { + if (msg.u.snapshot_channel.metadata == 1) { + ret = lttng_kconsumer_snapshot_metadata(msg.u.snapshot_channel.key, + msg.u.snapshot_channel.pathname, + msg.u.snapshot_channel.relayd_id, ctx); + if (ret < 0) { + ERR("Snapshot metadata failed"); + ret_code = LTTCOMM_CONSUMERD_ERROR_METADATA; + } + } else { + ret = lttng_kconsumer_snapshot_channel(msg.u.snapshot_channel.key, + msg.u.snapshot_channel.pathname, + msg.u.snapshot_channel.relayd_id, + msg.u.snapshot_channel.nb_packets_per_stream, + ctx); + if (ret < 0) { + ERR("Snapshot channel failed"); + ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND; + } + } + + health_code_update(); + + ret = consumer_send_status_msg(sock, ret_code); + if (ret < 0) { + /* Somehow, the session daemon is not responding anymore. */ + goto end_nosignal; + } + break; + } + case LTTNG_CONSUMER_DESTROY_CHANNEL: + { + uint64_t key = msg.u.destroy_channel.key; + struct lttng_consumer_channel *channel; + + channel = consumer_find_channel(key); + if (!channel) { + ERR("Kernel consumer destroy channel %" PRIu64 " not found", key); + ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND; + } + + health_code_update(); + + ret = consumer_send_status_msg(sock, ret_code); + if (ret < 0) { + /* Somehow, the session daemon is not responding anymore. */ + goto end_nosignal; + } + + health_code_update(); + + /* Stop right now if no channel was found. */ + if (!channel) { + goto end_nosignal; + } + + /* + * This command should ONLY be issued for channel with streams set in + * no monitor mode. + */ + assert(!channel->monitor); + + /* + * The refcount should ALWAYS be 0 in the case of a channel in no + * monitor mode. + */ + assert(!uatomic_sub_return(&channel->refcount, 1)); + + consumer_del_channel(channel); + + goto end_nosignal; + } + case LTTNG_CONSUMER_DISCARDED_EVENTS: + { + uint64_t ret; + struct lttng_consumer_channel *channel; + uint64_t id = msg.u.discarded_events.session_id; + uint64_t key = msg.u.discarded_events.channel_key; + + DBG("Kernel consumer discarded events command for session id %" + PRIu64 ", channel key %" PRIu64, id, key); + + channel = consumer_find_channel(key); + if (!channel) { + ERR("Kernel consumer discarded events channel %" + PRIu64 " not found", key); + ret = 0; + } else { + ret = channel->discarded_events; + } + + health_code_update(); + + /* Send back returned value to session daemon */ + ret = lttcomm_send_unix_sock(sock, &ret, sizeof(ret)); + if (ret < 0) { + PERROR("send discarded events"); + goto error_fatal; + } + + break; + } + case LTTNG_CONSUMER_LOST_PACKETS: + { + uint64_t ret; + struct lttng_consumer_channel *channel; + uint64_t id = msg.u.lost_packets.session_id; + uint64_t key = msg.u.lost_packets.channel_key; + + DBG("Kernel consumer lost packets command for session id %" + PRIu64 ", channel key %" PRIu64, id, key); + + channel = consumer_find_channel(key); + if (!channel) { + ERR("Kernel consumer lost packets channel %" + PRIu64 " not found", key); + ret = 0; + } else { + ret = channel->lost_packets; + } + + health_code_update(); + + /* Send back returned value to session daemon */ + ret = lttcomm_send_unix_sock(sock, &ret, sizeof(ret)); + if (ret < 0) { + PERROR("send lost packets"); + goto error_fatal; + } + + break; + } default: goto end_nosignal; } @@ -381,125 +1068,347 @@ end_nosignal: * Return 1 to indicate success since the 0 value can be a socket * shutdown during the recv() or send() call. */ + health_code_update(); return 1; + +error_fatal: + rcu_read_unlock(); + /* This will issue a consumer stop. */ + return -1; } /* - * Consume data on a file descriptor and write it on a trace file. + * Sync metadata meaning request them to the session daemon and snapshot to the + * metadata thread can consumer them. + * + * Metadata stream lock MUST be acquired. + * + * Return 0 if new metadatda is available, EAGAIN if the metadata stream + * is empty or a negative value on error. */ -ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, - struct lttng_consumer_local_data *ctx) +int lttng_kconsumer_sync_metadata(struct lttng_consumer_stream *metadata) { - unsigned long len, subbuf_size, padding; - int err; - ssize_t ret = 0; - int infd = stream->wait_fd; + int ret; - DBG("In read_subbuffer (infd : %d)", infd); - /* Get the next subbuffer */ - err = kernctl_get_next_subbuf(infd); - if (err != 0) { - ret = err; - /* - * This is a debug message even for single-threaded consumer, - * because poll() have more relaxed criterions than get subbuf, - * so get_subbuf may fail for short race windows where poll() - * would issue wakeups. - */ - DBG("Reserving sub buffer failed (everything is normal, " - "it is due to concurrency)"); + assert(metadata); + + ret = kernctl_buffer_flush(metadata->wait_fd); + if (ret < 0) { + ERR("Failed to flush kernel stream"); goto end; } - /* Get the full subbuffer size including padding */ - err = kernctl_get_padded_subbuf_size(infd, &len); - if (err != 0) { - errno = -err; - perror("Getting sub-buffer len failed."); - ret = err; + ret = kernctl_snapshot(metadata->wait_fd); + if (ret < 0) { + if (ret != -EAGAIN) { + ERR("Sync metadata, taking kernel snapshot failed."); + goto end; + } + DBG("Sync metadata, no new kernel metadata"); + /* No new metadata, exit. */ + ret = ENODATA; goto end; } - switch (stream->chan->output) { - case LTTNG_EVENT_SPLICE: +end: + return ret; +} - /* - * XXX: The lttng-modules splice "actor" does not handle copying - * partial pages hence only using the subbuffer size without the - * padding makes the splice fail. - */ - subbuf_size = len; - padding = 0; +static +int extract_common_subbuffer_info(struct lttng_consumer_stream *stream, + struct stream_subbuffer *subbuf) +{ + int ret; - /* splice the subbuffer to the tracefile */ - ret = lttng_consumer_on_read_subbuffer_splice(ctx, stream, subbuf_size, - padding); - /* - * XXX: Splice does not support network streaming so the return value - * is simply checked against subbuf_size and not like the mmap() op. - */ - if (ret != subbuf_size) { - /* - * display the error but continue processing to try - * to release the subbuffer - */ - ERR("Error splicing to tracefile (ret: %zd != len: %lu)", - ret, subbuf_size); + ret = kernctl_get_subbuf_size( + stream->wait_fd, &subbuf->info.data.subbuf_size); + if (ret) { + goto end; + } + + ret = kernctl_get_padded_subbuf_size( + stream->wait_fd, &subbuf->info.data.padded_subbuf_size); + if (ret) { + goto end; + } + +end: + return ret; +} + +static +int extract_metadata_subbuffer_info(struct lttng_consumer_stream *stream, + struct stream_subbuffer *subbuf) +{ + int ret; + + ret = extract_common_subbuffer_info(stream, subbuf); + if (ret) { + goto end; + } + + ret = kernctl_get_metadata_version( + stream->wait_fd, &subbuf->info.metadata.version); + if (ret) { + goto end; + } + +end: + return ret; +} + +static +int extract_data_subbuffer_info(struct lttng_consumer_stream *stream, + struct stream_subbuffer *subbuf) +{ + int ret; + + ret = extract_common_subbuffer_info(stream, subbuf); + if (ret) { + goto end; + } + + ret = kernctl_get_packet_size( + stream->wait_fd, &subbuf->info.data.packet_size); + if (ret < 0) { + PERROR("Failed to get sub-buffer packet size"); + goto end; + } + + ret = kernctl_get_content_size( + stream->wait_fd, &subbuf->info.data.content_size); + if (ret < 0) { + PERROR("Failed to get sub-buffer content size"); + goto end; + } + + ret = kernctl_get_timestamp_begin( + stream->wait_fd, &subbuf->info.data.timestamp_begin); + if (ret < 0) { + PERROR("Failed to get sub-buffer begin timestamp"); + goto end; + } + + ret = kernctl_get_timestamp_end( + stream->wait_fd, &subbuf->info.data.timestamp_end); + if (ret < 0) { + PERROR("Failed to get sub-buffer end timestamp"); + goto end; + } + + ret = kernctl_get_events_discarded( + stream->wait_fd, &subbuf->info.data.events_discarded); + if (ret) { + PERROR("Failed to get sub-buffer events discarded count"); + goto end; + } + + ret = kernctl_get_sequence_number(stream->wait_fd, + &subbuf->info.data.sequence_number.value); + if (ret) { + /* May not be supported by older LTTng-modules. */ + if (ret != -ENOTTY) { + PERROR("Failed to get sub-buffer sequence number"); + goto end; } - break; - case LTTNG_EVENT_MMAP: - /* Get subbuffer size without padding */ - err = kernctl_get_subbuf_size(infd, &subbuf_size); - if (err != 0) { - errno = -err; - perror("Getting sub-buffer len failed."); - ret = err; + } else { + subbuf->info.data.sequence_number.is_set = true; + } + + ret = kernctl_get_stream_id( + stream->wait_fd, &subbuf->info.data.stream_id); + if (ret < 0) { + PERROR("Failed to get stream id"); + goto end; + } + + ret = kernctl_get_instance_id(stream->wait_fd, + &subbuf->info.data.stream_instance_id.value); + if (ret) { + /* May not be supported by older LTTng-modules. */ + if (ret != -ENOTTY) { + PERROR("Failed to get stream instance id"); goto end; } + } else { + subbuf->info.data.stream_instance_id.is_set = true; + } +end: + return ret; +} - /* Make sure the tracer is not gone mad on us! */ - assert(len >= subbuf_size); +static +int get_subbuffer_common(struct lttng_consumer_stream *stream, + struct stream_subbuffer *subbuffer) +{ + int ret; - padding = len - subbuf_size; + ret = kernctl_get_next_subbuf(stream->wait_fd); + if (ret) { + goto end; + } - /* write the subbuffer to the tracefile */ - ret = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, subbuf_size, - padding); - /* - * The mmap operation should write subbuf_size amount of data when - * network streaming or the full padding (len) size when we are _not_ - * streaming. - */ - if ((ret != subbuf_size && stream->net_seq_idx != (uint64_t) -1ULL) || - (ret != len && stream->net_seq_idx == (uint64_t) -1ULL)) { + ret = stream->read_subbuffer_ops.extract_subbuffer_info( + stream, subbuffer); +end: + return ret; +} + +static +int get_next_subbuffer_splice(struct lttng_consumer_stream *stream, + struct stream_subbuffer *subbuffer) +{ + int ret; + + ret = get_subbuffer_common(stream, subbuffer); + if (ret) { + goto end; + } + + subbuffer->buffer.fd = stream->wait_fd; +end: + return ret; +} + +static +int get_next_subbuffer_mmap(struct lttng_consumer_stream *stream, + struct stream_subbuffer *subbuffer) +{ + int ret; + const char *addr; + + ret = get_subbuffer_common(stream, subbuffer); + if (ret) { + goto end; + } + + ret = get_current_subbuf_addr(stream, &addr); + if (ret) { + goto end; + } + + subbuffer->buffer.buffer = lttng_buffer_view_init( + addr, 0, subbuffer->info.data.padded_subbuf_size); +end: + return ret; +} + +static +int get_next_subbuffer_metadata_check(struct lttng_consumer_stream *stream, + struct stream_subbuffer *subbuffer) +{ + int ret; + const char *addr; + bool coherent; + + ret = kernctl_get_next_subbuf_metadata_check(stream->wait_fd, + &coherent); + if (ret) { + goto end; + } + + ret = stream->read_subbuffer_ops.extract_subbuffer_info( + stream, subbuffer); + if (ret) { + goto end; + } + + LTTNG_OPTIONAL_SET(&subbuffer->info.metadata.coherent, coherent); + + ret = get_current_subbuf_addr(stream, &addr); + if (ret) { + goto end; + } + + subbuffer->buffer.buffer = lttng_buffer_view_init( + addr, 0, subbuffer->info.data.padded_subbuf_size); + DBG("Got metadata packet with padded_subbuf_size = %lu, coherent = %s", + subbuffer->info.metadata.padded_subbuf_size, + coherent ? "true" : "false"); +end: + return ret; +} + +static +int put_next_subbuffer(struct lttng_consumer_stream *stream, + struct stream_subbuffer *subbuffer) +{ + const int ret = kernctl_put_next_subbuf(stream->wait_fd); + + if (ret) { + if (ret == -EFAULT) { + PERROR("Error in unreserving sub buffer"); + } else if (ret == -EIO) { + /* Should never happen with newer LTTng versions */ + PERROR("Reader has been pushed by the writer, last sub-buffer corrupted"); + } + } + + return ret; +} + +static +bool is_get_next_check_metadata_available(int tracer_fd) +{ + return kernctl_get_next_subbuf_metadata_check(tracer_fd, NULL) != + -ENOTTY; +} + +static +int lttng_kconsumer_set_stream_ops( + struct lttng_consumer_stream *stream) +{ + int ret = 0; + + if (stream->metadata_flag && stream->chan->is_live) { + DBG("Attempting to enable metadata bucketization for live consumers"); + if (is_get_next_check_metadata_available(stream->wait_fd)) { + DBG("Kernel tracer supports get_next_subbuffer_metadata_check, metadata will be accumulated until a coherent state is reached"); + stream->read_subbuffer_ops.get_next_subbuffer = + get_next_subbuffer_metadata_check; + ret = consumer_stream_enable_metadata_bucketization( + stream); + if (ret) { + goto end; + } + } else { /* - * Display the error but continue processing to try to release the - * subbuffer + * The kernel tracer version is too old to indicate + * when the metadata stream has reached a "coherent" + * (parseable) point. + * + * This means that a live viewer may see an incoherent + * sequence of metadata and fail to parse it. */ - ERR("Error writing to tracefile " - "(ret: %zd != len: %lu != subbuf_size: %lu)", - ret, len, subbuf_size); + WARN("Kernel tracer does not support get_next_subbuffer_metadata_check which may cause live clients to fail to parse the metadata stream"); + metadata_bucket_destroy(stream->metadata_bucket); + stream->metadata_bucket = NULL; } - break; - default: - ERR("Unknown output method"); - ret = -1; } - err = kernctl_put_next_subbuf(infd); - if (err != 0) { - errno = -err; - if (errno == EFAULT) { - perror("Error in unreserving sub buffer\n"); - } else if (errno == EIO) { - /* Should never happen with newer LTTng versions */ - perror("Reader has been pushed by the writer, last sub-buffer corrupted."); + if (!stream->read_subbuffer_ops.get_next_subbuffer) { + if (stream->chan->output == CONSUMER_CHANNEL_MMAP) { + stream->read_subbuffer_ops.get_next_subbuffer = + get_next_subbuffer_mmap; + } else { + stream->read_subbuffer_ops.get_next_subbuffer = + get_next_subbuffer_splice; } + } - ret = -err; - goto end; + if (stream->metadata_flag) { + stream->read_subbuffer_ops.extract_subbuffer_info = + extract_metadata_subbuffer_info; + } else { + stream->read_subbuffer_ops.extract_subbuffer_info = + extract_data_subbuffer_info; + if (stream->chan->is_live) { + stream->read_subbuffer_ops.send_live_beacon = + consumer_flush_kernel_index; + } } + stream->read_subbuffer_ops.put_next_subbuffer = put_next_subbuffer; end: return ret; } @@ -510,16 +1419,33 @@ int lttng_kconsumer_on_recv_stream(struct lttng_consumer_stream *stream) assert(stream); - /* Don't create anything if this is set for streaming. */ - if (stream->net_seq_idx == (uint64_t) -1ULL) { + /* + * Don't create anything if this is set for streaming or should not be + * monitored. + */ + if (stream->relayd_id == (uint64_t) -1ULL && stream->chan->monitor) { ret = utils_create_stream_file(stream->chan->pathname, stream->name, stream->chan->tracefile_size, stream->tracefile_count_current, - stream->uid, stream->gid); + stream->uid, stream->gid, NULL); if (ret < 0) { goto error; } stream->out_fd = ret; stream->tracefile_size_current = 0; + + if (!stream->metadata_flag) { + struct lttng_index_file *index_file; + + index_file = lttng_index_file_create(stream->chan->pathname, + stream->name, stream->uid, stream->gid, + stream->chan->tracefile_size, + stream->tracefile_count_current, + CTF_INDEX_MAJOR, CTF_INDEX_MINOR); + if (!index_file) { + goto error; + } + stream->index_file = index_file; + } } if (stream->output == LTTNG_EVENT_MMAP) { @@ -528,7 +1454,6 @@ int lttng_kconsumer_on_recv_stream(struct lttng_consumer_stream *stream) ret = kernctl_get_mmap_len(stream->wait_fd, &mmap_len); if (ret != 0) { - errno = -ret; PERROR("kernctl_get_mmap_len"); goto error_close_fd; } @@ -543,15 +1468,21 @@ int lttng_kconsumer_on_recv_stream(struct lttng_consumer_stream *stream) } } + ret = lttng_kconsumer_set_stream_ops(stream); + if (ret) { + goto error_close_fd; + } + /* we return 0 to let the library handle the FD internally */ return 0; error_close_fd: - { + if (stream->out_fd >= 0) { int err; err = close(stream->out_fd); assert(!err); + stream->out_fd = -1; } error: return ret; @@ -571,6 +1502,11 @@ int lttng_kconsumer_data_pending(struct lttng_consumer_stream *stream) assert(stream); + if (stream->endpoint_status != CONSUMER_ENDPOINT_ACTIVE) { + ret = 0; + goto end; + } + ret = kernctl_get_next_subbuf(stream->wait_fd); if (ret == 0) { /* There is still data so let's put back this subbuffer. */