/* Close any relayd session */
consumer_output_send_destroy_relayd(usess->consumer);
+ ust_force_stop_live_timer(usess);
+
/* Destroy every UST application related to this session. */
ret = ust_app_destroy_trace_all(usess);
if (ret) {
}
if (usess && usess->consumer) {
+ /*
+ * Stop the live timer since it can lead to stream and relayd
+ * contention for the data pending check.
+ */
+ ust_force_stop_live_timer(usess);
ret = consumer_is_data_pending(usess->id, usess->consumer);
if (ret == 1) {
/* Data is still being extracted for the kernel. */
goto error;
}
+ /*
+ * Restart the live timer to ensure that we send inactivity
+ * beacon as needed
+ */
+ ust_force_start_live_timer(usess);
}
/* Data is ready to be read by a viewer */
return ret;
}
+/*
+ * Stop live timer.
+ *
+ * Return 0 on success else a negative value.
+ */
+int consumer_channel_stop_live_timer(struct consumer_socket *socket, uint64_t key)
+{
+ int ret;
+ struct lttcomm_consumer_msg msg;
+
+ assert(socket);
+
+ DBG2("Consumer stop live timer for channel key %" PRIu64, key);
+
+ memset(&msg, 0, sizeof(msg));
+ msg.cmd_type = LTTNG_CONSUMER_CHANNEL_STOP_LIVE_TIMER;
+ msg.u.stop_live_timer.key = key;
+
+ pthread_mutex_lock(socket->lock);
+ health_code_update();
+
+ ret = consumer_send_msg(socket, &msg);
+ if (ret < 0) {
+ goto end;
+ }
+
+end:
+ health_code_update();
+ pthread_mutex_unlock(socket->lock);
+ return ret;
+}
+
+/*
+ * Start live timer.
+ *
+ * Return 0 on success else a negative value.
+ */
+int consumer_channel_start_live_timer(struct consumer_socket *socket, uint64_t key)
+{
+ int ret;
+ struct lttcomm_consumer_msg msg;
+
+ assert(socket);
+
+ DBG2("Consumer start live timer for channel key %" PRIu64, key);
+
+ memset(&msg, 0, sizeof(msg));
+ msg.cmd_type = LTTNG_CONSUMER_CHANNEL_START_LIVE_TIMER;
+ msg.u.start_live_timer.key = key;
+
+ pthread_mutex_lock(socket->lock);
+ health_code_update();
+
+ ret = consumer_send_msg(socket, &msg);
+ if (ret < 0) {
+ goto end;
+ }
+
+end:
+ health_code_update();
+ pthread_mutex_unlock(socket->lock);
+ return ret;
+}
/*
* Send a clear quiescent command to consumer using the given channel key.
*
uint64_t metadata_key, char *metadata_str, size_t len,
size_t target_offset, uint64_t version);
int consumer_flush_channel(struct consumer_socket *socket, uint64_t key);
+int consumer_channel_stop_live_timer(struct consumer_socket *socket, uint64_t key);
+int consumer_channel_start_live_timer(struct consumer_socket *socket, uint64_t key);
int consumer_clear_quiescent_channel(struct consumer_socket *socket, uint64_t key);
int consumer_get_discarded_events(uint64_t session_id, uint64_t channel_key,
struct consumer_output *consumer, uint64_t *discarded);
#include <common/common.h>
#include <common/sessiond-comm/sessiond-comm.h>
+#include "consumer.h"
#include "buffer-registry.h"
#include "fd-limit.h"
#include "health-sessiond.h"
lttng_fd_put(LTTNG_FD_APPS, 1);
free(ua_chan->obj);
}
+
call_rcu(&ua_chan->rcu_head, delete_ust_app_channel_rcu);
}
return 0;
}
+/*
+ * Force stop live timers associated with the ust session.
+ */
+int ust_force_stop_live_timer(struct ltt_ust_session *usess)
+{
+ int ret = 0;
+
+ if (usess->live_timer_interval == 0) {
+ goto skip;
+ }
+
+ DBG("Stop all live timer associated with UST session %p.", usess);
+
+ rcu_read_lock();
+
+ switch (usess->buffer_type) {
+ case LTTNG_BUFFER_PER_UID:
+ {
+ struct buffer_reg_uid *reg;
+ struct lttng_ht_iter iter;
+
+ cds_list_for_each_entry(reg, &usess->buffer_reg_uid_list, lnode) {
+ struct ust_registry_session *ust_session_reg;
+ struct buffer_reg_channel *reg_chan;
+ struct consumer_socket *socket;
+
+ /* Get consumer socket to use */
+ socket = consumer_find_socket_by_bitness(reg->bits_per_long,
+ usess->consumer);
+ if (!socket) {
+ /* Ignore request if no consumer is found for the session. */
+ continue;
+ }
+
+ cds_lfht_for_each_entry(reg->registry->channels->ht, &iter.iter,
+ reg_chan, node.node) {
+ ret = consumer_channel_stop_live_timer(socket, reg_chan->consumer_key);
+ if (ret) {
+ ERR("Error stopping live timer for channel %" PRIu64, reg_chan->consumer_key);
+ }
+ }
+ }
+ break;
+ }
+ case LTTNG_BUFFER_PER_PID:
+ {
+ struct lttng_ht_iter iter_i;
+ struct ust_app *app;
+ uint64_t chan_reg_key;
+
+ cds_lfht_for_each_entry(ust_app_ht->ht, &iter_i.iter, app,
+ pid_n.node) {
+ int ret;
+ struct ust_app_session *ua_sess;
+ struct lttng_ht_iter iter_j, iter_k;
+ struct lttng_ht_node_u64 *node;
+ struct ust_app_channel *ua_chan;
+
+ DBG("Stopping live timer associated with ust app pid "
+ "%d",
+ app->pid);
+
+ if (!app->compatible) {
+ goto end;
+ }
+
+ __lookup_session_by_app(usess, app, &iter_j);
+ node = lttng_ht_iter_get_node_u64(&iter_j);
+ if (node == NULL) {
+ /* Session is being or is deleted. */
+ goto end;
+ }
+ ua_sess = caa_container_of(node, struct ust_app_session,
+ node);
+
+ health_code_update();
+
+ cds_lfht_for_each_entry(ua_sess->channels->ht,
+ &iter_k.iter, ua_chan,
+ node.node) {
+ struct consumer_socket *consumer_socket;
+
+ /* Stop live timer immediately if any */
+ consumer_socket =
+ consumer_find_socket_by_bitness(
+ app->bits_per_long,
+ ua_chan->session->consumer);
+ ret = consumer_channel_stop_live_timer(
+ consumer_socket, ua_chan->key);
+ if (ret) {
+ ERR("Error stopping live timer");
+ }
+ }
+ break;
+ }
+ break;
+ }
+ default:
+ break;
+ }
+
+end:
+ rcu_read_unlock();
+ health_code_update();
+skip:
+ return ret;
+}
+
+/*
+ * Force start live timers associated with the ust session.
+ */
+int ust_force_start_live_timer(struct ltt_ust_session *usess)
+{
+ int ret = 0;
+
+ if (usess->live_timer_interval == 0) {
+ goto skip;
+ }
+
+ DBG("Start all live timer associated with UST session %p", usess);
+
+ rcu_read_lock();
+
+ switch (usess->buffer_type) {
+ case LTTNG_BUFFER_PER_UID:
+ {
+ struct buffer_reg_uid *reg;
+ struct lttng_ht_iter iter;
+
+ cds_list_for_each_entry(reg, &usess->buffer_reg_uid_list, lnode) {
+ struct ust_registry_session *ust_session_reg;
+ struct buffer_reg_channel *reg_chan;
+ struct consumer_socket *socket;
+
+ /* Get consumer socket to use */
+ socket = consumer_find_socket_by_bitness(reg->bits_per_long,
+ usess->consumer);
+ if (!socket) {
+ /* Ignore request if no consumer is found for the session. */
+ continue;
+ }
+
+ cds_lfht_for_each_entry(reg->registry->channels->ht, &iter.iter,
+ reg_chan, node.node) {
+ ret = consumer_channel_start_live_timer(socket, reg_chan->consumer_key);
+ if (ret) {
+ ERR("Error stopping live timer for channel %" PRIu64, reg_chan->consumer_key);
+ }
+ }
+ }
+ break;
+ }
+ case LTTNG_BUFFER_PER_PID:
+ {
+ struct lttng_ht_iter iter_i;
+ struct ust_app *app;
+ uint64_t chan_reg_key;
+
+ cds_lfht_for_each_entry(ust_app_ht->ht, &iter_i.iter, app,
+ pid_n.node) {
+ int ret;
+ struct ust_app_session *ua_sess;
+ struct lttng_ht_iter iter_j, iter_k;
+ struct lttng_ht_node_u64 *node;
+ struct ust_app_channel *ua_chan;
+
+ DBG("Stopping live timer associated with ust app pid "
+ "%d",
+ app->pid);
+
+ if (!app->compatible) {
+ goto end;
+ }
+
+ __lookup_session_by_app(usess, app, &iter_j);
+ node = lttng_ht_iter_get_node_u64(&iter_j);
+ if (node == NULL) {
+ /* Session is being or is deleted. */
+ goto end;
+ }
+ ua_sess = caa_container_of(node, struct ust_app_session,
+ node);
+
+ health_code_update();
+
+ cds_lfht_for_each_entry(ua_sess->channels->ht,
+ &iter_k.iter, ua_chan,
+ node.node) {
+ struct consumer_socket *consumer_socket;
+
+ /* Stop live timer immediately if any */
+ consumer_socket =
+ consumer_find_socket_by_bitness(
+ app->bits_per_long,
+ ua_chan->session->consumer);
+ ret = consumer_channel_start_live_timer(
+ consumer_socket, ua_chan->key);
+ if (ret) {
+ ERR("Error stopping live timer");
+ }
+ }
+ break;
+ }
+ break;
+ }
+ default:
+ break;
+ }
+
+end:
+ rcu_read_unlock();
+ health_code_update();
+skip:
+ return ret;
+}
+
/*
* Destroy app UST session.
*/
int overwrite, uint64_t *discarded, uint64_t *lost);
int ust_app_regenerate_statedump_all(struct ltt_ust_session *usess);
+int ust_force_stop_live_timer(struct ltt_ust_session *usess);
+int ust_force_start_live_timer(struct ltt_ust_session *usess);
+
static inline
int ust_app_supported(void)
{
return 0;
}
+static inline
+int ust_force_stop_live_timer(struct ltt_ust_session *usess)
+{
+ return 0;
+}
+
+static inline
+int ust_force_start_live_timer(struct ltt_ust_session *usess)
+{
+ return 0;
+}
+
#endif /* HAVE_LIBLTTNG_UST_CTL */
#endif /* _LTT_UST_APP_H */
}
DBG("Live timer for channel %" PRIu64, channel->key);
+ if (channel->live_timer_enabled != 1) {
+ DBG("Liver timer was stopped before the iteration. Quitting early.");
+ goto skip;
+ }
rcu_read_lock();
cds_lfht_for_each_entry_duplicate(ht->ht,
ht->hash_fct(&channel->key, lttng_ht_seed),
ht->match_fct, &channel->key, &iter.iter,
stream, node_channel_id.node) {
+ if (channel->live_timer_enabled != 1) {
+ DBG("Liver timer was stopped during the iteration. Quitting early.");
+ goto skip_unlock;
+ }
+
ret = check_stream(stream, flush_index);
if (ret < 0) {
goto error_unlock;
}
}
+skip_unlock:
error_unlock:
rcu_read_unlock();
+skip:
error:
return;
}
assert(channel);
+ if (!channel->live_timer) {
+ return;
+ }
+
ret = timer_delete(channel->live_timer);
if (ret == -1) {
PERROR("timer_delete");
}
- consumer_timer_signal_thread_qs(LTTNG_CONSUMER_SIG_LIVE);
-
channel->live_timer = 0;
channel->live_timer_enabled = 0;
+
+ consumer_timer_signal_thread_qs(LTTNG_CONSUMER_SIG_LIVE);
}
/*
LTTNG_CONSUMER_DISCARDED_EVENTS,
LTTNG_CONSUMER_LOST_PACKETS,
LTTNG_CONSUMER_CLEAR_QUIESCENT_CHANNEL,
+ LTTNG_CONSUMER_CHANNEL_STOP_LIVE_TIMER,
+ LTTNG_CONSUMER_CHANNEL_START_LIVE_TIMER,
};
/* State of each fd in consumer */
struct {
uint64_t key; /* Channel key. */
} LTTNG_PACKED flush_channel;
+ struct {
+ uint64_t key; /* Channel key. */
+ } LTTNG_PACKED stop_live_timer;
+ struct {
+ uint64_t key; /* Channel key. */
+ } LTTNG_PACKED start_live_timer;
struct {
uint64_t key; /* Channel key. */
} LTTNG_PACKED clear_quiescent_channel;
assert(cds_list_empty(&channel->streams.head));
goto end_msg_sessiond;
}
+ case LTTNG_CONSUMER_CHANNEL_STOP_LIVE_TIMER:
+ {
+ uint64_t key = msg.u.get_channel.key;
+ struct lttng_consumer_channel *channel;
+
+ channel = consumer_find_channel(key);
+ if (!channel) {
+ ERR("UST consumer get channel key %" PRIu64 " not found", key);
+ ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
+ goto end_msg_sessiond;
+ }
+
+ health_code_update();
+
+ if (channel->live_timer_enabled == 1) {
+ consumer_timer_live_stop(channel);
+ }
+
+ health_code_update();
+
+ goto end_msg_sessiond;
+ }
+ case LTTNG_CONSUMER_CHANNEL_START_LIVE_TIMER:
+ {
+ uint64_t key = msg.u.get_channel.key;
+ struct lttng_consumer_channel *channel;
+
+ channel = consumer_find_channel(key);
+ if (!channel) {
+ ERR("UST consumer get channel key %" PRIu64 " not found", key);
+ ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
+ goto end_msg_sessiond;
+ }
+
+ health_code_update();
+
+ if (channel->live_timer_enabled == 0) {
+ consumer_timer_live_start(channel, channel->live_timer_interval);
+ }
+
+ health_code_update();
+
+ goto end_msg_sessiond;
+ }
+
case LTTNG_CONSUMER_DESTROY_CHANNEL:
{
uint64_t key = msg.u.destroy_channel.key;