Adapt to namespaced liblttng-ust-ctl symbols
[lttng-tools.git] / src / common / consumer / consumer-timer.c
index 003e5adef50d2143bf6df5e244544fcad062047c..cbb0062781f3393d927a5af44842388b3e9d4fec 100644 (file)
@@ -26,6 +26,7 @@ typedef int (*get_consumed_cb)(struct lttng_consumer_stream *stream,
                unsigned long *consumed);
 typedef int (*get_produced_cb)(struct lttng_consumer_stream *stream,
                unsigned long *produced);
+typedef int (*flush_index_cb)(struct lttng_consumer_stream *stream);
 
 static struct timer_signal_data timer_signal = {
        .tid = 0,
@@ -67,7 +68,7 @@ static void setmask(sigset_t *mask)
        }
 }
 
-static int channel_monitor_pipe = -1;
+static int the_channel_monitor_pipe = -1;
 
 /*
  * Execute action on a timer switch.
@@ -175,7 +176,8 @@ end:
        return ret;
 }
 
-static int check_kernel_stream(struct lttng_consumer_stream *stream)
+static int check_stream(struct lttng_consumer_stream *stream,
+               flush_index_cb flush_index)
 {
        int ret;
 
@@ -214,7 +216,7 @@ static int check_kernel_stream(struct lttng_consumer_stream *stream)
                }
                break;
        }
-       ret = consumer_flush_kernel_index(stream);
+       ret = flush_index(stream);
        pthread_mutex_unlock(&stream->lock);
 end:
        return ret;
@@ -245,7 +247,7 @@ int consumer_flush_ust_index(struct lttng_consumer_stream *stream)
                }
                ret = lttng_ustconsumer_get_stream_id(stream, &stream_id);
                if (ret < 0) {
-                       PERROR("ustctl_get_stream_id");
+                       PERROR("lttng_ust_ctl_get_stream_id");
                        goto end;
                }
                DBG("Stream %" PRIu64 " empty, sending beacon", stream->key);
@@ -259,53 +261,6 @@ end:
        return ret;
 }
 
-static int check_ust_stream(struct lttng_consumer_stream *stream)
-{
-       int ret;
-
-       assert(stream);
-       assert(stream->ustream);
-       /*
-        * While holding the stream mutex, try to take a snapshot, if it
-        * succeeds, it means that data is ready to be sent, just let the data
-        * thread handle that. Otherwise, if the snapshot returns EAGAIN, it
-        * means that there is no data to read after the flush, so we can
-        * safely send the empty index.
-        *
-        * Doing a trylock and checking if waiting on metadata if
-        * trylock fails. Bail out of the stream is indeed waiting for
-        * metadata to be pushed. Busy wait on trylock otherwise.
-        */
-       for (;;) {
-               ret = pthread_mutex_trylock(&stream->lock);
-               switch (ret) {
-               case 0:
-                       break;  /* We have the lock. */
-               case EBUSY:
-                       pthread_mutex_lock(&stream->metadata_timer_lock);
-                       if (stream->waiting_on_metadata) {
-                               ret = 0;
-                               stream->missed_metadata_flush = true;
-                               pthread_mutex_unlock(&stream->metadata_timer_lock);
-                               goto end;       /* Bail out. */
-                       }
-                       pthread_mutex_unlock(&stream->metadata_timer_lock);
-                       /* Try again. */
-                       caa_cpu_relax();
-                       continue;
-               default:
-                       ERR("Unexpected pthread_mutex_trylock error %d", ret);
-                       ret = -1;
-                       goto end;
-               }
-               break;
-       }
-       ret = consumer_flush_ust_index(stream);
-       pthread_mutex_unlock(&stream->lock);
-end:
-       return ret;
-}
-
 /*
  * Execute action on a live timer
  */
