CUSTOM: live timer: immediate live timer control on data pending and destroy
author: Jonathan Rajotte <jonathan.rajotte-julien@efficios.com>
Wed, 18 Nov 2020 19:49:05 +0000 (14:49 -0500)
committer: Jonathan Rajotte <jonathan.rajotte-julien@efficios.com>
Mon, 25 Jan 2021 19:12:28 +0000 (14:12 -0500)
Note: this patch is not a bugfix, it is a targeted modification to
improve timing and predictability for particular scenarios.

Note: this patch only applies to userspace tracing.

Part of this work might find itself upstream but most probably not in
this form. Deeper work should be done upstream to mitigate the real root
of the problem which is how the network protocol for live works and
lifetime management of the different component required for the live
feature.

Scenario #1:
=======
   System with high CPU resource restriction (base high workload 99% cpu load),
   High CPU count (32),
   Slowish network (ping ~60-100ms),
   Timing constraint for create, configure, start, stop, destroy, create ... cycles.

We see additional delay in the second cycle at the configure step
(enable channel). This delay seems to be the result of the execution of
a live timer iteration for the previously destroyed session (cycle N-1).

The live timer for the N - 1 session is still present since it is only
stopped lazily after all associated streams are destroyed
(consumer_del_channel, unref_channel, consumer_stream_destroy,
consumer_del_stream, consumer_thread_data_poll).

To help this situation, stop the live timer on cmd_destroy. The live
timer does not have any real value for a session about to be destroyed
and, in most cases, is already stopped.

Scenario #2
======
   System with high CPU resource restriction (base high workload 99% cpu load),
   High CPU count (32),
   Slowish network (ping ~60-100ms).

The data pending phase can be aggravated by the live timer depending on
the network and CPU contention.

In the context of stable 2.9 and live reading, stopping the live timer
during the data pending phase of a stop command does not impact
functionality at the reader level for a single session. Note that for
stable 2.9 and as of lttng 2.13 a reader cannot hook itself on multiple
sessions. Hence stopping the live timer during the data pending stage
and starting it back does not have any foreseeable downside.

Signed-off-by: Jonathan Rajotte <jonathan.rajotte-julien@efficios.com>
Change-Id: Icf8ff6fa72b24ce221cdf866d134ee42f64c9291

src/bin/lttng-sessiond/cmd.c
src/bin/lttng-sessiond/consumer.c
src/bin/lttng-sessiond/consumer.h
src/bin/lttng-sessiond/ust-app.c
src/bin/lttng-sessiond/ust-app.h
src/common/consumer/consumer-timer.c
src/common/consumer/consumer.h
src/common/sessiond-comm/sessiond-comm.h
src/common/ust-consumer/ust-consumer.c

index d5ed1dcfc39e4d02a6d0a8996f1d7945c91f2137..8cb60b3aa51d2ace2f7efaea0375e809cf78dbed 100644 (file)
@@ -2773,6 +2773,8 @@ int cmd_destroy_session(struct ltt_session *session, int wpipe)
                /* Close any relayd session */
                consumer_output_send_destroy_relayd(usess->consumer);
 
+               ust_force_stop_live_timer(usess);
+
                /* Destroy every UST application related to this session. */
                ret = ust_app_destroy_trace_all(usess);
                if (ret) {
@@ -3172,11 +3174,21 @@ int cmd_data_pending(struct ltt_session *session)
        }
 
        if (usess && usess->consumer) {
+               /*
+                * Stop the live timer since it can lead to stream and relayd
+                * contention for the data pending check.
+                */
+               ust_force_stop_live_timer(usess);
                ret = consumer_is_data_pending(usess->id, usess->consumer);
                if (ret == 1) {
                        /* Data is still being extracted for the kernel. */
                        goto error;
                }
+               /*
+                * Restart the live timer to ensure that we send inactivity
+                * beacons as needed.
+                */
+               ust_force_start_live_timer(usess);
        }
 
        /* Data is ready to be read by a viewer */
index 717dcdbf30859b58b4f9be2c1c6f0d848d27cedb..76572a075c9afca920d782a8b7639c11396c9896 100644 (file)
@@ -1205,6 +1205,69 @@ end:
        return ret;
 }
 
