X-Git-Url: http://git.efficios.com/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fconsumer-timer.c;h=5764b13339cafd0366d68e7120519c4333ef3057;hp=68b5638ddc404e009ba7c5d835b7b2fec782ecc9;hb=ea26306076d26c40ed31e4f9170dc2852bda502f;hpb=59a8d3ecc8cbc7e00354a18b591f1a83304849d7 diff --git a/src/common/consumer-timer.c b/src/common/consumer-timer.c index 68b5638dd..5764b1333 100644 --- a/src/common/consumer-timer.c +++ b/src/common/consumer-timer.c @@ -16,17 +16,20 @@ * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ -#define _GNU_SOURCE +#define _LGPL_SOURCE #include #include #include +#include #include +#include #include #include #include #include "consumer-timer.h" +#include "consumer-testpoint.h" #include "ust-consumer/ust-consumer.h" static struct timer_signal_data timer_signal = { @@ -111,12 +114,14 @@ static void metadata_switch_timer(struct lttng_consumer_local_data *ctx, } } -static int send_empty_index(struct lttng_consumer_stream *stream, uint64_t ts) +static int send_empty_index(struct lttng_consumer_stream *stream, uint64_t ts, + uint64_t stream_id) { int ret; - struct lttng_packet_index index; + struct ctf_packet_index index; memset(&index, 0, sizeof(index)); + index.stream_id = htobe64(stream_id); index.timestamp_end = htobe64(ts); ret = consumer_stream_write_index(stream, &index); if (ret < 0) { @@ -127,73 +132,103 @@ error: return ret; } -static int check_kernel_stream(struct lttng_consumer_stream *stream) +int consumer_flush_kernel_index(struct lttng_consumer_stream *stream) { - uint64_t ts; + uint64_t ts, stream_id; int ret; - /* - * While holding the stream mutex, try to take a snapshot, if it - * succeeds, it means that data is ready to be sent, just let the data - * thread handle that. Otherwise, if the snapshot returns EAGAIN, it - * means that there is no data to read after the flush, so we can - * safely send the empty index. - */ - pthread_mutex_lock(&stream->lock); ret = kernctl_get_current_timestamp(stream->wait_fd, &ts); if (ret < 0) { ERR("Failed to get the current timestamp"); - goto error_unlock; + goto end; } ret = kernctl_buffer_flush(stream->wait_fd); if (ret < 0) { ERR("Failed to flush kernel stream"); - goto error_unlock; + goto end; } ret = kernctl_snapshot(stream->wait_fd); if (ret < 0) { - if (errno != EAGAIN) { - ERR("Taking kernel snapshot"); + if (errno != EAGAIN && errno != ENODATA) { + PERROR("live timer kernel snapshot"); ret = -1; - goto error_unlock; + goto end; + } + ret = kernctl_get_stream_id(stream->wait_fd, &stream_id); + if (ret < 0) { + PERROR("kernctl_get_stream_id"); + goto end; } DBG("Stream %" PRIu64 " empty, sending beacon", stream->key); - ret = send_empty_index(stream, ts); + ret = send_empty_index(stream, ts, stream_id); if (ret < 0) { - goto error_unlock; + goto end; } } ret = 0; - -error_unlock: - pthread_mutex_unlock(&stream->lock); +end: return ret; } -static int check_ust_stream(struct lttng_consumer_stream *stream) +static int check_kernel_stream(struct lttng_consumer_stream *stream) { - uint64_t ts; int ret; - assert(stream); - assert(stream->ustream); /* * While holding the stream mutex, try to take a snapshot, if it * succeeds, it means that data is ready to be sent, just let the data * thread handle that. Otherwise, if the snapshot returns EAGAIN, it * means that there is no data to read after the flush, so we can * safely send the empty index. + * + * Doing a trylock and checking if waiting on metadata if + * trylock fails. Bail out of the stream is indeed waiting for + * metadata to be pushed. Busy wait on trylock otherwise. */ - pthread_mutex_lock(&stream->lock); + for (;;) { + ret = pthread_mutex_trylock(&stream->lock); + switch (ret) { + case 0: + break; /* We have the lock. */ + case EBUSY: + pthread_mutex_lock(&stream->metadata_timer_lock); + if (stream->waiting_on_metadata) { + ret = 0; + stream->missed_metadata_flush = true; + pthread_mutex_unlock(&stream->metadata_timer_lock); + goto end; /* Bail out. */ + } + pthread_mutex_unlock(&stream->metadata_timer_lock); + /* Try again. */ + caa_cpu_relax(); + continue; + default: + ERR("Unexpected pthread_mutex_trylock error %d", ret); + ret = -1; + goto end; + } + break; + } + ret = consumer_flush_kernel_index(stream); + pthread_mutex_unlock(&stream->lock); +end: + return ret; +} + +int consumer_flush_ust_index(struct lttng_consumer_stream *stream) +{ + uint64_t ts, stream_id; + int ret; + ret = cds_lfht_is_node_deleted(&stream->node.node); if (ret) { - goto error_unlock; + goto end; } ret = lttng_ustconsumer_get_current_timestamp(stream, &ts); if (ret < 0) { ERR("Failed to get the current timestamp"); - goto error_unlock; + goto end; } lttng_ustconsumer_flush_buffer(stream, 1); ret = lttng_ustconsumer_take_snapshot(stream); @@ -201,18 +236,68 @@ static int check_ust_stream(struct lttng_consumer_stream *stream) if (ret != -EAGAIN) { ERR("Taking UST snapshot"); ret = -1; - goto error_unlock; + goto end; + } + ret = lttng_ustconsumer_get_stream_id(stream, &stream_id); + if (ret < 0) { + PERROR("ustctl_get_stream_id"); + goto end; } DBG("Stream %" PRIu64 " empty, sending beacon", stream->key); - ret = send_empty_index(stream, ts); + ret = send_empty_index(stream, ts, stream_id); if (ret < 0) { - goto error_unlock; + goto end; } } ret = 0; +end: + return ret; +} -error_unlock: +static int check_ust_stream(struct lttng_consumer_stream *stream) +{ + int ret; + + assert(stream); + assert(stream->ustream); + /* + * While holding the stream mutex, try to take a snapshot, if it + * succeeds, it means that data is ready to be sent, just let the data + * thread handle that. Otherwise, if the snapshot returns EAGAIN, it + * means that there is no data to read after the flush, so we can + * safely send the empty index. + * + * Doing a trylock and checking if waiting on metadata if + * trylock fails. Bail out of the stream is indeed waiting for + * metadata to be pushed. Busy wait on trylock otherwise. + */ + for (;;) { + ret = pthread_mutex_trylock(&stream->lock); + switch (ret) { + case 0: + break; /* We have the lock. */ + case EBUSY: + pthread_mutex_lock(&stream->metadata_timer_lock); + if (stream->waiting_on_metadata) { + ret = 0; + stream->missed_metadata_flush = true; + pthread_mutex_unlock(&stream->metadata_timer_lock); + goto end; /* Bail out. */ + } + pthread_mutex_unlock(&stream->metadata_timer_lock); + /* Try again. */ + caa_cpu_relax(); + continue; + default: + ERR("Unexpected pthread_mutex_trylock error %d", ret); + ret = -1; + goto end; + } + break; + } + ret = consumer_flush_ust_index(stream); pthread_mutex_unlock(&stream->lock); +end: return ret; } @@ -396,7 +481,7 @@ void consumer_timer_live_start(struct lttng_consumer_channel *channel, assert(channel); assert(channel->key); - if (live_timer_interval == 0) { + if (live_timer_interval <= 0) { return; } @@ -444,7 +529,7 @@ void consumer_timer_live_stop(struct lttng_consumer_channel *channel) * Block the RT signals for the entire process. It must be called from the * consumer main before creating the threads */ -void consumer_signal_init(void) +int consumer_signal_init(void) { int ret; sigset_t mask; @@ -455,7 +540,9 @@ void consumer_signal_init(void) if (ret) { errno = ret; PERROR("pthread_sigmask"); + return -1; } + return 0; } /* @@ -469,12 +556,26 @@ void *consumer_timer_thread(void *data) siginfo_t info; struct lttng_consumer_local_data *ctx = data; + rcu_register_thread(); + + health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_METADATA_TIMER); + + if (testpoint(consumerd_thread_metadata_timer)) { + goto error_testpoint; + } + + health_code_update(); + /* Only self thread will receive signal mask. */ setmask(&mask); CMM_STORE_SHARED(timer_signal.tid, pthread_self()); while (1) { + health_code_update(); + + health_poll_entry(); signr = sigwaitinfo(&mask, &info); + health_poll_exit(); if (signr == -1) { if (errno != EINTR) { PERROR("sigwaitinfo"); @@ -494,5 +595,13 @@ void *consumer_timer_thread(void *data) } } +error_testpoint: + /* Only reached in testpoint error */ + health_error(); + health_unregister(health_consumerd); + + rcu_unregister_thread(); + + /* Never return */ return NULL; }