consumerd: refactor: combine duplicated check_*_functions
[deliverable/lttng-tools.git] / src / common / consumer / consumer-timer.c
index 931f7471fc6fe974982ff1cb687750ff68f80a7c..12d72afbb30cc2260d8e15d98b69098e91acaf8b 100644 (file)
 #include <common/consumer/consumer-testpoint.h>
 #include <common/ust-consumer/ust-consumer.h>
 
+typedef int (*sample_positions_cb)(struct lttng_consumer_stream *stream);
+typedef int (*get_consumed_cb)(struct lttng_consumer_stream *stream,
+               unsigned long *consumed);
+typedef int (*get_produced_cb)(struct lttng_consumer_stream *stream,
+               unsigned long *produced);
+typedef int (*flush_index_cb)(struct lttng_consumer_stream *stream);
+
 static struct timer_signal_data timer_signal = {
        .tid = 0,
        .setup_done = 0,
@@ -61,6 +68,10 @@ static void setmask(sigset_t *mask)
        if (ret) {
                PERROR("sigaddset live");
        }
+       ret = sigaddset(mask, LTTNG_CONSUMER_SIG_EXIT);
+       if (ret) {
+               PERROR("sigaddset exit");
+       }
 }
 
 /*
@@ -148,7 +159,7 @@ int consumer_flush_kernel_index(struct lttng_consumer_stream *stream)
        }
        ret = kernctl_snapshot(stream->wait_fd);
        if (ret < 0) {
-               if (errno != EAGAIN && errno != ENODATA) {
+               if (ret != -EAGAIN && ret != -ENODATA) {
                        PERROR("live timer kernel snapshot");
                        ret = -1;
                        goto end;
@@ -169,7 +180,8 @@ end:
        return ret;
 }
 
-static int check_kernel_stream(struct lttng_consumer_stream *stream)
+static int check_stream(struct lttng_consumer_stream *stream,
+               flush_index_cb flush_index)
 {
        int ret;
 
@@ -208,7 +220,7 @@ static int check_kernel_stream(struct lttng_consumer_stream *stream)
                }
                break;
        }
-       ret = consumer_flush_kernel_index(stream);
+       ret = flush_index(stream);
        pthread_mutex_unlock(&stream->lock);
 end:
        return ret;
@@ -253,53 +265,6 @@ end:
        return ret;
 }
 
-static int check_ust_stream(struct lttng_consumer_stream *stream)
-{
-       int ret;
-
-       assert(stream);
-       assert(stream->ustream);
-       /*
-        * While holding the stream mutex, try to take a snapshot, if it
-        * succeeds, it means that data is ready to be sent, just let the data
-        * thread handle that. Otherwise, if the snapshot returns EAGAIN, it
-        * means that there is no data to read after the flush, so we can
-        * safely send the empty index.
-        *
-        * Doing a trylock and checking if waiting on metadata if
-        * trylock fails. Bail out of the stream is indeed waiting for
-        * metadata to be pushed. Busy wait on trylock otherwise.
-        */
-       for (;;) {
-               ret = pthread_mutex_trylock(&stream->lock);
-               switch (ret) {
-               case 0:
-                       break;  /* We have the lock. */
-               case EBUSY:
-                       pthread_mutex_lock(&stream->metadata_timer_lock);
-                       if (stream->waiting_on_metadata) {
-                               ret = 0;
-                               stream->missed_metadata_flush = true;
-                               pthread_mutex_unlock(&stream->metadata_timer_lock);
-                               goto end;       /* Bail out. */
-                       }
-                       pthread_mutex_unlock(&stream->metadata_timer_lock);
-                       /* Try again. */
-                       caa_cpu_relax();
-                       continue;
-               default:
-                       ERR("Unexpected pthread_mutex_trylock error %d", ret);
-                       ret = -1;
-                       goto end;
-               }
-               break;
-       }
-       ret = consumer_flush_ust_index(stream);
-       pthread_mutex_unlock(&stream->lock);
-end:
-       return ret;
-}
-
 /*
  * Execute action on a live timer
  */