+/*
+ * Stop live timer.
+ *
+ * Return 0 on success else a negative value.
+ */
+int consumer_channel_stop_live_timer(struct consumer_socket *socket, uint64_t key)
+{
+       int ret;
+       struct lttcomm_consumer_msg msg;
+
+       assert(socket);
+
+       DBG2("Consumer stop live timer for channel key %" PRIu64, key);
+
+       memset(&msg, 0, sizeof(msg));
+       msg.cmd_type = LTTNG_CONSUMER_CHANNEL_STOP_LIVE_TIMER;
+       msg.u.stop_live_timer.key = key;
+
+       pthread_mutex_lock(socket->lock);
+       health_code_update();
+
+       ret = consumer_send_msg(socket, &msg);
+       if (ret < 0) {
+               goto end;
+       }
+
+end:
+       health_code_update();
+       pthread_mutex_unlock(socket->lock);
+       return ret;
+}
+
+/*
+ * Start live timer.
+ *
+ * Return 0 on success else a negative value.
+ */
+int consumer_channel_start_live_timer(struct consumer_socket *socket, uint64_t key)
+{
+       int ret;
+       struct lttcomm_consumer_msg msg;
+
+       assert(socket);
+
+       DBG2("Consumer start live timer for channel key %" PRIu64, key);
+
+       memset(&msg, 0, sizeof(msg));
+       msg.cmd_type = LTTNG_CONSUMER_CHANNEL_START_LIVE_TIMER;
+       msg.u.start_live_timer.key = key;
+
+       pthread_mutex_lock(socket->lock);
+       health_code_update();
+
+       ret = consumer_send_msg(socket, &msg);
+       if (ret < 0) {
+               goto end;
+       }
+
+end:
+       health_code_update();
+       pthread_mutex_unlock(socket->lock);
+       return ret;
+}
 /*
  * Send a clear quiescent command to consumer using the given channel key.
  *
index 42e4ec892ea2d3d4b044ea1fe03ef7cc7ad19219..68bb3e1785e22ba82fae3e3026b93e296ca59749 100644 (file)
@@ -286,6 +286,8 @@ int consumer_push_metadata(struct consumer_socket *socket,
                uint64_t metadata_key, char *metadata_str, size_t len,
                size_t target_offset, uint64_t version);
 int consumer_flush_channel(struct consumer_socket *socket, uint64_t key);
+int consumer_channel_stop_live_timer(struct consumer_socket *socket, uint64_t key);
+int consumer_channel_start_live_timer(struct consumer_socket *socket, uint64_t key);
 int consumer_clear_quiescent_channel(struct consumer_socket *socket, uint64_t key);
 int consumer_get_discarded_events(uint64_t session_id, uint64_t channel_key,
                struct consumer_output *consumer, uint64_t *discarded);
index d3369fe39f536c45b461fc8368785b5113a8a13f..b27af473f2e1892e99aed613141b19ddc12cefef 100644 (file)
@@ -33,6 +33,7 @@
 #include <common/common.h>
 #include <common/sessiond-comm/sessiond-comm.h>
 
+#include "consumer.h"
 #include "buffer-registry.h"
 #include "fd-limit.h"
 #include "health-sessiond.h"
@@ -502,6 +503,7 @@ void delete_ust_app_channel(int sock, struct ust_app_channel *ua_chan,
                lttng_fd_put(LTTNG_FD_APPS, 1);
                free(ua_chan->obj);
        }
+
        call_rcu(&ua_chan->rcu_head, delete_ust_app_channel_rcu);
 }
 
@@ -4912,6 +4914,222 @@ int ust_app_stop_trace_all(struct ltt_ust_session *usess)
        return 0;
 }
 
+/*
+ * Force stop live timers associated with the ust session.
+ */
+int ust_force_stop_live_timer(struct ltt_ust_session *usess)
+{
+       int ret = 0;
+
+       if (usess->live_timer_interval == 0) {
+               goto skip;
+       }
+
+       DBG("Stop all live timer associated with UST session %p.", usess);
+
+       rcu_read_lock();
+
+       switch (usess->buffer_type) {
+       case LTTNG_BUFFER_PER_UID:
+       {
+               struct buffer_reg_uid *reg;
+               struct lttng_ht_iter iter;
+
+               cds_list_for_each_entry(reg, &usess->buffer_reg_uid_list, lnode) {
+                       struct ust_registry_session *ust_session_reg;
+                       struct buffer_reg_channel *reg_chan;
+                       struct consumer_socket *socket;
+
+                       /* Get consumer socket to use */
+                       socket = consumer_find_socket_by_bitness(reg->bits_per_long,
+                                       usess->consumer);
+                       if (!socket) {
+                               /* Ignore request if no consumer is found for the session. */
+                               continue;
+                       }
+
+                       cds_lfht_for_each_entry(reg->registry->channels->ht, &iter.iter,
+                                       reg_chan, node.node) {
+                               ret = consumer_channel_stop_live_timer(socket, reg_chan->consumer_key);
+                               if (ret) {
+                                       ERR("Error stopping live timer for channel %" PRIu64, reg_chan->consumer_key);
+                               }
+                       }
+               }
+               break;
+       }
+       case LTTNG_BUFFER_PER_PID:
+       {
+               struct lttng_ht_iter iter_i;
+               struct ust_app *app;
+               uint64_t chan_reg_key;
+
+               cds_lfht_for_each_entry(ust_app_ht->ht, &iter_i.iter, app,
+                                       pid_n.node) {
+                       int ret;
+                       struct ust_app_session *ua_sess;
+                       struct lttng_ht_iter iter_j, iter_k;
+                       struct lttng_ht_node_u64 *node;
+                       struct ust_app_channel *ua_chan;
+
+                       DBG("Stopping live timer associated with ust app pid "
+                           "%d",
+                           app->pid);
+
+                       if (!app->compatible) {
+                               goto end;
+                       }
+
+                       __lookup_session_by_app(usess, app, &iter_j);
+                       node = lttng_ht_iter_get_node_u64(&iter_j);
+                       if (node == NULL) {
+                               /* Session is being or is deleted. */
+                               goto end;
+                       }
+                       ua_sess = caa_container_of(node, struct ust_app_session,
+                                                  node);
+
+                       health_code_update();
+
+                       cds_lfht_for_each_entry(ua_sess->channels->ht,
+                                               &iter_k.iter, ua_chan,
+                                               node.node) {
+                               struct consumer_socket *consumer_socket;
+
+                               /* Stop live timer immediately if any */
+                               consumer_socket =
+                                   consumer_find_socket_by_bitness(
+                                       app->bits_per_long,
+                                       ua_chan->session->consumer);
+                               ret = consumer_channel_stop_live_timer(
+                                   consumer_socket, ua_chan->key);
+                               if (ret) {
+                                       ERR("Error stopping live timer");
+                               }
+                       }
+                       break;
+               }
+               break;
+       }
+       default:
+               break;
+       }
+
+end:
+       rcu_read_unlock();
+       health_code_update();
+skip:
+       return ret;
+}
+
+/*
+ * Force start live timers associated with the ust session.
+ */
+int ust_force_start_live_timer(struct ltt_ust_session *usess)
+{
+       int ret = 0;
+
+       if (usess->live_timer_interval == 0) {
+               goto skip;
+       }
+
+       DBG("Start all live timer associated with UST session %p", usess);
+
+       rcu_read_lock();
+
+       switch (usess->buffer_type) {
+       case LTTNG_BUFFER_PER_UID:
+       {
+               struct buffer_reg_uid *reg;
+               struct lttng_ht_iter iter;
+
+               cds_list_for_each_entry(reg, &usess->buffer_reg_uid_list, lnode) {
+                       struct ust_registry_session *ust_session_reg;
+                       struct buffer_reg_channel *reg_chan;
+                       struct consumer_socket *socket;
+
+                       /* Get consumer socket to use */
+                       socket = consumer_find_socket_by_bitness(reg->bits_per_long,
+                                       usess->consumer);
+                       if (!socket) {
+                               /* Ignore request if no consumer is found for the session. */
+                               continue;
+                       }
+
+                       cds_lfht_for_each_entry(reg->registry->channels->ht, &iter.iter,
+                                       reg_chan, node.node) {
+                               ret = consumer_channel_start_live_timer(socket, reg_chan->consumer_key);
+                               if (ret) {
+                                       ERR("Error starting live timer for channel %" PRIu64, reg_chan->consumer_key);
+                               }
+                       }
+               }
+               break;
+       }
+       case LTTNG_BUFFER_PER_PID:
+       {
+               struct lttng_ht_iter iter_i;
+               struct ust_app *app;
+               uint64_t chan_reg_key;
+
+               cds_lfht_for_each_entry(ust_app_ht->ht, &iter_i.iter, app,
+                                       pid_n.node) {
+                       int ret;
+                       struct ust_app_session *ua_sess;
+                       struct lttng_ht_iter iter_j, iter_k;
+                       struct lttng_ht_node_u64 *node;
+                       struct ust_app_channel *ua_chan;
+
+                       DBG("Starting live timer associated with ust app pid "
+                           "%d",
+                           app->pid);
+
+                       if (!app->compatible) {
+                               goto end;
+                       }
+
+                       __lookup_session_by_app(usess, app, &iter_j);
+                       node = lttng_ht_iter_get_node_u64(&iter_j);
+                       if (node == NULL) {
+                               /* Session is being or is deleted. */
+                               goto end;
+                       }
+                       ua_sess = caa_container_of(node, struct ust_app_session,
+                                                  node);
+
+                       health_code_update();
+
+                       cds_lfht_for_each_entry(ua_sess->channels->ht,
+                                               &iter_k.iter, ua_chan,
+                                               node.node) {
+                               struct consumer_socket *consumer_socket;
+
+                               /* Start live timer immediately if any */
+                               consumer_socket =
+                                   consumer_find_socket_by_bitness(
+                                       app->bits_per_long,
+                                       ua_chan->session->consumer);
+                               ret = consumer_channel_start_live_timer(
+                                   consumer_socket, ua_chan->key);
+                               if (ret) {
+                                       ERR("Error starting live timer");
+                               }
+                       }
+                       break;
+               }
+               break;
+       }
+       default:
+               break;
+       }
+
+end:
+       rcu_read_unlock();
+       health_code_update();
+skip:
+       return ret;
+}
+
 /*
  * Destroy app UST session.
  */
