#include <inttypes.h>
#include <signal.h>
+#include <bin/lttng-sessiond/ust-ctl.h>
#include <bin/lttng-consumerd/health-consumerd.h>
#include <common/common.h>
#include <common/compat/endian.h>
#include <common/consumer/consumer-testpoint.h>
#include <common/ust-consumer/ust-consumer.h>
+typedef int (*sample_positions_cb)(struct lttng_consumer_stream *stream);
+typedef int (*get_consumed_cb)(struct lttng_consumer_stream *stream,
+ unsigned long *consumed);
+typedef int (*get_produced_cb)(struct lttng_consumer_stream *stream,
+ unsigned long *produced);
+
static struct timer_signal_data timer_signal = {
.tid = 0,
.setup_done = 0,
if (ret) {
PERROR("sigaddset live");
}
+ ret = sigaddset(mask, LTTNG_CONSUMER_SIG_MONITOR);
+ if (ret) {
+ PERROR("sigaddset monitor");
+ }
+ ret = sigaddset(mask, LTTNG_CONSUMER_SIG_EXIT);
+ if (ret) {
+ PERROR("sigaddset exit");
+ }
}
+static int channel_monitor_pipe = -1;
+
/*
* Execute action on a timer switch.
*
* deadlocks.
*/
static void metadata_switch_timer(struct lttng_consumer_local_data *ctx,
- int sig, siginfo_t *si, void *uc)
+ siginfo_t *si)
{
int ret;
struct lttng_consumer_channel *channel;
}
ret = kernctl_snapshot(stream->wait_fd);
if (ret < 0) {
- if (errno != EAGAIN && errno != ENODATA) {
+ if (ret != -EAGAIN && ret != -ENODATA) {
PERROR("live timer kernel snapshot");
ret = -1;
goto end;
* Execute action on a live timer
*/
static void live_timer(struct lttng_consumer_local_data *ctx,
- int sig, siginfo_t *si, void *uc)
+ siginfo_t *si)
{
int ret;
struct lttng_consumer_channel *channel;
if (ret == -1) {
PERROR("sigpending");
}
- if (!sigismember(&pending_set, LTTNG_CONSUMER_SIG_SWITCH)) {
+ if (!sigismember(&pending_set, signr)) {
break;
}
caa_cpu_relax();
}
/*
- * Set the timer for periodical metadata flush.
+ * Start a timer channel timer which will fire at a given interval
+ * (timer_interval_us)and fire a given signal (signal).
+ *
+ * Returns a negative value on error, 0 if a timer was created, and
+ * a positive value if no timer was created (not an error).
*/
-void consumer_timer_switch_start(struct lttng_consumer_channel *channel,
- unsigned int switch_timer_interval)
+static
+int consumer_channel_timer_start(timer_t *timer_id,
+ struct lttng_consumer_channel *channel,
+ unsigned int timer_interval_us, int signal)
{
- int ret;
+ int ret = 0, delete_ret;
struct sigevent sev;
struct itimerspec its;
assert(channel);
assert(channel->key);
- if (switch_timer_interval == 0) {
- return;
+ if (timer_interval_us == 0) {
+ /* No creation needed; not an error. */
+ ret = 1;
+ goto end;
}
sev.sigev_notify = SIGEV_SIGNAL;
- sev.sigev_signo = LTTNG_CONSUMER_SIG_SWITCH;
+ sev.sigev_signo = signal;
sev.sigev_value.sival_ptr = channel;
- ret = timer_create(CLOCKID, &sev, &channel->switch_timer);
+ ret = timer_create(CLOCKID, &sev, timer_id);
if (ret == -1) {
PERROR("timer_create");
+ goto end;
}
- channel->switch_timer_enabled = 1;
- its.it_value.tv_sec = switch_timer_interval / 1000000;
- its.it_value.tv_nsec = switch_timer_interval % 1000000;
+ its.it_value.tv_sec = timer_interval_us / 1000000;
+ its.it_value.tv_nsec = (timer_interval_us % 1000000) * 1000;
its.it_interval.tv_sec = its.it_value.tv_sec;
its.it_interval.tv_nsec = its.it_value.tv_nsec;
- ret = timer_settime(channel->switch_timer, 0, &its, NULL);
+ ret = timer_settime(*timer_id, 0, &its, NULL);
if (ret == -1) {
PERROR("timer_settime");
+ goto error_destroy_timer;
+ }
+end:
+ return ret;
+error_destroy_timer:
+ delete_ret = timer_delete(*timer_id);
+ if (delete_ret == -1) {
+ PERROR("timer_delete");
+ }
+ goto end;
+}
+
+static
+int consumer_channel_timer_stop(timer_t *timer_id, int signal)
+{
+ int ret = 0;
+
+ ret = timer_delete(*timer_id);
+ if (ret == -1) {
+ PERROR("timer_delete");
+ goto end;
}
+
+ consumer_timer_signal_thread_qs(signal);
+ *timer_id = 0;
+end:
+ return ret;
}
/*
- * Stop and delete timer.
+ * Set the channel's switch timer.
+ */
+void consumer_timer_switch_start(struct lttng_consumer_channel *channel,
+ unsigned int switch_timer_interval_us)
+{
+ int ret;
+
+ assert(channel);
+ assert(channel->key);
+
+ ret = consumer_channel_timer_start(&channel->switch_timer, channel,
+ switch_timer_interval_us, LTTNG_CONSUMER_SIG_SWITCH);
+
+ channel->switch_timer_enabled = !!(ret == 0);
+}
+
+/*
+ * Stop and delete the channel's switch timer.
*/
void consumer_timer_switch_stop(struct lttng_consumer_channel *channel)
{
assert(channel);
- ret = timer_delete(channel->switch_timer);
+ ret = consumer_channel_timer_stop(&channel->switch_timer,
+ LTTNG_CONSUMER_SIG_SWITCH);
if (ret == -1) {
- PERROR("timer_delete");
+ ERR("Failed to stop switch timer");
}
- consumer_timer_signal_thread_qs(LTTNG_CONSUMER_SIG_SWITCH);
-
- channel->switch_timer = 0;
channel->switch_timer_enabled = 0;
}
/*
- * Set the timer for the live mode.
+ * Set the channel's live timer.
*/
void consumer_timer_live_start(struct lttng_consumer_channel *channel,
- int live_timer_interval)
+ unsigned int live_timer_interval_us)
{
int ret;
- struct sigevent sev;
- struct itimerspec its;
assert(channel);
assert(channel->key);
- if (live_timer_interval <= 0) {
- return;
- }
+ ret = consumer_channel_timer_start(&channel->live_timer, channel,
+ live_timer_interval_us, LTTNG_CONSUMER_SIG_LIVE);
- sev.sigev_notify = SIGEV_SIGNAL;
- sev.sigev_signo = LTTNG_CONSUMER_SIG_LIVE;
- sev.sigev_value.sival_ptr = channel;
- ret = timer_create(CLOCKID, &sev, &channel->live_timer);
- if (ret == -1) {
- PERROR("timer_create");
- }
- channel->live_timer_enabled = 1;
+ channel->live_timer_enabled = !!(ret == 0);
+}
- its.it_value.tv_sec = live_timer_interval / 1000000;
- its.it_value.tv_nsec = live_timer_interval % 1000000;
- its.it_interval.tv_sec = its.it_value.tv_sec;
- its.it_interval.tv_nsec = its.it_value.tv_nsec;
+/*
+ * Stop and delete the channel's live timer.
+ */
+void consumer_timer_live_stop(struct lttng_consumer_channel *channel)
+{
+ int ret;
- ret = timer_settime(channel->live_timer, 0, &its, NULL);
+ assert(channel);
+
+ ret = consumer_channel_timer_stop(&channel->live_timer,
+ LTTNG_CONSUMER_SIG_LIVE);
if (ret == -1) {
- PERROR("timer_settime");
+ ERR("Failed to stop live timer");
}
+
+ channel->live_timer_enabled = 0;
}
/*
- * Stop and delete timer.
+ * Set the channel's monitoring timer.
+ *
+ * Returns a negative value on error, 0 if a timer was created, and
+ * a positive value if no timer was created (not an error).
*/
-void consumer_timer_live_stop(struct lttng_consumer_channel *channel)
+int consumer_timer_monitor_start(struct lttng_consumer_channel *channel,
+ unsigned int monitor_timer_interval_us)
+{
+ int ret;
+
+ assert(channel);
+ assert(channel->key);
+ assert(!channel->monitor_timer_enabled);
+
+ ret = consumer_channel_timer_start(&channel->monitor_timer, channel,
+ monitor_timer_interval_us, LTTNG_CONSUMER_SIG_MONITOR);
+ channel->monitor_timer_enabled = !!(ret == 0);
+ return ret;
+}
+
+/*
+ * Stop and delete the channel's monitoring timer.
+ */
+int consumer_timer_monitor_stop(struct lttng_consumer_channel *channel)
{
int ret;
assert(channel);
+ assert(channel->monitor_timer_enabled);
- ret = timer_delete(channel->live_timer);
+ ret = consumer_channel_timer_stop(&channel->monitor_timer,
+ LTTNG_CONSUMER_SIG_MONITOR);
if (ret == -1) {
- PERROR("timer_delete");
+ ERR("Failed to stop live timer");
+ goto end;
}
- consumer_timer_signal_thread_qs(LTTNG_CONSUMER_SIG_LIVE);
-
- channel->live_timer = 0;
- channel->live_timer_enabled = 0;
+ channel->monitor_timer_enabled = 0;
+end:
+ return ret;
}
/*
return 0;
}
+static
+int sample_channel_positions(struct lttng_consumer_channel *channel,
+ uint64_t *_highest_use, uint64_t *_lowest_use, uint64_t *_total_consumed,
+ sample_positions_cb sample, get_consumed_cb get_consumed,
+ get_produced_cb get_produced)
+{
+ int ret = 0;
+ struct lttng_ht_iter iter;
+ struct lttng_consumer_stream *stream;
+ bool empty_channel = true;
+ uint64_t high = 0, low = UINT64_MAX;
+ struct lttng_ht *ht = consumer_data.stream_per_chan_id_ht;
+
+ *_total_consumed = 0;
+
+ rcu_read_lock();
+
+ cds_lfht_for_each_entry_duplicate(ht->ht,
+ ht->hash_fct(&channel->key, lttng_ht_seed),
+ ht->match_fct, &channel->key,
+ &iter.iter, stream, node_channel_id.node) {
+ unsigned long produced, consumed, usage;
+
+ empty_channel = false;
+
+ pthread_mutex_lock(&stream->lock);
+ if (cds_lfht_is_node_deleted(&stream->node.node)) {
+ goto next;
+ }
+
+ ret = sample(stream);
+ if (ret) {
+ ERR("Failed to take buffer position snapshot in monitor timer (ret = %d)", ret);
+ pthread_mutex_unlock(&stream->lock);
+ goto end;
+ }
+ ret = get_consumed(stream, &consumed);
+ if (ret) {
+ ERR("Failed to get buffer consumed position in monitor timer");
+ pthread_mutex_unlock(&stream->lock);
+ goto end;
+ }
+ ret = get_produced(stream, &produced);
+ if (ret) {
+ ERR("Failed to get buffer produced position in monitor timer");
+ pthread_mutex_unlock(&stream->lock);
+ goto end;
+ }
+
+ usage = produced - consumed;
+ high = (usage > high) ? usage : high;
+ low = (usage < low) ? usage : low;
+
+ /*
+ * We don't use consumed here for 2 reasons:
+ * - output_written takes into account the padding written in the
+ * tracefiles when we stop the session;
+ * - the consumed position is not the accurate representation of what
+ * was extracted from a buffer in overwrite mode.
+ */
+ *_total_consumed += stream->output_written;
+ next:
+ pthread_mutex_unlock(&stream->lock);
+ }
+
+ *_highest_use = high;
+ *_lowest_use = low;
+end:
+ rcu_read_unlock();
+ if (empty_channel) {
+ ret = -1;
+ }
+ return ret;
+}
+
+/*
+ * Execute action on a monitor timer.
+ */
+static
+void monitor_timer(struct lttng_consumer_channel *channel)
+{
+ int ret;
+ int channel_monitor_pipe =
+ consumer_timer_thread_get_channel_monitor_pipe();
+ struct lttcomm_consumer_channel_monitor_msg msg = {
+ .key = channel->key,
+ };
+ sample_positions_cb sample;
+ get_consumed_cb get_consumed;
+ get_produced_cb get_produced;
+
+ assert(channel);
+
+ if (channel_monitor_pipe < 0) {
+ return;
+ }
+
+ switch (consumer_data.type) {
+ case LTTNG_CONSUMER_KERNEL:
+ sample = lttng_kconsumer_sample_snapshot_positions;
+ get_consumed = lttng_kconsumer_get_consumed_snapshot;
+ get_produced = lttng_kconsumer_get_produced_snapshot;
+ break;
+ case LTTNG_CONSUMER32_UST:
+ case LTTNG_CONSUMER64_UST:
+ sample = lttng_ustconsumer_sample_snapshot_positions;
+ get_consumed = lttng_ustconsumer_get_consumed_snapshot;
+ get_produced = lttng_ustconsumer_get_produced_snapshot;
+ break;
+ default:
+ abort();
+ }
+
+ ret = sample_channel_positions(channel, &msg.highest, &msg.lowest,
+ &msg.total_consumed, sample, get_consumed, get_produced);
+ if (ret) {
+ return;
+ }
+
+ /*
+ * Writes performed here are assumed to be atomic which is only
+ * guaranteed for sizes < than PIPE_BUF.
+ */
+ assert(sizeof(msg) <= PIPE_BUF);
+
+ do {
+ ret = write(channel_monitor_pipe, &msg, sizeof(msg));
+ } while (ret == -1 && errno == EINTR);
+ if (ret == -1) {
+ if (errno == EAGAIN) {
+ /* Not an error, the sample is merely dropped. */
+ DBG("Channel monitor pipe is full; dropping sample for channel key = %"PRIu64,
+ channel->key);
+ } else {
+ PERROR("write to the channel monitor pipe");
+ }
+ } else {
+ DBG("Sent channel monitoring sample for channel key %" PRIu64
+ ", (highest = %" PRIu64 ", lowest = %"PRIu64")",
+ channel->key, msg.highest, msg.lowest);
+ }
+}
+
+int consumer_timer_thread_get_channel_monitor_pipe(void)
+{
+ return uatomic_read(&channel_monitor_pipe);
+}
+
+int consumer_timer_thread_set_channel_monitor_pipe(int fd)
+{
+ int ret;
+
+ ret = uatomic_cmpxchg(&channel_monitor_pipe, -1, fd);
+ if (ret != -1) {
+ ret = -1;
+ goto end;
+ }
+ ret = 0;
+end:
+ return ret;
+}
+
/*
* This thread is the sighandler for signals LTTNG_CONSUMER_SIG_SWITCH,
- * LTTNG_CONSUMER_SIG_TEARDOWN and LTTNG_CONSUMER_SIG_LIVE.
+ * LTTNG_CONSUMER_SIG_TEARDOWN, LTTNG_CONSUMER_SIG_LIVE, and
+ * LTTNG_CONSUMER_SIG_MONITOR, LTTNG_CONSUMER_SIG_EXIT.
*/
void *consumer_timer_thread(void *data)
{
health_poll_entry();
signr = sigwaitinfo(&mask, &info);
health_poll_exit();
+
+ /*
+ * NOTE: cascading conditions are used instead of a switch case
+ * since the use of SIGRTMIN in the definition of the signals'
+ * values prevents the reduction to an integer constant.
+ */
if (signr == -1) {
if (errno != EINTR) {
PERROR("sigwaitinfo");
}
continue;
} else if (signr == LTTNG_CONSUMER_SIG_SWITCH) {
- metadata_switch_timer(ctx, info.si_signo, &info, NULL);
+ metadata_switch_timer(ctx, &info);
} else if (signr == LTTNG_CONSUMER_SIG_TEARDOWN) {
cmm_smp_mb();
CMM_STORE_SHARED(timer_signal.qs_done, 1);
cmm_smp_mb();
DBG("Signal timer metadata thread teardown");
} else if (signr == LTTNG_CONSUMER_SIG_LIVE) {
- live_timer(ctx, info.si_signo, &info, NULL);
+ live_timer(ctx, &info);
+ } else if (signr == LTTNG_CONSUMER_SIG_MONITOR) {
+ struct lttng_consumer_channel *channel;
+
+ channel = info.si_value.sival_ptr;
+ monitor_timer(channel);
+ } else if (signr == LTTNG_CONSUMER_SIG_EXIT) {
+ assert(CMM_LOAD_SHARED(consumer_quit));
+ goto end;
} else {
ERR("Unexpected signal %d\n", info.si_signo);
}
error_testpoint:
/* Only reached in testpoint error */
health_error();
+end:
health_unregister(health_consumerd);
-
rcu_unregister_thread();
-
- /* Never return */
return NULL;
}