X-Git-Url: http://git.efficios.com/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fkernel-consumer%2Fkernel-consumer.c;h=e06f3b3e079aae44f494ba6d4409f02120018af4;hp=9742ea0844d79d5b64a1e9888d7b8bb54217c332;hb=6f9449c22eef59294cf1e1dc3610a5cbf14baec0;hpb=d05185faf74eddb5d0425822efc877c6c0ce5f0a diff --git a/src/common/kernel-consumer/kernel-consumer.c b/src/common/kernel-consumer/kernel-consumer.c index 9742ea084..e06f3b3e0 100644 --- a/src/common/kernel-consumer/kernel-consumer.c +++ b/src/common/kernel-consumer/kernel-consumer.c @@ -1,20 +1,10 @@ /* - * Copyright (C) 2011 - Julien Desfossez - * Mathieu Desnoyers - * Copyright (C) 2017 - Jérémie Galarneau + * Copyright (C) 2011 Julien Desfossez + * Copyright (C) 2011 Mathieu Desnoyers + * Copyright (C) 2017 Jérémie Galarneau * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License, version 2 only, - * as published by the Free Software Foundation. + * SPDX-License-Identifier: GPL-2.0-only * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License along - * with this program; if not, write to the Free Software Foundation, Inc., - * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ #define _LGPL_SOURCE @@ -44,6 +34,9 @@ #include #include #include +#include +#include +#include #include "kernel-consumer.h" @@ -123,6 +116,25 @@ int lttng_kconsumer_get_consumed_snapshot(struct lttng_consumer_stream *stream, return ret; } +static +int get_current_subbuf_addr(struct lttng_consumer_stream *stream, + const char **addr) +{ + int ret; + unsigned long mmap_offset; + const char *mmap_base = stream->mmap_base; + + ret = kernctl_get_mmap_read_offset(stream->wait_fd, &mmap_offset); + if (ret < 0) { + PERROR("Failed to get mmap read offset"); + goto error; + } + + *addr = mmap_base + mmap_offset; +error: + return ret; +} + /* * Take a snapshot of all the stream of a channel * RCU read-side lock must be held across this function to ensure existence of @@ -238,9 +250,10 @@ static int lttng_kconsumer_snapshot_channel( while ((long) (consumed_pos - produced_pos) < 0) { ssize_t read_len; unsigned long len, padded_len; + const char *subbuf_addr; + struct lttng_buffer_view subbuf_view; health_code_update(); - DBG("Kernel consumer taking snapshot at pos %lu", consumed_pos); ret = kernctl_get_subbuf(stream->wait_fd, &consumed_pos); @@ -267,8 +280,16 @@ static int lttng_kconsumer_snapshot_channel( goto error_put_subbuf; } - read_len = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, len, - padded_len - len, NULL); + ret = get_current_subbuf_addr(stream, &subbuf_addr); + if (ret) { + goto error_put_subbuf; + } + + subbuf_view = lttng_buffer_view_init( + subbuf_addr, 0, padded_len); + read_len = lttng_consumer_on_read_subbuffer_mmap(ctx, + stream, &subbuf_view, + padded_len - len); /* * We write the padded len in local tracefiles but the data len * when using a relay. Display the error but continue processing @@ -379,7 +400,7 @@ static int lttng_kconsumer_snapshot_metadata( do { health_code_update(); - ret_read = lttng_kconsumer_read_subbuffer(metadata_stream, ctx); + ret_read = lttng_consumer_read_subbuffer(metadata_stream, ctx, true); if (ret_read < 0) { if (ret_read != -EAGAIN) { ERR("Kernel snapshot reading metadata subbuffer (ret: %zd)", @@ -493,6 +514,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, msg.u.channel.tracefile_count, 0, msg.u.channel.monitor, msg.u.channel.live_timer_interval, + msg.u.channel.is_live, NULL, NULL); if (new_channel == NULL) { lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR); @@ -591,14 +613,14 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, ret = consumer_send_status_msg(sock, ret_code); if (ret < 0) { /* Somehow, the session daemon is not responding anymore. */ - goto error_fatal; + goto error_add_stream_fatal; } health_code_update(); if (ret_code != LTTCOMM_CONSUMERD_SUCCESS) { /* Channel was not found. */ - goto end_nosignal; + goto error_add_stream_nosignal; } /* Blocking call */ @@ -606,7 +628,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, ret = lttng_consumer_poll_socket(consumer_sockpoll); health_poll_exit(); if (ret) { - goto error_fatal; + goto error_add_stream_fatal; } health_code_update(); @@ -615,8 +637,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, ret = lttcomm_recv_fds_unix_sock(sock, &fd, 1); if (ret != sizeof(fd)) { lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_FD); - rcu_read_unlock(); - return ret; + goto end; } health_code_update(); @@ -629,13 +650,15 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, ret = consumer_send_status_msg(sock, ret_code); if (ret < 0) { /* Somehow, the session daemon is not responding anymore. */ - goto end_nosignal; + goto error_add_stream_nosignal; } health_code_update(); pthread_mutex_lock(&channel->lock); - new_stream = consumer_allocate_stream(channel->key, + new_stream = consumer_stream_create( + channel, + channel->key, fd, channel->name, channel->relayd_id, @@ -654,38 +677,20 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, break; } pthread_mutex_unlock(&channel->lock); - goto end_nosignal; + goto error_add_stream_nosignal; } - new_stream->chan = channel; new_stream->wait_fd = fd; ret = kernctl_get_max_subbuf_size(new_stream->wait_fd, &new_stream->max_sb_size); if (ret < 0) { pthread_mutex_unlock(&channel->lock); ERR("Failed to get kernel maximal subbuffer size"); - goto end_nosignal; + goto error_add_stream_nosignal; } consumer_stream_update_channel_attributes(new_stream, channel); - switch (channel->output) { - case CONSUMER_CHANNEL_SPLICE: - new_stream->output = LTTNG_EVENT_SPLICE; - ret = utils_create_pipe(new_stream->splice_pipe); - if (ret < 0) { - pthread_mutex_unlock(&channel->lock); - goto end_nosignal; - } - break; - case CONSUMER_CHANNEL_MMAP: - new_stream->output = LTTNG_EVENT_MMAP; - break; - default: - ERR("Stream output unknown %d", channel->output); - pthread_mutex_unlock(&channel->lock); - goto end_nosignal; - } /* * We've just assigned the channel to the stream so increment the @@ -716,7 +721,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, pthread_mutex_unlock(&new_stream->lock); pthread_mutex_unlock(&channel->lock); consumer_stream_free(new_stream); - goto end_nosignal; + goto error_add_stream_nosignal; } } health_code_update(); @@ -733,7 +738,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, cds_list_add(&new_stream->send_node, &channel->streams.head); pthread_mutex_unlock(&new_stream->lock); pthread_mutex_unlock(&channel->lock); - break; + goto end_add_stream; } /* Send stream to relayd if the stream has an ID. */ @@ -744,7 +749,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, pthread_mutex_unlock(&new_stream->lock); pthread_mutex_unlock(&channel->lock); consumer_stream_free(new_stream); - goto end_nosignal; + goto error_add_stream_nosignal; } /* @@ -758,7 +763,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, if (ret < 0) { pthread_mutex_unlock(&new_stream->lock); pthread_mutex_unlock(&channel->lock); - goto end_nosignal; + goto error_add_stream_nosignal; } } } @@ -789,12 +794,17 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, } else { consumer_del_stream_for_data(new_stream); } - goto end_nosignal; + goto error_add_stream_nosignal; } DBG("Kernel consumer ADD_STREAM %s (fd: %d) %s with relayd id %" PRIu64, new_stream->name, fd, new_stream->chan->pathname, new_stream->relayd_stream_id); +end_add_stream: break; +error_add_stream_nosignal: + goto end_nosignal; +error_add_stream_fatal: + goto error_fatal; } case LTTNG_CONSUMER_STREAMS_SENT: { @@ -823,7 +833,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, ret = consumer_send_status_msg(sock, ret_code); if (ret < 0 || ret_code != LTTCOMM_CONSUMERD_SUCCESS) { /* Somehow, the session daemon is not responding anymore. */ - goto end_nosignal; + goto error_streams_sent_nosignal; } health_code_update(); @@ -833,7 +843,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, * streams in this channel. */ if (!channel->monitor) { - break; + goto end_error_streams_sent; } health_code_update(); @@ -842,11 +852,14 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, ret = consumer_send_relayd_streams_sent( msg.u.sent_streams.net_seq_idx); if (ret < 0) { - goto end_nosignal; + goto error_streams_sent_nosignal; } channel->streams_sent_to_relayd = true; } +end_error_streams_sent: break; +error_streams_sent_nosignal: + goto end_nosignal; } case LTTNG_CONSUMER_UPDATE_STREAM: { @@ -972,14 +985,14 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, ret = consumer_send_status_msg(sock, ret_code); if (ret < 0) { /* Somehow, the session daemon is not responding anymore. */ - goto end_nosignal; + goto end_destroy_channel; } health_code_update(); /* Stop right now if no channel was found. */ if (!channel) { - goto end_nosignal; + goto end_destroy_channel; } /* @@ -995,7 +1008,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, assert(!uatomic_sub_return(&channel->refcount, 1)); consumer_del_channel(channel); - +end_destroy_channel: goto end_nosignal; } case LTTNG_CONSUMER_DISCARDED_EVENTS: @@ -1138,7 +1151,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, ret = consumer_send_status_msg(sock, ret_code); if (ret < 0) { /* Somehow, the session daemon is not responding anymore. */ - goto end_nosignal; + goto error_rotate_channel; } if (channel) { /* Rotate the streams that are ready right now. */ @@ -1148,6 +1161,33 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, ERR("Rotate ready streams failed"); } } + break; +error_rotate_channel: + goto end_nosignal; + } + case LTTNG_CONSUMER_CLEAR_CHANNEL: + { + struct lttng_consumer_channel *channel; + uint64_t key = msg.u.clear_channel.key; + + channel = consumer_find_channel(key); + if (!channel) { + DBG("Channel %" PRIu64 " not found", key); + ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND; + } else { + ret = lttng_consumer_clear_channel(channel); + if (ret) { + ERR("Clear channel failed"); + ret_code = ret; + } + + health_code_update(); + } + ret = consumer_send_status_msg(sock, ret_code); + if (ret < 0) { + /* Somehow, the session daemon is not responding anymore. */ + goto end_nosignal; + } break; } @@ -1177,8 +1217,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, *msg.u.create_trace_chunk.override_name ? msg.u.create_trace_chunk.override_name : NULL; - LTTNG_OPTIONAL(struct lttng_directory_handle) chunk_directory_handle = - LTTNG_OPTIONAL_INIT; + struct lttng_directory_handle *chunk_directory_handle = NULL; /* * The session daemon will only provide a chunk directory file @@ -1203,17 +1242,15 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, DBG("Received trace chunk directory fd (%d)", chunk_dirfd); - ret = lttng_directory_handle_init_from_dirfd( - &chunk_directory_handle.value, + chunk_directory_handle = lttng_directory_handle_create_from_dirfd( chunk_dirfd); - if (ret) { + if (!chunk_directory_handle) { ERR("Failed to initialize chunk directory handle from directory file descriptor"); if (close(chunk_dirfd)) { PERROR("Failed to close chunk directory file descriptor"); } goto error_fatal; } - chunk_directory_handle.is_set = true; } ret_code = lttng_consumer_create_trace_chunk( @@ -1226,14 +1263,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, msg.u.create_trace_chunk.credentials.is_set ? &credentials : NULL, - chunk_directory_handle.is_set ? - &chunk_directory_handle.value : - NULL); - - if (chunk_directory_handle.is_set) { - lttng_directory_handle_fini( - &chunk_directory_handle.value); - } + chunk_directory_handle); + lttng_directory_handle_put(chunk_directory_handle); goto end_msg_sessiond; } case LTTNG_CONSUMER_CLOSE_TRACE_CHUNK: @@ -1242,6 +1273,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, msg.u.close_trace_chunk.close_command.value; const uint64_t relayd_id = msg.u.close_trace_chunk.relayd_id.value; + struct lttcomm_consumer_close_trace_chunk_reply reply; + char path[LTTNG_PATH_MAX]; ret_code = lttng_consumer_close_trace_chunk( msg.u.close_trace_chunk.relayd_id.is_set ? @@ -1252,8 +1285,18 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, (time_t) msg.u.close_trace_chunk.close_timestamp, msg.u.close_trace_chunk.close_command.is_set ? &close_command : - NULL); - goto end_msg_sessiond; + NULL, path); + reply.ret_code = ret_code; + reply.path_length = strlen(path) + 1; + ret = lttcomm_send_unix_sock(sock, &reply, sizeof(reply)); + if (ret != sizeof(reply)) { + goto error_fatal; + } + ret = lttcomm_send_unix_sock(sock, path, reply.path_length); + if (ret != reply.path_length) { + goto error_fatal; + } + goto end_nosignal; } case LTTNG_CONSUMER_TRACE_CHUNK_EXISTS: { @@ -1272,15 +1315,16 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, } end_nosignal: - rcu_read_unlock(); - /* * Return 1 to indicate success since the 0 value can be a socket * shutdown during the recv() or send() call. */ - health_code_update(); - return 1; - + ret = 1; + goto end; +error_fatal: + /* This will issue a consumer stop. */ + ret = -1; + goto end; end_msg_sessiond: /* * The returned value here is not useful since either way we'll return 1 to @@ -1291,97 +1335,13 @@ end_msg_sessiond: if (ret < 0) { goto error_fatal; } - rcu_read_unlock(); - + ret = 1; +end: health_code_update(); - - return 1; - -error_fatal: rcu_read_unlock(); - /* This will issue a consumer stop. */ - return -1; -} - -/* - * Populate index values of a kernel stream. Values are set in big endian order. - * - * Return 0 on success or else a negative value. - */ -static int get_index_values(struct ctf_packet_index *index, int infd) -{ - int ret; - - ret = kernctl_get_timestamp_begin(infd, &index->timestamp_begin); - if (ret < 0) { - PERROR("kernctl_get_timestamp_begin"); - goto error; - } - index->timestamp_begin = htobe64(index->timestamp_begin); - - ret = kernctl_get_timestamp_end(infd, &index->timestamp_end); - if (ret < 0) { - PERROR("kernctl_get_timestamp_end"); - goto error; - } - index->timestamp_end = htobe64(index->timestamp_end); - - ret = kernctl_get_events_discarded(infd, &index->events_discarded); - if (ret < 0) { - PERROR("kernctl_get_events_discarded"); - goto error; - } - index->events_discarded = htobe64(index->events_discarded); - - ret = kernctl_get_content_size(infd, &index->content_size); - if (ret < 0) { - PERROR("kernctl_get_content_size"); - goto error; - } - index->content_size = htobe64(index->content_size); - - ret = kernctl_get_packet_size(infd, &index->packet_size); - if (ret < 0) { - PERROR("kernctl_get_packet_size"); - goto error; - } - index->packet_size = htobe64(index->packet_size); - - ret = kernctl_get_stream_id(infd, &index->stream_id); - if (ret < 0) { - PERROR("kernctl_get_stream_id"); - goto error; - } - index->stream_id = htobe64(index->stream_id); - - ret = kernctl_get_instance_id(infd, &index->stream_instance_id); - if (ret < 0) { - if (ret == -ENOTTY) { - /* Command not implemented by lttng-modules. */ - index->stream_instance_id = -1ULL; - } else { - PERROR("kernctl_get_instance_id"); - goto error; - } - } - index->stream_instance_id = htobe64(index->stream_instance_id); - - ret = kernctl_get_sequence_number(infd, &index->packet_seq_num); - if (ret < 0) { - if (ret == -ENOTTY) { - /* Command not implemented by lttng-modules. */ - index->packet_seq_num = -1ULL; - ret = 0; - } else { - PERROR("kernctl_get_sequence_number"); - goto error; - } - } - index->packet_seq_num = htobe64(index->packet_seq_num); - -error: return ret; } + /* * Sync metadata meaning request them to the session daemon and snapshot to the * metadata thread can consumer them. @@ -1420,363 +1380,226 @@ end: } static -int update_stream_stats(struct lttng_consumer_stream *stream) +int extract_common_subbuffer_info(struct lttng_consumer_stream *stream, + struct stream_subbuffer *subbuf) { int ret; - uint64_t seq, discarded; - - ret = kernctl_get_sequence_number(stream->wait_fd, &seq); - if (ret < 0) { - if (ret == -ENOTTY) { - /* Command not implemented by lttng-modules. */ - seq = -1ULL; - } else { - PERROR("kernctl_get_sequence_number"); - goto end; - } - } - /* - * Start the sequence when we extract the first packet in case we don't - * start at 0 (for example if a consumer is not connected to the - * session immediately after the beginning). - */ - if (stream->last_sequence_number == -1ULL) { - stream->last_sequence_number = seq; - } else if (seq > stream->last_sequence_number) { - stream->chan->lost_packets += seq - - stream->last_sequence_number - 1; - } else { - /* seq <= last_sequence_number */ - ERR("Sequence number inconsistent : prev = %" PRIu64 - ", current = %" PRIu64, - stream->last_sequence_number, seq); - ret = -1; + ret = kernctl_get_subbuf_size( + stream->wait_fd, &subbuf->info.data.subbuf_size); + if (ret) { goto end; } - stream->last_sequence_number = seq; - ret = kernctl_get_events_discarded(stream->wait_fd, &discarded); - if (ret < 0) { - PERROR("kernctl_get_events_discarded"); + ret = kernctl_get_padded_subbuf_size( + stream->wait_fd, &subbuf->info.data.padded_subbuf_size); + if (ret) { goto end; } - if (discarded < stream->last_discarded_events) { - /* - * Overflow has occurred. We assume only one wrap-around - * has occurred. - */ - stream->chan->discarded_events += (1ULL << (CAA_BITS_PER_LONG - 1)) - - stream->last_discarded_events + discarded; - } else { - stream->chan->discarded_events += discarded - - stream->last_discarded_events; - } - stream->last_discarded_events = discarded; - ret = 0; end: return ret; } -/* - * Check if the local version of the metadata stream matches with the version - * of the metadata stream in the kernel. If it was updated, set the reset flag - * on the stream. - */ static -int metadata_stream_check_version(int infd, struct lttng_consumer_stream *stream) +int extract_metadata_subbuffer_info(struct lttng_consumer_stream *stream, + struct stream_subbuffer *subbuf) { int ret; - uint64_t cur_version; - ret = kernctl_get_metadata_version(infd, &cur_version); - if (ret < 0) { - if (ret == -ENOTTY) { - /* - * LTTng-modules does not implement this - * command. - */ - ret = 0; - goto end; - } - ERR("Failed to get the metadata version"); + ret = extract_common_subbuffer_info(stream, subbuf); + if (ret) { goto end; } - if (stream->metadata_version == cur_version) { - ret = 0; + ret = kernctl_get_metadata_version( + stream->wait_fd, &subbuf->info.metadata.version); + if (ret) { goto end; } - DBG("New metadata version detected"); - stream->metadata_version = cur_version; - stream->reset_metadata_flag = 1; - ret = 0; - end: return ret; } -/* - * Consume data on a file descriptor and write it on a trace file. - * The stream and channel locks must be held by the caller. - */ -ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, - struct lttng_consumer_local_data *ctx) +static +int extract_data_subbuffer_info(struct lttng_consumer_stream *stream, + struct stream_subbuffer *subbuf) { - unsigned long len, subbuf_size, padding; - int err, write_index = 1, rotation_ret; - ssize_t ret = 0; - int infd = stream->wait_fd; - struct ctf_packet_index index; + int ret; - DBG("In read_subbuffer (infd : %d)", infd); + ret = extract_common_subbuffer_info(stream, subbuf); + if (ret) { + goto end; + } - /* - * If the stream was flagged to be ready for rotation before we extract the - * next packet, rotate it now. - */ - if (stream->rotate_ready) { - DBG("Rotate stream before extracting data"); - rotation_ret = lttng_consumer_rotate_stream(ctx, stream); - if (rotation_ret < 0) { - ERR("Stream rotation error"); - ret = -1; - goto error; - } + ret = kernctl_get_packet_size( + stream->wait_fd, &subbuf->info.data.packet_size); + if (ret < 0) { + PERROR("Failed to get sub-buffer packet size"); + goto end; } - /* Get the next subbuffer */ - err = kernctl_get_next_subbuf(infd); - if (err != 0) { - /* - * This is a debug message even for single-threaded consumer, - * because poll() have more relaxed criterions than get subbuf, - * so get_subbuf may fail for short race windows where poll() - * would issue wakeups. - */ - DBG("Reserving sub buffer failed (everything is normal, " - "it is due to concurrency)"); - ret = err; - goto error; + ret = kernctl_get_content_size( + stream->wait_fd, &subbuf->info.data.content_size); + if (ret < 0) { + PERROR("Failed to get sub-buffer content size"); + goto end; } - /* Get the full subbuffer size including padding */ - err = kernctl_get_padded_subbuf_size(infd, &len); - if (err != 0) { - PERROR("Getting sub-buffer len failed."); - err = kernctl_put_subbuf(infd); - if (err != 0) { - if (err == -EFAULT) { - PERROR("Error in unreserving sub buffer\n"); - } else if (err == -EIO) { - /* Should never happen with newer LTTng versions */ - PERROR("Reader has been pushed by the writer, last sub-buffer corrupted."); - } - ret = err; - goto error; - } - ret = err; - goto error; + ret = kernctl_get_timestamp_begin( + stream->wait_fd, &subbuf->info.data.timestamp_begin); + if (ret < 0) { + PERROR("Failed to get sub-buffer begin timestamp"); + goto end; } - if (!stream->metadata_flag) { - ret = get_index_values(&index, infd); - if (ret < 0) { - err = kernctl_put_subbuf(infd); - if (err != 0) { - if (err == -EFAULT) { - PERROR("Error in unreserving sub buffer\n"); - } else if (err == -EIO) { - /* Should never happen with newer LTTng versions */ - PERROR("Reader has been pushed by the writer, last sub-buffer corrupted."); - } - ret = err; - goto error; - } - goto error; - } - ret = update_stream_stats(stream); - if (ret < 0) { - err = kernctl_put_subbuf(infd); - if (err != 0) { - if (err == -EFAULT) { - PERROR("Error in unreserving sub buffer\n"); - } else if (err == -EIO) { - /* Should never happen with newer LTTng versions */ - PERROR("Reader has been pushed by the writer, last sub-buffer corrupted."); - } - ret = err; - goto error; - } - goto error; + ret = kernctl_get_timestamp_end( + stream->wait_fd, &subbuf->info.data.timestamp_end); + if (ret < 0) { + PERROR("Failed to get sub-buffer end timestamp"); + goto end; + } + + ret = kernctl_get_events_discarded( + stream->wait_fd, &subbuf->info.data.events_discarded); + if (ret) { + PERROR("Failed to get sub-buffer events discarded count"); + goto end; + } + + ret = kernctl_get_sequence_number(stream->wait_fd, + &subbuf->info.data.sequence_number.value); + if (ret) { + /* May not be supported by older LTTng-modules. */ + if (ret != -ENOTTY) { + PERROR("Failed to get sub-buffer sequence number"); + goto end; } } else { - write_index = 0; - ret = metadata_stream_check_version(infd, stream); - if (ret < 0) { - err = kernctl_put_subbuf(infd); - if (err != 0) { - if (err == -EFAULT) { - PERROR("Error in unreserving sub buffer\n"); - } else if (err == -EIO) { - /* Should never happen with newer LTTng versions */ - PERROR("Reader has been pushed by the writer, last sub-buffer corrupted."); - } - ret = err; - goto error; - } - goto error; - } + subbuf->info.data.sequence_number.is_set = true; } - switch (stream->chan->output) { - case CONSUMER_CHANNEL_SPLICE: - /* - * XXX: The lttng-modules splice "actor" does not handle copying - * partial pages hence only using the subbuffer size without the - * padding makes the splice fail. - */ - subbuf_size = len; - padding = 0; + ret = kernctl_get_stream_id( + stream->wait_fd, &subbuf->info.data.stream_id); + if (ret < 0) { + PERROR("Failed to get stream id"); + goto end; + } - /* splice the subbuffer to the tracefile */ - ret = lttng_consumer_on_read_subbuffer_splice(ctx, stream, subbuf_size, - padding, &index); - /* - * XXX: Splice does not support network streaming so the return value - * is simply checked against subbuf_size and not like the mmap() op. - */ - if (ret != subbuf_size) { - /* - * display the error but continue processing to try - * to release the subbuffer - */ - ERR("Error splicing to tracefile (ret: %zd != len: %lu)", - ret, subbuf_size); - write_index = 0; - } - break; - case CONSUMER_CHANNEL_MMAP: - /* Get subbuffer size without padding */ - err = kernctl_get_subbuf_size(infd, &subbuf_size); - if (err != 0) { - PERROR("Getting sub-buffer len failed."); - err = kernctl_put_subbuf(infd); - if (err != 0) { - if (err == -EFAULT) { - PERROR("Error in unreserving sub buffer\n"); - } else if (err == -EIO) { - /* Should never happen with newer LTTng versions */ - PERROR("Reader has been pushed by the writer, last sub-buffer corrupted."); - } - ret = err; - goto error; - } - ret = err; - goto error; + ret = kernctl_get_instance_id(stream->wait_fd, + &subbuf->info.data.stream_instance_id.value); + if (ret) { + /* May not be supported by older LTTng-modules. */ + if (ret != -ENOTTY) { + PERROR("Failed to get stream instance id"); + goto end; } + } else { + subbuf->info.data.stream_instance_id.is_set = true; + } +end: + return ret; +} - /* Make sure the tracer is not gone mad on us! */ - assert(len >= subbuf_size); +static +int get_subbuffer_common(struct lttng_consumer_stream *stream, + struct stream_subbuffer *subbuffer) +{ + int ret; - padding = len - subbuf_size; + ret = kernctl_get_next_subbuf(stream->wait_fd); + if (ret) { + goto end; + } - /* write the subbuffer to the tracefile */ - ret = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, subbuf_size, - padding, &index); - /* - * The mmap operation should write subbuf_size amount of data when - * network streaming or the full padding (len) size when we are _not_ - * streaming. - */ - if ((ret != subbuf_size && stream->net_seq_idx != (uint64_t) -1ULL) || - (ret != len && stream->net_seq_idx == (uint64_t) -1ULL)) { - /* - * Display the error but continue processing to try to release the - * subbuffer. This is a DBG statement since this is possible to - * happen without being a critical error. - */ - DBG("Error writing to tracefile " - "(ret: %zd != len: %lu != subbuf_size: %lu)", - ret, len, subbuf_size); - write_index = 0; - } - break; - default: - ERR("Unknown output method"); - ret = -EPERM; + ret = stream->read_subbuffer_ops.extract_subbuffer_info( + stream, subbuffer); +end: + return ret; +} + +static +int get_next_subbuffer_splice(struct lttng_consumer_stream *stream, + struct stream_subbuffer *subbuffer) +{ + int ret; + + ret = get_subbuffer_common(stream, subbuffer); + if (ret) { + goto end; } - err = kernctl_put_next_subbuf(infd); - if (err != 0) { - if (err == -EFAULT) { - PERROR("Error in unreserving sub buffer\n"); - } else if (err == -EIO) { - /* Should never happen with newer LTTng versions */ - PERROR("Reader has been pushed by the writer, last sub-buffer corrupted."); - } - ret = err; - goto error; + subbuffer->buffer.fd = stream->wait_fd; +end: + return ret; +} + +static +int get_next_subbuffer_mmap(struct lttng_consumer_stream *stream, + struct stream_subbuffer *subbuffer) +{ + int ret; + const char *addr; + + ret = get_subbuffer_common(stream, subbuffer); + if (ret) { + goto end; } - /* Write index if needed. */ - if (!write_index) { - goto rotate; + ret = get_current_subbuf_addr(stream, &addr); + if (ret) { + goto end; } - if (stream->chan->live_timer_interval && !stream->metadata_flag) { - /* - * In live, block until all the metadata is sent. - */ - pthread_mutex_lock(&stream->metadata_timer_lock); - assert(!stream->missed_metadata_flush); - stream->waiting_on_metadata = true; - pthread_mutex_unlock(&stream->metadata_timer_lock); - - err = consumer_stream_sync_metadata(ctx, stream->session_id); - - pthread_mutex_lock(&stream->metadata_timer_lock); - stream->waiting_on_metadata = false; - if (stream->missed_metadata_flush) { - stream->missed_metadata_flush = false; - pthread_mutex_unlock(&stream->metadata_timer_lock); - (void) consumer_flush_kernel_index(stream); - } else { - pthread_mutex_unlock(&stream->metadata_timer_lock); - } - if (err < 0) { - goto error; + subbuffer->buffer.buffer = lttng_buffer_view_init( + addr, 0, subbuffer->info.data.padded_subbuf_size); +end: + return ret; +} + +static +int put_next_subbuffer(struct lttng_consumer_stream *stream, + struct stream_subbuffer *subbuffer) +{ + const int ret = kernctl_put_next_subbuf(stream->wait_fd); + + if (ret) { + if (ret == -EFAULT) { + PERROR("Error in unreserving sub buffer"); + } else if (ret == -EIO) { + /* Should never happen with newer LTTng versions */ + PERROR("Reader has been pushed by the writer, last sub-buffer corrupted"); } } - err = consumer_stream_write_index(stream, &index); - if (err < 0) { - goto error; + return ret; +} + +static void lttng_kconsumer_set_stream_ops( + struct lttng_consumer_stream *stream) +{ + if (stream->chan->output == CONSUMER_CHANNEL_MMAP) { + stream->read_subbuffer_ops.get_next_subbuffer = + get_next_subbuffer_mmap; + } else { + stream->read_subbuffer_ops.get_next_subbuffer = + get_next_subbuffer_splice; } -rotate: - /* - * After extracting the packet, we check if the stream is now ready to be - * rotated and perform the action immediately. - */ - rotation_ret = lttng_consumer_stream_is_rotate_ready(stream); - if (rotation_ret == 1) { - rotation_ret = lttng_consumer_rotate_stream(ctx, stream); - if (rotation_ret < 0) { - ERR("Stream rotation error"); - ret = -1; - goto error; + if (stream->metadata_flag) { + stream->read_subbuffer_ops.extract_subbuffer_info = + extract_metadata_subbuffer_info; + } else { + stream->read_subbuffer_ops.extract_subbuffer_info = + extract_data_subbuffer_info; + if (stream->chan->is_live) { + stream->read_subbuffer_ops.send_live_beacon = + consumer_flush_kernel_index; } - } else if (rotation_ret < 0) { - ERR("Checking if stream is ready to rotate"); - ret = -1; - goto error; } -error: - return ret; + stream->read_subbuffer_ops.put_next_subbuffer = put_next_subbuffer; } int lttng_kconsumer_on_recv_stream(struct lttng_consumer_stream *stream) @@ -1817,6 +1640,8 @@ int lttng_kconsumer_on_recv_stream(struct lttng_consumer_stream *stream) } } + lttng_kconsumer_set_stream_ops(stream); + /* we return 0 to let the library handle the FD internally */ return 0;