index 48f9fd38147318e1631b4695fdcfdf86c0572059..313a9534d90a7ba7402ff9b9de976d72ea88a035 100644 (file)
@@ -353,6 +353,9 @@ int ust_app_pid_get_channel_runtime_stats(struct ltt_ust_session *usess,
                int overwrite, uint64_t *discarded, uint64_t *lost);
 int ust_app_regenerate_statedump_all(struct ltt_ust_session *usess);
 
+int ust_force_stop_live_timer(struct ltt_ust_session *usess);
+int ust_force_start_live_timer(struct ltt_ust_session *usess);
+
 static inline
 int ust_app_supported(void)
 {
@@ -585,6 +588,18 @@ int ust_app_regenerate_statedump_all(struct ltt_ust_session *usess)
        return 0;
 }
 
+static inline
+int ust_force_stop_live_timer(struct ltt_ust_session *usess)
+{
+       return 0;
+}
+
+static inline
+int ust_force_start_live_timer(struct ltt_ust_session *usess)
+{
+       return 0;
+}
+
 #endif /* HAVE_LIBLTTNG_UST_CTL */
 
 #endif /* _LTT_UST_APP_H */
index 12d72afbb30cc2260d8e15d98b69098e91acaf8b..2ccec32b56b9b00b12617b0d4d6cce4ad761688d 100644 (file)
@@ -289,21 +289,32 @@ static void live_timer(struct lttng_consumer_local_data *ctx,
        }
 
        DBG("Live timer for channel %" PRIu64, channel->key);
