if (ret) {
PERROR("sigaddset monitor");
}
+ ret = sigaddset(mask, LTTNG_CONSUMER_SIG_EXIT);
+ if (ret) {
+ PERROR("sigaddset exit");
+ }
}
static int channel_monitor_pipe = -1;
if (ret == -1) {
PERROR("sigpending");
}
- if (!sigismember(&pending_set, LTTNG_CONSUMER_SIG_SWITCH)) {
+ if (!sigismember(&pending_set, signr)) {
break;
}
caa_cpu_relax();
static
int sample_channel_positions(struct lttng_consumer_channel *channel,
- uint64_t *_highest_use, uint64_t *_lowest_use,
+ uint64_t *_highest_use, uint64_t *_lowest_use, uint64_t *_total_consumed,
sample_positions_cb sample, get_consumed_cb get_consumed,
get_produced_cb get_produced)
{
- int ret;
+ int ret = 0;
struct lttng_ht_iter iter;
struct lttng_consumer_stream *stream;
bool empty_channel = true;
uint64_t high = 0, low = UINT64_MAX;
struct lttng_ht *ht = consumer_data.stream_per_chan_id_ht;
+ *_total_consumed = 0;
+
rcu_read_lock();
cds_lfht_for_each_entry_duplicate(ht->ht,
usage = produced - consumed;
high = (usage > high) ? usage : high;
low = (usage < low) ? usage : low;
+
+ /*
+ * We don't use consumed here for 2 reasons:
+ * - output_written takes into account the padding written in the
+ * tracefiles when we stop the session;
+ * - the consumed position is not the accurate representation of what
+ * was extracted from a buffer in overwrite mode.
+ */
+ *_total_consumed += stream->output_written;
next:
pthread_mutex_unlock(&stream->lock);
}
}
ret = sample_channel_positions(channel, &msg.highest, &msg.lowest,
- sample, get_consumed, get_produced);
+ &msg.total_consumed, sample, get_consumed, get_produced);
if (ret) {
return;
}
/*
* This thread is the sighandler for signals LTTNG_CONSUMER_SIG_SWITCH,
* LTTNG_CONSUMER_SIG_TEARDOWN, LTTNG_CONSUMER_SIG_LIVE, and
- * LTTNG_CONSUMER_SIG_MONITOR.
+ * LTTNG_CONSUMER_SIG_MONITOR, LTTNG_CONSUMER_SIG_EXIT.
*/
void *consumer_timer_thread(void *data)
{
channel = info.si_value.sival_ptr;
monitor_timer(ctx, channel);
+ } else if (signr == LTTNG_CONSUMER_SIG_EXIT) {
+ assert(CMM_LOAD_SHARED(consumer_quit));
+ goto end;
} else {
ERR("Unexpected signal %d\n", info.si_signo);
}
error_testpoint:
/* Only reached in testpoint error */
health_error();
+end:
health_unregister(health_consumerd);
-
rcu_unregister_thread();
-
- /* Never return */
return NULL;
}