@@ -315,8 +270,12 @@ static void live_timer(struct lttng_consumer_local_data *ctx,
        int ret;
        struct lttng_consumer_channel *channel;
        struct lttng_consumer_stream *stream;
-       struct lttng_ht *ht;
        struct lttng_ht_iter iter;
+       const struct lttng_ht *ht = the_consumer_data.stream_per_chan_id_ht;
+       const flush_index_cb flush_index =
+                       ctx->type == LTTNG_CONSUMER_KERNEL ?
+                                       consumer_flush_kernel_index :
+                                       consumer_flush_ust_index;
 
        channel = si->si_value.sival_ptr;
        assert(channel);
@@ -324,38 +283,18 @@ static void live_timer(struct lttng_consumer_local_data *ctx,
        if (channel->switch_timer_error) {
                goto error;
        }
-       ht = consumer_data.stream_per_chan_id_ht;
 
        DBG("Live timer for channel %" PRIu64, channel->key);
 
        rcu_read_lock();
-       switch (ctx->type) {
-       case LTTNG_CONSUMER32_UST:
-       case LTTNG_CONSUMER64_UST:
-               cds_lfht_for_each_entry_duplicate(ht->ht,
-                               ht->hash_fct(&channel->key, lttng_ht_seed),
-                               ht->match_fct, &channel->key, &iter.iter,
-                               stream, node_channel_id.node) {
-                       ret = check_ust_stream(stream);
-                       if (ret < 0) {
-                               goto error_unlock;
-                       }
-               }
-               break;
-       case LTTNG_CONSUMER_KERNEL:
-               cds_lfht_for_each_entry_duplicate(ht->ht,
-                               ht->hash_fct(&channel->key, lttng_ht_seed),
-                               ht->match_fct, &channel->key, &iter.iter,
-                               stream, node_channel_id.node) {
-                       ret = check_kernel_stream(stream);
-                       if (ret < 0) {
-                               goto error_unlock;
-                       }
+       cds_lfht_for_each_entry_duplicate(ht->ht,
+                       ht->hash_fct(&channel->key, lttng_ht_seed),
+                       ht->match_fct, &channel->key, &iter.iter,
+                       stream, node_channel_id.node) {
+               ret = check_stream(stream, flush_index);
+               if (ret < 0) {
+                       goto error_unlock;
                }
-               break;
-       case LTTNG_CONSUMER_UNKNOWN:
-               assert(0);
-               break;
        }
 
 error_unlock:
@@ -429,7 +368,7 @@ int consumer_channel_timer_start(timer_t *timer_id,
                unsigned int timer_interval_us, int signal)
 {
        int ret = 0, delete_ret;
-       struct sigevent sev;
+       struct sigevent sev = {};
        struct itimerspec its;
 
        assert(channel);
@@ -631,7 +570,7 @@ int sample_channel_positions(struct lttng_consumer_channel *channel,
        struct lttng_consumer_stream *stream;
        bool empty_channel = true;
        uint64_t high = 0, low = UINT64_MAX;
-       struct lttng_ht *ht = consumer_data.stream_per_chan_id_ht;
+       struct lttng_ht *ht = the_consumer_data.stream_per_chan_id_ht;
 
        *_total_consumed = 0;
 
@@ -718,7 +657,7 @@ void monitor_timer(struct lttng_consumer_channel *channel)
                return;
        }
 
-       switch (consumer_data.type) {
+       switch (the_consumer_data.type) {
        case LTTNG_CONSUMER_KERNEL:
                sample = lttng_kconsumer_sample_snapshot_positions;
                get_consumed = lttng_kconsumer_get_consumed_snapshot;
@@ -769,14 +708,14 @@ void monitor_timer(struct lttng_consumer_channel *channel)
 
 int consumer_timer_thread_get_channel_monitor_pipe(void)
 {
-       return uatomic_read(&channel_monitor_pipe);
+       return uatomic_read(&the_channel_monitor_pipe);
 }
 
 int consumer_timer_thread_set_channel_monitor_pipe(int fd)
 {
        int ret;
 
-       ret = uatomic_cmpxchg(&channel_monitor_pipe, -1, fd);
+       ret = uatomic_cmpxchg(&the_channel_monitor_pipe, -1, fd);
        if (ret != -1) {
                ret = -1;
                goto end;
This page took 0.027111 seconds and 5 git commands to generate.