+       if (channel->live_timer_enabled != 1) {
+               DBG("Live timer was stopped before the iteration. Quitting early.");
+               goto skip;
+       }
 
        rcu_read_lock();
        cds_lfht_for_each_entry_duplicate(ht->ht,
                        ht->hash_fct(&channel->key, lttng_ht_seed),
                        ht->match_fct, &channel->key, &iter.iter,
                        stream, node_channel_id.node) {
+               if (channel->live_timer_enabled != 1) {
+                       DBG("Live timer was stopped during the iteration. Quitting early.");
+                       goto skip_unlock;
+               }
+
                ret = check_stream(stream, flush_index);
                if (ret < 0) {
                        goto error_unlock;
                }
        }
 
+skip_unlock:
 error_unlock:
        rcu_read_unlock();
 
+skip:
 error:
        return;
 }
@@ -462,15 +473,19 @@ void consumer_timer_live_stop(struct lttng_consumer_channel *channel)
 
        assert(channel);
 
+       if (!channel->live_timer) {
+               return;
+       }
+
        ret = timer_delete(channel->live_timer);
        if (ret == -1) {
                PERROR("timer_delete");
        }
 
-       consumer_timer_signal_thread_qs(LTTNG_CONSUMER_SIG_LIVE);
-
        channel->live_timer = 0;
        channel->live_timer_enabled = 0;
