X-Git-Url: http://git.efficios.com/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fconsumer-timer.c;h=e2be05e7e731b8355235e3d040365545c5828bbc;hp=37c9861b906a0a81c6b08a302c73e486e6c6d3dd;hb=d3e2ba59faddb31870e2ce29b6a881f7ad5ad883;hpb=7ce3675685dbbc7be9536eb9c2b5ff8d677dc0b5 diff --git a/src/common/consumer-timer.c b/src/common/consumer-timer.c index 37c9861b9..e2be05e7e 100644 --- a/src/common/consumer-timer.c +++ b/src/common/consumer-timer.c @@ -21,7 +21,11 @@ #include #include +#include #include +#include +#include +#include #include "consumer-timer.h" #include "ust-consumer/ust-consumer.h" @@ -46,11 +50,15 @@ static void setmask(sigset_t *mask) } ret = sigaddset(mask, LTTNG_CONSUMER_SIG_SWITCH); if (ret) { - PERROR("sigaddset"); + PERROR("sigaddset switch"); } ret = sigaddset(mask, LTTNG_CONSUMER_SIG_TEARDOWN); if (ret) { - PERROR("sigaddset"); + PERROR("sigaddset teardown"); + } + ret = sigaddset(mask, LTTNG_CONSUMER_SIG_LIVE); + if (ret) { + PERROR("sigaddset live"); } } @@ -104,6 +112,165 @@ static void metadata_switch_timer(struct lttng_consumer_local_data *ctx, } } +static int send_empty_index(struct lttng_consumer_stream *stream, uint64_t ts) +{ + int ret; + struct lttng_packet_index index; + + memset(&index, 0, sizeof(index)); + index.timestamp_end = htobe64(ts); + ret = consumer_stream_write_index(stream, &index); + if (ret < 0) { + goto error; + } + +error: + return ret; +} + +static int check_kernel_stream(struct lttng_consumer_stream *stream) +{ + uint64_t ts; + int ret; + + /* + * While holding the stream mutex, try to take a snapshot, if it + * succeeds, it means that data is ready to be sent, just let the data + * thread handle that. Otherwise, if the snapshot returns EAGAIN, it + * means that there is no data to read after the flush, so we can + * safely send the empty index. + */ + pthread_mutex_lock(&stream->lock); + ret = kernctl_get_current_timestamp(stream->wait_fd, &ts); + if (ret < 0) { + ERR("Failed to get the current timestamp"); + goto error_unlock; + } + ret = kernctl_buffer_flush(stream->wait_fd); + if (ret < 0) { + ERR("Failed to flush kernel stream"); + goto error_unlock; + } + ret = kernctl_snapshot(stream->wait_fd); + if (ret < 0) { + if (errno != EAGAIN) { + ERR("Taking kernel snapshot"); + ret = -1; + goto error_unlock; + } + DBG("Stream %" PRIu64 " empty, sending beacon", stream->key); + ret = send_empty_index(stream, ts); + if (ret < 0) { + goto error_unlock; + } + } + ret = 0; + +error_unlock: + pthread_mutex_unlock(&stream->lock); + return ret; +} + +static int check_ust_stream(struct lttng_consumer_stream *stream) +{ + uint64_t ts; + int ret; + + assert(stream); + assert(stream->ustream); + /* + * While holding the stream mutex, try to take a snapshot, if it + * succeeds, it means that data is ready to be sent, just let the data + * thread handle that. Otherwise, if the snapshot returns EAGAIN, it + * means that there is no data to read after the flush, so we can + * safely send the empty index. + */ + pthread_mutex_lock(&stream->lock); + ret = ustctl_get_current_timestamp(stream->ustream, &ts); + if (ret < 0) { + ERR("Failed to get the current timestamp"); + goto error_unlock; + } + ustctl_flush_buffer(stream->ustream, 1); + ret = ustctl_snapshot(stream->ustream); + if (ret < 0) { + if (errno != EAGAIN) { + ERR("Taking UST snapshot"); + ret = -1; + goto error_unlock; + } + DBG("Stream %" PRIu64 " empty, sending beacon", stream->key); + ret = send_empty_index(stream, ts); + if (ret < 0) { + goto error_unlock; + } + } + ret = 0; + +error_unlock: + pthread_mutex_unlock(&stream->lock); + return ret; +} + +/* + * Execute action on a live timer + */ +static void live_timer(struct lttng_consumer_local_data *ctx, + int sig, siginfo_t *si, void *uc) +{ + int ret; + struct lttng_consumer_channel *channel; + struct lttng_consumer_stream *stream; + struct lttng_ht *ht; + struct lttng_ht_iter iter; + + channel = si->si_value.sival_ptr; + assert(channel); + + if (channel->switch_timer_error) { + goto error; + } + ht = consumer_data.stream_per_chan_id_ht; + + DBG("Live timer for channel %" PRIu64, channel->key); + + rcu_read_lock(); + switch (ctx->type) { + case LTTNG_CONSUMER32_UST: + case LTTNG_CONSUMER64_UST: + cds_lfht_for_each_entry_duplicate(ht->ht, + ht->hash_fct(&channel->key, lttng_ht_seed), + ht->match_fct, &channel->key, &iter.iter, + stream, node_channel_id.node) { + ret = check_ust_stream(stream); + if (ret < 0) { + goto error_unlock; + } + } + break; + case LTTNG_CONSUMER_KERNEL: + cds_lfht_for_each_entry_duplicate(ht->ht, + ht->hash_fct(&channel->key, lttng_ht_seed), + ht->match_fct, &channel->key, &iter.iter, + stream, node_channel_id.node) { + ret = check_kernel_stream(stream); + if (ret < 0) { + goto error_unlock; + } + } + break; + case LTTNG_CONSUMER_UNKNOWN: + assert(0); + break; + } + +error_unlock: + rcu_read_unlock(); + +error: + return; +} + static void consumer_timer_signal_thread_qs(unsigned int signr) { @@ -212,6 +379,63 @@ void consumer_timer_switch_stop(struct lttng_consumer_channel *channel) channel->switch_timer_enabled = 0; } +/* + * Set the timer for the live mode. + */ +void consumer_timer_live_start(struct lttng_consumer_channel *channel, + int live_timer_interval) +{ + int ret; + struct sigevent sev; + struct itimerspec its; + + assert(channel); + assert(channel->key); + + if (live_timer_interval == 0) { + return; + } + + sev.sigev_notify = SIGEV_SIGNAL; + sev.sigev_signo = LTTNG_CONSUMER_SIG_LIVE; + sev.sigev_value.sival_ptr = channel; + ret = timer_create(CLOCKID, &sev, &channel->live_timer); + if (ret == -1) { + PERROR("timer_create"); + } + channel->live_timer_enabled = 1; + + its.it_value.tv_sec = live_timer_interval / 1000000; + its.it_value.tv_nsec = live_timer_interval % 1000000; + its.it_interval.tv_sec = its.it_value.tv_sec; + its.it_interval.tv_nsec = its.it_value.tv_nsec; + + ret = timer_settime(channel->live_timer, 0, &its, NULL); + if (ret == -1) { + PERROR("timer_settime"); + } +} + +/* + * Stop and delete timer. + */ +void consumer_timer_live_stop(struct lttng_consumer_channel *channel) +{ + int ret; + + assert(channel); + + ret = timer_delete(channel->live_timer); + if (ret == -1) { + PERROR("timer_delete"); + } + + consumer_timer_signal_thread_qs(LTTNG_CONSUMER_SIG_LIVE); + + channel->live_timer = 0; + channel->live_timer_enabled = 0; +} + /* * Block the RT signals for the entire process. It must be called from the * consumer main before creating the threads @@ -231,11 +455,10 @@ void consumer_signal_init(void) } /* - * This thread is the sighandler for signals LTTNG_CONSUMER_SIG_SWITCH and - * LTTNG_CONSUMER_SIG_TEARDOWN that are emitted by the periodic timer to check - * if new metadata is available. + * This thread is the sighandler for signals LTTNG_CONSUMER_SIG_SWITCH, + * LTTNG_CONSUMER_SIG_TEARDOWN and LTTNG_CONSUMER_SIG_LIVE. */ -void *consumer_timer_metadata_thread(void *data) +void *consumer_timer_thread(void *data) { int signr; sigset_t mask; @@ -260,6 +483,8 @@ void *consumer_timer_metadata_thread(void *data) CMM_STORE_SHARED(timer_signal.qs_done, 1); cmm_smp_mb(); DBG("Signal timer metadata thread teardown"); + } else if (signr == LTTNG_CONSUMER_SIG_LIVE) { + live_timer(ctx, info.si_signo, &info, NULL); } else { ERR("Unexpected signal %d\n", info.si_signo); }