Fix: There is more tests than the plan
[lttng-tools.git] / src / common / consumer-timer.c
index 7ece0f6a54a37eb97084e8973cecc63c59b8b8e3..5764b13339cafd0366d68e7120519c4333ef3057 100644 (file)
  * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
  */
 
-#define _GNU_SOURCE
+#define _LGPL_SOURCE
 #include <assert.h>
 #include <inttypes.h>
 #include <signal.h>
 
+#include <bin/lttng-consumerd/health-consumerd.h>
 #include <common/common.h>
+#include <common/compat/endian.h>
 #include <common/kernel-ctl/kernel-ctl.h>
 #include <common/kernel-consumer/kernel-consumer.h>
 #include <common/consumer-stream.h>
 
 #include "consumer-timer.h"
+#include "consumer-testpoint.h"
 #include "ust-consumer/ust-consumer.h"
-#include "../bin/lttng-consumerd/health-consumerd.h"
 
 static struct timer_signal_data timer_signal = {
        .tid = 0,
@@ -112,12 +114,14 @@ static void metadata_switch_timer(struct lttng_consumer_local_data *ctx,
        }
 }
 
-static int send_empty_index(struct lttng_consumer_stream *stream, uint64_t ts)
+static int send_empty_index(struct lttng_consumer_stream *stream, uint64_t ts,
+               uint64_t stream_id)
 {
        int ret;
-       struct lttng_packet_index index;
+       struct ctf_packet_index index;
 
        memset(&index, 0, sizeof(index));
+       index.stream_id = htobe64(stream_id);
        index.timestamp_end = htobe64(ts);
        ret = consumer_stream_write_index(stream, &index);
        if (ret < 0) {
@@ -128,73 +132,103 @@ error:
        return ret;
 }
 
-static int check_kernel_stream(struct lttng_consumer_stream *stream)
+int consumer_flush_kernel_index(struct lttng_consumer_stream *stream)
 {
-       uint64_t ts;
+       uint64_t ts, stream_id;
        int ret;
 
-       /*
-        * While holding the stream mutex, try to take a snapshot, if it
-        * succeeds, it means that data is ready to be sent, just let the data
-        * thread handle that. Otherwise, if the snapshot returns EAGAIN, it
-        * means that there is no data to read after the flush, so we can
-        * safely send the empty index.
-        */
-       pthread_mutex_lock(&stream->lock);
        ret = kernctl_get_current_timestamp(stream->wait_fd, &ts);
        if (ret < 0) {
                ERR("Failed to get the current timestamp");
-               goto error_unlock;
+               goto end;
        }
        ret = kernctl_buffer_flush(stream->wait_fd);
        if (ret < 0) {
                ERR("Failed to flush kernel stream");
-               goto error_unlock;
+               goto end;
        }
        ret = kernctl_snapshot(stream->wait_fd);
        if (ret < 0) {
-               if (errno != EAGAIN) {
-                       ERR("Taking kernel snapshot");
+               if (errno != EAGAIN && errno != ENODATA) {
+                       PERROR("live timer kernel snapshot");
                        ret = -1;
-                       goto error_unlock;
+                       goto end;
+               }
+               ret = kernctl_get_stream_id(stream->wait_fd, &stream_id);
+               if (ret < 0) {
+                       PERROR("kernctl_get_stream_id");
+                       goto end;
                }
                DBG("Stream %" PRIu64 " empty, sending beacon", stream->key);
-               ret = send_empty_index(stream, ts);
+               ret = send_empty_index(stream, ts, stream_id);
                if (ret < 0) {
-                       goto error_unlock;
+                       goto end;
                }
        }
        ret = 0;
-
-error_unlock:
-       pthread_mutex_unlock(&stream->lock);
+end:
        return ret;
 }
 
-static int check_ust_stream(struct lttng_consumer_stream *stream)
+static int check_kernel_stream(struct lttng_consumer_stream *stream)
 {
-       uint64_t ts;
        int ret;
 
-       assert(stream);
-       assert(stream->ustream);
        /*
         * While holding the stream mutex, try to take a snapshot, if it
         * succeeds, it means that data is ready to be sent, just let the data
         * thread handle that. Otherwise, if the snapshot returns EAGAIN, it
         * means that there is no data to read after the flush, so we can
         * safely send the empty index.
+        *
+        * Do a trylock and, if it fails, check whether the stream is
+        * waiting on metadata. Bail out if the stream is indeed waiting for
+        * metadata to be pushed. Busy wait on trylock otherwise.
         */
-       pthread_mutex_lock(&stream->lock);
+       for (;;) {
+               ret = pthread_mutex_trylock(&stream->lock);
+               switch (ret) {
+               case 0:
+                       break;  /* We have the lock. */
+               case EBUSY:
+                       pthread_mutex_lock(&stream->metadata_timer_lock);
+                       if (stream->waiting_on_metadata) {
+                               ret = 0;
+                               stream->missed_metadata_flush = true;
+                               pthread_mutex_unlock(&stream->metadata_timer_lock);
+                               goto end;       /* Bail out. */
+                       }
+                       pthread_mutex_unlock(&stream->metadata_timer_lock);
+                       /* Try again. */
+                       caa_cpu_relax();
+                       continue;
+               default:
+                       ERR("Unexpected pthread_mutex_trylock error %d", ret);
+                       ret = -1;
+                       goto end;
+               }
+               break;
+       }
+       ret = consumer_flush_kernel_index(stream);
+       pthread_mutex_unlock(&stream->lock);
+end:
+       return ret;
+}
+
+int consumer_flush_ust_index(struct lttng_consumer_stream *stream)
+{
+       uint64_t ts, stream_id;
+       int ret;
+
        ret = cds_lfht_is_node_deleted(&stream->node.node);
        if (ret) {
-               goto error_unlock;
+               goto end;
        }
 
        ret = lttng_ustconsumer_get_current_timestamp(stream, &ts);
        if (ret < 0) {
                ERR("Failed to get the current timestamp");
-               goto error_unlock;
+               goto end;
        }
        lttng_ustconsumer_flush_buffer(stream, 1);
        ret = lttng_ustconsumer_take_snapshot(stream);
@@ -202,18 +236,68 @@ static int check_ust_stream(struct lttng_consumer_stream *stream)
                if (ret != -EAGAIN) {
                        ERR("Taking UST snapshot");
                        ret = -1;
-                       goto error_unlock;
+                       goto end;
+               }
+               ret = lttng_ustconsumer_get_stream_id(stream, &stream_id);
+               if (ret < 0) {
+                       PERROR("ustctl_get_stream_id");
+                       goto end;
                }
                DBG("Stream %" PRIu64 " empty, sending beacon", stream->key);
-               ret = send_empty_index(stream, ts);
+               ret = send_empty_index(stream, ts, stream_id);
                if (ret < 0) {
-                       goto error_unlock;
+                       goto end;
                }
        }
        ret = 0;
+end:
+       return ret;
+}
 
-error_unlock:
+static int check_ust_stream(struct lttng_consumer_stream *stream)
+{
+       int ret;
+
+       assert(stream);
+       assert(stream->ustream);
+       /*
+        * While holding the stream mutex, try to take a snapshot, if it
+        * succeeds, it means that data is ready to be sent, just let the data
+        * thread handle that. Otherwise, if the snapshot returns EAGAIN, it
+        * means that there is no data to read after the flush, so we can
+        * safely send the empty index.
+        *
+        * Do a trylock and, if it fails, check whether the stream is
+        * waiting on metadata. Bail out if the stream is indeed waiting for
+        * metadata to be pushed. Busy wait on trylock otherwise.
+        */
+       for (;;) {
+               ret = pthread_mutex_trylock(&stream->lock);
+               switch (ret) {
+               case 0:
+                       break;  /* We have the lock. */
+               case EBUSY:
+                       pthread_mutex_lock(&stream->metadata_timer_lock);
+                       if (stream->waiting_on_metadata) {
+                               ret = 0;
+                               stream->missed_metadata_flush = true;
+                               pthread_mutex_unlock(&stream->metadata_timer_lock);
+                               goto end;       /* Bail out. */
+                       }
+                       pthread_mutex_unlock(&stream->metadata_timer_lock);
+                       /* Try again. */
+                       caa_cpu_relax();
+                       continue;
+               default:
+                       ERR("Unexpected pthread_mutex_trylock error %d", ret);
+                       ret = -1;
+                       goto end;
+               }
+               break;
+       }
+       ret = consumer_flush_ust_index(stream);
        pthread_mutex_unlock(&stream->lock);
+end:
        return ret;
 }
 
@@ -397,7 +481,7 @@ void consumer_timer_live_start(struct lttng_consumer_channel *channel,
        assert(channel);
        assert(channel->key);
 
-       if (live_timer_interval == 0) {
+       if (live_timer_interval <= 0) {
                return;
        }
 
@@ -445,7 +529,7 @@ void consumer_timer_live_stop(struct lttng_consumer_channel *channel)
  * Block the RT signals for the entire process. It must be called from the
  * consumer main before creating the threads.
  */
-void consumer_signal_init(void)
+int consumer_signal_init(void)
 {
        int ret;
        sigset_t mask;
@@ -456,7 +540,9 @@ void consumer_signal_init(void)
        if (ret) {
                errno = ret;
                PERROR("pthread_sigmask");
+               return -1;
        }
+       return 0;
 }
 
 /*
@@ -470,8 +556,14 @@ void *consumer_timer_thread(void *data)
        siginfo_t info;
        struct lttng_consumer_local_data *ctx = data;
 
+       rcu_register_thread();
+
        health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_METADATA_TIMER);
 
+       if (testpoint(consumerd_thread_metadata_timer)) {
+               goto error_testpoint;
+       }
+
        health_code_update();
 
        /* Only self thread will receive signal mask. */
@@ -503,9 +595,13 @@ void *consumer_timer_thread(void *data)
                }
        }
 
-       /* Currently never reached */
+error_testpoint:
+       /* Only reached on testpoint error */
+       health_error();
        health_unregister(health_consumerd);
 
+       rcu_unregister_thread();
+
        /* Never return */
        return NULL;
 }
This page took 0.028802 seconds and 5 git commands to generate.