@@ -309,8 +274,12 @@ static void live_timer(struct lttng_consumer_local_data *ctx,
        int ret;
        struct lttng_consumer_channel *channel;
        struct lttng_consumer_stream *stream;
-       struct lttng_ht *ht;
        struct lttng_ht_iter iter;
+       const struct lttng_ht *ht = consumer_data.stream_per_chan_id_ht;
+       const flush_index_cb flush_index =
+                       ctx->type == LTTNG_CONSUMER_KERNEL ?
+                                       consumer_flush_kernel_index :
+                                       consumer_flush_ust_index;
 
        channel = si->si_value.sival_ptr;
        assert(channel);
@@ -318,38 +287,18 @@ static void live_timer(struct lttng_consumer_local_data *ctx,
        if (channel->switch_timer_error) {
                goto error;
        }
-       ht = consumer_data.stream_per_chan_id_ht;
 
        DBG("Live timer for channel %" PRIu64, channel->key);
 
        rcu_read_lock();
-       switch (ctx->type) {
-       case LTTNG_CONSUMER32_UST:
-       case LTTNG_CONSUMER64_UST:
-               cds_lfht_for_each_entry_duplicate(ht->ht,
-                               ht->hash_fct(&channel->key, lttng_ht_seed),
-                               ht->match_fct, &channel->key, &iter.iter,
-                               stream, node_channel_id.node) {
-                       ret = check_ust_stream(stream);
-                       if (ret < 0) {
-                               goto error_unlock;
-                       }
-               }
-               break;
-       case LTTNG_CONSUMER_KERNEL:
-               cds_lfht_for_each_entry_duplicate(ht->ht,
-                               ht->hash_fct(&channel->key, lttng_ht_seed),
-                               ht->match_fct, &channel->key, &iter.iter,
-                               stream, node_channel_id.node) {
-                       ret = check_kernel_stream(stream);
-                       if (ret < 0) {
-                               goto error_unlock;
-                       }
+       cds_lfht_for_each_entry_duplicate(ht->ht,
+                       ht->hash_fct(&channel->key, lttng_ht_seed),
+                       ht->match_fct, &channel->key, &iter.iter,
+                       stream, node_channel_id.node) {
+               ret = check_stream(stream, flush_index);
+               if (ret < 0) {
+                       goto error_unlock;
                }
-               break;
-       case LTTNG_CONSUMER_UNKNOWN:
-               assert(0);
-               break;
        }
 
 error_unlock:
@@ -381,7 +330,7 @@ void consumer_timer_signal_thread_qs(unsigned int signr)
                if (ret == -1) {
                        PERROR("sigpending");
                }
-               if (!sigismember(&pending_set, LTTNG_CONSUMER_SIG_SWITCH)) {
+               if (!sigismember(&pending_set, signr)) {
                        break;
                }
                caa_cpu_relax();
@@ -546,7 +495,8 @@ int consumer_signal_init(void)
 
 /*
  * This thread is the sighandler for signals LTTNG_CONSUMER_SIG_SWITCH,
- * LTTNG_CONSUMER_SIG_TEARDOWN and LTTNG_CONSUMER_SIG_LIVE.
+ * LTTNG_CONSUMER_SIG_TEARDOWN, LTTNG_CONSUMER_SIG_LIVE, and
+ * LTTNG_CONSUMER_SIG_EXIT.
  */
 void *consumer_timer_thread(void *data)
 {
@@ -589,6 +539,9 @@ void *consumer_timer_thread(void *data)
                        DBG("Signal timer metadata thread teardown");
                } else if (signr == LTTNG_CONSUMER_SIG_LIVE) {
                        live_timer(ctx, info.si_signo, &info, NULL);
+               } else if (signr == LTTNG_CONSUMER_SIG_EXIT) {
+                       assert(consumer_quit);
+                       goto end;
                } else {
                        ERR("Unexpected signal %d\n", info.si_signo);
                }
@@ -597,10 +550,8 @@ void *consumer_timer_thread(void *data)
 error_testpoint:
        /* Only reached in testpoint error */
        health_error();
+end:
        health_unregister(health_consumerd);
-
        rcu_unregister_thread();
-
-       /* Never return */
        return NULL;
 }
This page took 0.02864 seconds and 5 git commands to generate.