#include <lttng/notification/notification-internal.h>
#include <lttng/condition/condition-internal.h>
#include <lttng/condition/buffer-usage-internal.h>
+#include <lttng/condition/session-consumed-size-internal.h>
#include <lttng/notification/channel-internal.h>
#include <time.h>
struct cds_lfht_node channel_state_ht_node;
uint64_t highest_usage;
uint64_t lowest_usage;
+ uint64_t channel_total_consumed;
};
static unsigned long hash_channel_key(struct channel_key *key);
-static int evaluate_condition(struct lttng_condition *condition,
+static int evaluate_condition(const struct lttng_condition *condition,
struct lttng_evaluation **evaluation,
- struct notification_thread_state *state,
- struct channel_state_sample *previous_sample,
- struct channel_state_sample *latest_sample,
- uint64_t buffer_capacity);
+ const struct notification_thread_state *state,
+ const struct channel_state_sample *previous_sample,
+ const struct channel_state_sample *latest_sample,
+ uint64_t previous_session_consumed_total,
+ uint64_t latest_session_consumed_total,
+ struct channel_info *channel_info);
static
int send_evaluation_to_clients(struct lttng_trigger *trigger,
struct lttng_evaluation *evaluation,
return hash;
}
+static
+unsigned long lttng_condition_session_consumed_size_hash(
+ struct lttng_condition *_condition)
+{
+ unsigned long hash = 0;
+ struct lttng_condition_session_consumed_size *condition;
+ uint64_t val;
+
+ condition = container_of(_condition,
+ struct lttng_condition_session_consumed_size, parent);
+
+ if (condition->session_name) {
+ hash ^= hash_key_str(condition->session_name, lttng_ht_seed);
+ }
+ val = condition->consumed_threshold_bytes.value;
+ hash ^= hash_key_u64(&val, lttng_ht_seed);
+ return hash;
+}
+
/*
* The lttng_condition hashing code is kept in this file (rather than
* condition.c) since it makes use of GPLv2 code (hashtable utils), which we
case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
return lttng_condition_buffer_usage_hash(condition);
+ case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
+ return lttng_condition_session_consumed_size_hash(condition);
default:
ERR("[notification-thread] Unexpected condition type caught");
abort();
void session_info_destroy(void *_data)
{
struct session_info *session_info = _data;
+ int ret;
assert(session_info);
if (session_info->channel_infos_ht) {
- cds_lfht_destroy(session_info->channel_infos_ht, NULL);
+ ret = cds_lfht_destroy(session_info->channel_infos_ht, NULL);
+ if (ret) {
+ ERR("[notification-thread] Failed to destroy channel information hash table");
+ }
}
free(session_info->name);
free(session_info);
goto end;
}
- ret = evaluate_condition(condition, &evaluation, state, NULL,
- last_sample, channel_info->capacity);
+ ret = evaluate_condition(condition, &evaluation, state,
+ NULL, last_sample,
+ 0, channel_info->session_info->consumed_data_size,
+ channel_info);
if (ret) {
WARN("[notification-thread] Fatal error occurred while evaluating a newly subscribed-to condition");
goto end;
}
static
-bool trigger_applies_to_channel(struct lttng_trigger *trigger,
+bool buffer_usage_condition_applies_to_channel(
+ struct lttng_condition *condition,
struct channel_info *channel_info)
{
enum lttng_condition_status status;
- struct lttng_condition *condition;
- const char *trigger_session_name = NULL;
- const char *trigger_channel_name = NULL;
- enum lttng_domain_type trigger_domain;
+ enum lttng_domain_type condition_domain;
+ const char *condition_session_name = NULL;
+ const char *condition_channel_name = NULL;
- condition = lttng_trigger_get_condition(trigger);
- if (!condition) {
+ status = lttng_condition_buffer_usage_get_domain_type(condition,
+ &condition_domain);
+ assert(status == LTTNG_CONDITION_STATUS_OK);
+ if (channel_info->key.domain != condition_domain) {
goto fail;
}
- switch (lttng_condition_get_type(condition)) {
- case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
- case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
- break;
- default:
+ status = lttng_condition_buffer_usage_get_session_name(
+ condition, &condition_session_name);
+ assert((status == LTTNG_CONDITION_STATUS_OK) && condition_session_name);
+
+ status = lttng_condition_buffer_usage_get_channel_name(
+ condition, &condition_channel_name);
+ assert((status == LTTNG_CONDITION_STATUS_OK) && condition_channel_name);
+
+ if (strcmp(channel_info->session_info->name, condition_session_name)) {
+ goto fail;
+ }
+ if (strcmp(channel_info->name, condition_channel_name)) {
goto fail;
}
- status = lttng_condition_buffer_usage_get_domain_type(condition,
- &trigger_domain);
- assert(status == LTTNG_CONDITION_STATUS_OK);
- if (channel_info->key.domain != trigger_domain) {
+ return true;
+fail:
+ return false;
+}
+
+static
+bool session_consumed_size_condition_applies_to_channel(
+ struct lttng_condition *condition,
+ struct channel_info *channel_info)
+{
+ enum lttng_condition_status status;
+ const char *condition_session_name = NULL;
+
+ status = lttng_condition_session_consumed_size_get_session_name(
+ condition, &condition_session_name);
+ assert((status == LTTNG_CONDITION_STATUS_OK) && condition_session_name);
+
+ if (strcmp(channel_info->session_info->name, condition_session_name)) {
goto fail;
}
- status = lttng_condition_buffer_usage_get_session_name(
- condition, &trigger_session_name);
- assert((status == LTTNG_CONDITION_STATUS_OK) && trigger_session_name);
+ return true;
+fail:
+ return false;
+}
- status = lttng_condition_buffer_usage_get_channel_name(
- condition, &trigger_channel_name);
- assert((status == LTTNG_CONDITION_STATUS_OK) && trigger_channel_name);
+static
+bool trigger_applies_to_channel(struct lttng_trigger *trigger,
+ struct channel_info *channel_info)
+{
+ struct lttng_condition *condition;
+ bool trigger_applies;
- if (strcmp(channel_info->session_info->name, trigger_session_name)) {
+ condition = lttng_trigger_get_condition(trigger);
+ if (!condition) {
goto fail;
}
- if (strcmp(channel_info->name, trigger_channel_name)) {
+
+ switch (lttng_condition_get_type(condition)) {
+ case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
+ case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
+ trigger_applies = buffer_usage_condition_applies_to_channel(
+ condition, channel_info);
+ break;
+ case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
+ trigger_applies = session_consumed_size_condition_applies_to_channel(
+ condition, channel_info);
+ break;
+ default:
goto fail;
}
- return true;
+ return trigger_applies;
fail:
return false;
}
CDS_INIT_LIST_HEAD(&trigger_list_element->node);
trigger_list_element->trigger = trigger;
cds_list_add(&trigger_list_element->node, &trigger_list->list);
-
- /* A trigger can only apply to one channel. */
- break;
}
/*
}
static
-bool evaluate_buffer_usage_condition(struct lttng_condition *condition,
- struct channel_state_sample *sample, uint64_t buffer_capacity)
+bool evaluate_buffer_usage_condition(const struct lttng_condition *condition,
+ const struct channel_state_sample *sample,
+ uint64_t buffer_capacity)
{
bool result = false;
uint64_t threshold;
enum lttng_condition_type condition_type;
- struct lttng_condition_buffer_usage *use_condition = container_of(
+ const struct lttng_condition_buffer_usage *use_condition = container_of(
condition, struct lttng_condition_buffer_usage,
parent);
- if (!sample) {
- goto end;
- }
-
if (use_condition->threshold_bytes.set) {
threshold = use_condition->threshold_bytes.value;
} else {
result = true;
}
}
-end:
+
return result;
}
static
-int evaluate_condition(struct lttng_condition *condition,
+bool evaluate_session_consumed_size_condition(
+ const struct lttng_condition *condition,
+ uint64_t session_consumed_size)
+{
+ uint64_t threshold;
+ const struct lttng_condition_session_consumed_size *size_condition =
+ container_of(condition,
+ struct lttng_condition_session_consumed_size,
+ parent);
+
+ threshold = size_condition->consumed_threshold_bytes.value;
+ DBG("[notification-thread] Session consumed size condition being evaluated: threshold = %" PRIu64 ", current size = %" PRIu64,
+ threshold, session_consumed_size);
+ return session_consumed_size >= threshold;
+}
+
+static
+int evaluate_condition(const struct lttng_condition *condition,
struct lttng_evaluation **evaluation,
- struct notification_thread_state *state,
- struct channel_state_sample *previous_sample,
- struct channel_state_sample *latest_sample,
- uint64_t buffer_capacity)
+ const struct notification_thread_state *state,
+ const struct channel_state_sample *previous_sample,
+ const struct channel_state_sample *latest_sample,
+ uint64_t previous_session_consumed_total,
+ uint64_t latest_session_consumed_total,
+ struct channel_info *channel_info)
{
int ret = 0;
enum lttng_condition_type condition_type;
- bool previous_sample_result;
+ const bool previous_sample_available = !!previous_sample;
+ bool previous_sample_result = false;
bool latest_sample_result;
condition_type = lttng_condition_get_type(condition);
- /* No other condition type supported for the moment. */
- assert(condition_type == LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW ||
- condition_type == LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH);
- previous_sample_result = evaluate_buffer_usage_condition(condition,
- previous_sample, buffer_capacity);
- latest_sample_result = evaluate_buffer_usage_condition(condition,
- latest_sample, buffer_capacity);
+ switch (condition_type) {
+ case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
+ case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
+ if (caa_likely(previous_sample_available)) {
+ previous_sample_result =
+ evaluate_buffer_usage_condition(condition,
+ previous_sample, channel_info->capacity);
+ }
+ latest_sample_result = evaluate_buffer_usage_condition(
+ condition, latest_sample,
+ channel_info->capacity);
+ break;
+ case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
+ if (caa_likely(previous_sample_available)) {
+ previous_sample_result =
+ evaluate_session_consumed_size_condition(
+ condition,
+ previous_session_consumed_total);
+ }
+ latest_sample_result =
+ evaluate_session_consumed_size_condition(
+ condition,
+ latest_session_consumed_total);
+ break;
+ default:
+ /* Unknown condition type; internal error. */
+ abort();
+ }
if (!latest_sample_result ||
(previous_sample_result == latest_sample_result)) {
goto end;
}
- if (evaluation && latest_sample_result) {
+ if (!evaluation || !latest_sample_result) {
+ goto end;
+ }
+
+ switch (condition_type) {
+ case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
+ case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
*evaluation = lttng_evaluation_buffer_usage_create(
condition_type,
latest_sample->highest_usage,
- buffer_capacity);
- if (!*evaluation) {
- ret = -1;
- goto end;
- }
+ channel_info->capacity);
+ break;
+ case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
+ *evaluation = lttng_evaluation_session_consumed_size_create(
+ condition_type,
+ latest_session_consumed_total);
+ break;
+ default:
+ abort();
+ }
+
+ if (!*evaluation) {
+ ret = -1;
+ goto end;
}
end:
return ret;
{
int ret = 0;
struct lttcomm_consumer_channel_monitor_msg sample_msg;
- struct channel_state_sample previous_sample, latest_sample;
struct channel_info *channel_info;
struct cds_lfht_node *node;
struct cds_lfht_iter iter;
struct lttng_channel_trigger_list *trigger_list;
struct lttng_trigger_list_element *trigger_list_element;
bool previous_sample_available = false;
+ struct channel_state_sample previous_sample, latest_sample;
+ uint64_t previous_session_consumed_total, latest_session_consumed_total;
/*
* The monitoring pipe only holds messages smaller than PIPE_BUF,
latest_sample.key.domain = domain;
latest_sample.highest_usage = sample_msg.highest;
latest_sample.lowest_usage = sample_msg.lowest;
+ latest_sample.channel_total_consumed = sample_msg.total_consumed;
rcu_read_lock();
&latest_sample.key,
&iter);
node = cds_lfht_iter_get_node(&iter);
- if (!node) {
+ if (caa_unlikely(!node)) {
/*
* Not an error since the consumer can push a sample to the pipe
* and the rest of the session daemon could notify us of the
}
channel_info = caa_container_of(node, struct channel_info,
channels_ht_node);
- DBG("[notification-thread] Handling channel sample for channel %s (key = %" PRIu64 ") in session %s (highest usage = %" PRIu64 ", lowest usage = %" PRIu64")",
+ DBG("[notification-thread] Handling channel sample for channel %s (key = %" PRIu64 ") in session %s (highest usage = %" PRIu64 ", lowest usage = %" PRIu64", total consumed = %" PRIu64")",
channel_info->name,
latest_sample.key.key,
channel_info->session_info->name,
latest_sample.highest_usage,
- latest_sample.lowest_usage);
+ latest_sample.lowest_usage,
+ latest_sample.channel_total_consumed);
+
+ previous_session_consumed_total =
+ channel_info->session_info->consumed_data_size;
/* Retrieve the channel's last sample, if it exists, and update it. */
cds_lfht_lookup(state->channel_state_ht,
&latest_sample.key,
&iter);
node = cds_lfht_iter_get_node(&iter);
- if (node) {
+ if (caa_likely(node)) {
struct channel_state_sample *stored_sample;
/* Update the sample stored. */
stored_sample = caa_container_of(node,
struct channel_state_sample,
channel_state_ht_node);
+
memcpy(&previous_sample, stored_sample,
sizeof(previous_sample));
stored_sample->highest_usage = latest_sample.highest_usage;
stored_sample->lowest_usage = latest_sample.lowest_usage;
+ stored_sample->channel_total_consumed = latest_sample.channel_total_consumed;
previous_sample_available = true;
+
+ latest_session_consumed_total =
+ previous_session_consumed_total +
+ (latest_sample.channel_total_consumed - previous_sample.channel_total_consumed);
} else {
/*
* This is the channel's first sample, allocate space for and
cds_lfht_add(state->channel_state_ht,
hash_channel_key(&stored_sample->key),
&stored_sample->channel_state_ht_node);
+
+ latest_session_consumed_total =
+ previous_session_consumed_total +
+ latest_sample.channel_total_consumed;
}
+ channel_info->session_info->consumed_data_size =
+ latest_session_consumed_total;
+
/* Find triggers associated with this channel. */
cds_lfht_lookup(state->channel_triggers_ht,
hash_channel_key(&latest_sample.key),
&latest_sample.key,
&iter);
node = cds_lfht_iter_get_node(&iter);
- if (!node) {
+ if (caa_likely(!node)) {
goto end_unlock;
}
ret = evaluate_condition(condition, &evaluation, state,
previous_sample_available ? &previous_sample : NULL,
- &latest_sample, channel_info->capacity);
- if (ret) {
+ &latest_sample,
+ previous_session_consumed_total,
+ latest_session_consumed_total,
+ channel_info);
+ if (caa_unlikely(ret)) {
goto end_unlock;
}
- if (!evaluation) {
+ if (caa_likely(!evaluation)) {
continue;
}
channel_info->session_info->uid,
channel_info->session_info->gid);
lttng_evaluation_destroy(evaluation);
- if (ret) {
+ if (caa_unlikely(ret)) {
goto end_unlock;
}
}