X-Git-Url: http://git.efficios.com/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fkernel-consumer%2Fkernel-consumer.c;h=b5afc73aa5fb167071145b35e6d8a16a4291cbc6;hp=1a786352d75adbfaedf93783ab4d9b91f559c0d2;hb=f22dd89135f6b1749cd75735f04e3cb309da12c1;hpb=b35308203f7844f48542978fd00aab21f2057c17 diff --git a/src/common/kernel-consumer/kernel-consumer.c b/src/common/kernel-consumer/kernel-consumer.c index 1a786352d..b5afc73aa 100644 --- a/src/common/kernel-consumer/kernel-consumer.c +++ b/src/common/kernel-consumer/kernel-consumer.c @@ -68,6 +68,19 @@ int lttng_kconsumer_take_snapshot(struct lttng_consumer_stream *stream) return ret; } +/* + * Sample consumed and produced positions for a specific fd. + * + * Returns 0 on success, < 0 on error. + */ +int lttng_kconsumer_sample_snapshot_positions( + struct lttng_consumer_stream *stream) +{ + assert(stream); + + return kernctl_snapshot_sample_positions(stream->wait_fd); +} + /* * Get the produced position * @@ -183,11 +196,23 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path, ERR("sending streams sent to relayd"); goto end_unlock; } + channel->streams_sent_to_relayd = true; } - ret = kernctl_buffer_flush(stream->wait_fd); + ret = kernctl_buffer_flush_empty(stream->wait_fd); if (ret < 0) { - ERR("Failed to flush kernel stream"); + /* + * Doing a buffer flush which does not take into + * account empty packets. This is not perfect + * for stream intersection, but required as a + * fall-back when "flush_empty" is not + * implemented by lttng-modules. + */ + ret = kernctl_buffer_flush(stream->wait_fd); + if (ret < 0) { + ERR("Failed to flush kernel stream"); + goto end_unlock; + } goto end_unlock; } @@ -534,9 +559,20 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, } else { ret = consumer_add_channel(new_channel, ctx); } - if (CONSUMER_CHANNEL_TYPE_DATA) { + if (msg.u.channel.type == CONSUMER_CHANNEL_TYPE_DATA && !ret) { + int monitor_start_ret; + + DBG("Consumer starting monitor timer"); consumer_timer_live_start(new_channel, msg.u.channel.live_timer_interval); + monitor_start_ret = consumer_timer_monitor_start( + new_channel, + msg.u.channel.monitor_timer_interval); + if (monitor_start_ret < 0) { + ERR("Starting channel monitoring timer failed"); + goto end_nosignal; + } + } health_code_update(); @@ -717,6 +753,19 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, consumer_stream_free(new_stream); goto end_nosignal; } + + /* + * If adding an extra stream to an already + * existing channel (e.g. cpu hotplug), we need + * to send the "streams_sent" command to relayd. + */ + if (channel->streams_sent_to_relayd) { + ret = consumer_send_relayd_streams_sent( + new_stream->net_seq_idx); + if (ret < 0) { + goto end_nosignal; + } + } } /* Get the right pipe where the stream will be sent. */ @@ -810,6 +859,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, if (ret < 0) { goto end_nosignal; } + channel->streams_sent_to_relayd = true; } break; }