X-Git-Url: http://git.efficios.com/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fkernel-consumer%2Fkernel-consumer.c;h=b5afc73aa5fb167071145b35e6d8a16a4291cbc6;hp=a8abcd7901121c62dc27ec94f7d01cee62d2a6fa;hb=f22dd89135f6b1749cd75735f04e3cb309da12c1;hpb=f0b03c2289d0f84c1e2dc41be70cd0bcc222e181 diff --git a/src/common/kernel-consumer/kernel-consumer.c b/src/common/kernel-consumer/kernel-consumer.c index a8abcd790..b5afc73aa 100644 --- a/src/common/kernel-consumer/kernel-consumer.c +++ b/src/common/kernel-consumer/kernel-consumer.c @@ -1,6 +1,7 @@ /* * Copyright (C) 2011 - Julien Desfossez * Mathieu Desnoyers + * Copyright (C) 2017 - Jérémie Galarneau * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License, version 2 only, @@ -67,6 +68,19 @@ int lttng_kconsumer_take_snapshot(struct lttng_consumer_stream *stream) return ret; } +/* + * Sample consumed and produced positions for a specific fd. + * + * Returns 0 on success, < 0 on error. + */ +int lttng_kconsumer_sample_snapshot_positions( + struct lttng_consumer_stream *stream) +{ + assert(stream); + + return kernctl_snapshot_sample_positions(stream->wait_fd); +} + /* * Get the produced position * @@ -182,11 +196,23 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path, ERR("sending streams sent to relayd"); goto end_unlock; } + channel->streams_sent_to_relayd = true; } - ret = kernctl_buffer_flush(stream->wait_fd); + ret = kernctl_buffer_flush_empty(stream->wait_fd); if (ret < 0) { - ERR("Failed to flush kernel stream"); + /* + * Doing a buffer flush which does not take into + * account empty packets. This is not perfect + * for stream intersection, but required as a + * fall-back when "flush_empty" is not + * implemented by lttng-modules. + */ + ret = kernctl_buffer_flush(stream->wait_fd); + if (ret < 0) { + ERR("Failed to flush kernel stream"); + goto end_unlock; + } goto end_unlock; } @@ -533,9 +559,20 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, } else { ret = consumer_add_channel(new_channel, ctx); } - if (CONSUMER_CHANNEL_TYPE_DATA) { + if (msg.u.channel.type == CONSUMER_CHANNEL_TYPE_DATA && !ret) { + int monitor_start_ret; + + DBG("Consumer starting monitor timer"); consumer_timer_live_start(new_channel, msg.u.channel.live_timer_interval); + monitor_start_ret = consumer_timer_monitor_start( + new_channel, + msg.u.channel.monitor_timer_interval); + if (monitor_start_ret < 0) { + ERR("Starting channel monitoring timer failed"); + goto end_nosignal; + } + } health_code_update(); @@ -716,6 +753,19 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, consumer_stream_free(new_stream); goto end_nosignal; } + + /* + * If adding an extra stream to an already + * existing channel (e.g. cpu hotplug), we need + * to send the "streams_sent" command to relayd. + */ + if (channel->streams_sent_to_relayd) { + ret = consumer_send_relayd_streams_sent( + new_stream->net_seq_idx); + if (ret < 0) { + goto end_nosignal; + } + } } /* Get the right pipe where the stream will be sent. */ @@ -809,6 +859,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, if (ret < 0) { goto end_nosignal; } + channel->streams_sent_to_relayd = true; } break; } @@ -1012,6 +1063,55 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, break; } + case LTTNG_CONSUMER_SET_CHANNEL_MONITOR_PIPE: + { + int channel_monitor_pipe; + + ret_code = LTTCOMM_CONSUMERD_SUCCESS; + /* Successfully received the command's type. */ + ret = consumer_send_status_msg(sock, ret_code); + if (ret < 0) { + goto error_fatal; + } + + ret = lttcomm_recv_fds_unix_sock(sock, &channel_monitor_pipe, + 1); + if (ret != sizeof(channel_monitor_pipe)) { + ERR("Failed to receive channel monitor pipe"); + goto error_fatal; + } + + DBG("Received channel monitor pipe (%d)", channel_monitor_pipe); + ret = consumer_timer_thread_set_channel_monitor_pipe( + channel_monitor_pipe); + if (!ret) { + int flags; + + ret_code = LTTCOMM_CONSUMERD_SUCCESS; + /* Set the pipe as non-blocking. */ + ret = fcntl(channel_monitor_pipe, F_GETFL, 0); + if (ret == -1) { + PERROR("fcntl get flags of the channel monitoring pipe"); + goto error_fatal; + } + flags = ret; + + ret = fcntl(channel_monitor_pipe, F_SETFL, + flags | O_NONBLOCK); + if (ret == -1) { + PERROR("fcntl set O_NONBLOCK flag of the channel monitoring pipe"); + goto error_fatal; + } + DBG("Channel monitor pipe set as non-blocking"); + } else { + ret_code = LTTCOMM_CONSUMERD_ALREADY_SET; + } + ret = consumer_send_status_msg(sock, ret_code); + if (ret < 0) { + goto error_fatal; + } + break; + } default: goto end_nosignal; } @@ -1315,12 +1415,34 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, } ret = update_stream_stats(stream); if (ret < 0) { + err = kernctl_put_subbuf(infd); + if (err != 0) { + if (err == -EFAULT) { + PERROR("Error in unreserving sub buffer\n"); + } else if (err == -EIO) { + /* Should never happen with newer LTTng versions */ + PERROR("Reader has been pushed by the writer, last sub-buffer corrupted."); + } + ret = err; + goto end; + } goto end; } } else { write_index = 0; ret = metadata_stream_check_version(infd, stream); if (ret < 0) { + err = kernctl_put_subbuf(infd); + if (err != 0) { + if (err == -EFAULT) { + PERROR("Error in unreserving sub buffer\n"); + } else if (err == -EIO) { + /* Should never happen with newer LTTng versions */ + PERROR("Reader has been pushed by the writer, last sub-buffer corrupted."); + } + ret = err; + goto end; + } goto end; } } @@ -1475,14 +1597,18 @@ int lttng_kconsumer_on_recv_stream(struct lttng_consumer_stream *stream) stream->tracefile_size_current = 0; if (!stream->metadata_flag) { - ret = index_create_file(stream->chan->pathname, + struct lttng_index_file *index_file; + + index_file = lttng_index_file_create(stream->chan->pathname, stream->name, stream->uid, stream->gid, stream->chan->tracefile_size, - stream->tracefile_count_current); - if (ret < 0) { + stream->tracefile_count_current, + CTF_INDEX_MAJOR, CTF_INDEX_MINOR); + if (!index_file) { goto error; } - stream->index_fd = ret; + assert(!stream->index_file); + stream->index_file = index_file; } }