+
+       consumer_timer_signal_thread_qs(LTTNG_CONSUMER_SIG_LIVE);
 }
 
 /*
index 53ab4c577ce1668773902ef9072377a3517fc38c..a5a34d44f75cfbf68f700249fac50b3dd296d94b 100644 (file)
@@ -65,6 +65,8 @@ enum lttng_consumer_command {
        LTTNG_CONSUMER_DISCARDED_EVENTS,
        LTTNG_CONSUMER_LOST_PACKETS,
        LTTNG_CONSUMER_CLEAR_QUIESCENT_CHANNEL,
+       LTTNG_CONSUMER_CHANNEL_STOP_LIVE_TIMER,
+       LTTNG_CONSUMER_CHANNEL_START_LIVE_TIMER,
 };
 
 /* State of each fd in consumer */
index 06af0b48cb62e68542cc7b435fd69724b692458d..ceb56f6717eb0f2e36b18274424178ca3916d87c 100644 (file)
@@ -533,6 +533,12 @@ struct lttcomm_consumer_msg {
                struct {
                        uint64_t key;   /* Channel key. */
                } LTTNG_PACKED flush_channel;
+               struct {
+                       uint64_t key;   /* Channel key. */
+               } LTTNG_PACKED stop_live_timer;
+               struct {
+                       uint64_t key;   /* Channel key. */
+               } LTTNG_PACKED start_live_timer;
                struct {
                        uint64_t key;   /* Channel key. */
                } LTTNG_PACKED clear_quiescent_channel;
index 1bde820cd97fe1a9e2885f467b375ec3530fb6a9..97cd27d9dd02cf5022923ce2159b870f3c976156 100644 (file)
@@ -1651,6 +1651,51 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                assert(cds_list_empty(&channel->streams.head));
                goto end_msg_sessiond;
        }
+       case LTTNG_CONSUMER_CHANNEL_STOP_LIVE_TIMER:
+       {
+               uint64_t key = msg.u.stop_live_timer.key;
+               struct lttng_consumer_channel *channel;
+
+               channel = consumer_find_channel(key);
+               if (!channel) {
+                       ERR("UST consumer get channel key %" PRIu64 " not found", key);
+                       ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
+                       goto end_msg_sessiond;
+               }
+
+               health_code_update();
+
+               if (channel->live_timer_enabled == 1) {
+                       consumer_timer_live_stop(channel);
+               }
+
+               health_code_update();
+
+               goto end_msg_sessiond;
+       }
+       case LTTNG_CONSUMER_CHANNEL_START_LIVE_TIMER:
+       {
+               uint64_t key = msg.u.start_live_timer.key;
+               struct lttng_consumer_channel *channel;
+
+               channel = consumer_find_channel(key);
+               if (!channel) {
+                       ERR("UST consumer get channel key %" PRIu64 " not found", key);
+                       ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
+                       goto end_msg_sessiond;
+               }
+
+               health_code_update();
+
+               if (channel->live_timer_enabled == 0) {
+                       consumer_timer_live_start(channel, channel->live_timer_interval);
+               }
+
+               health_code_update();
+
+               goto end_msg_sessiond;
+       }
+
        case LTTNG_CONSUMER_DESTROY_CHANNEL:
        {
                uint64_t key = msg.u.destroy_channel.key;
This page took 0.048668 seconds and 5 git commands to generate.