#include <urcu.h>
#include <urcu/rculfhash.h>
-#include "notification-thread.h"
-#include "notification-thread-events.h"
-#include "notification-thread-commands.h"
#include <common/defaults.h>
#include <common/error.h>
#include <common/futex.h>
#include <lttng/condition/condition-internal.h>
#include <lttng/condition/buffer-usage-internal.h>
#include <lttng/notification/channel-internal.h>
+
#include <time.h>
#include <unistd.h>
#include <assert.h>
#include <inttypes.h>
#include <fcntl.h>
+#include "notification-thread.h"
+#include "notification-thread-events.h"
+#include "notification-thread-commands.h"
+#include "lttng-sessiond.h"
+#include "kernel.h"
+
#define CLIENT_POLL_MASK_IN (LPOLLIN | LPOLLERR | LPOLLHUP | LPOLLRDHUP)
#define CLIENT_POLL_MASK_IN_OUT (CLIENT_POLL_MASK_IN | LPOLLOUT)
uid_t uid;
gid_t gid;
/*
- * Indicates if the credentials and versions of the client has been
+ * Indicates if the credentials and versions of the client have been
* checked.
*/
bool validated;
uint64_t lowest_usage;
};
+static unsigned long hash_channel_key(struct channel_key *key);
+static int evaluate_condition(struct lttng_condition *condition,
+ struct lttng_evaluation **evaluation,
+ struct notification_thread_state *state,
+ struct channel_state_sample *previous_sample,
+ struct channel_state_sample *latest_sample,
+ uint64_t buffer_capacity);
+static
+int send_evaluation_to_clients(struct lttng_trigger *trigger,
+ struct lttng_evaluation *evaluation,
+ struct notification_client_list *client_list,
+ struct notification_thread_state *state,
+ uid_t channel_uid, gid_t channel_gid);
+
static
int match_client(struct cds_lfht_node *node, const void *key)
{
}
}
+static
+unsigned long hash_channel_key(struct channel_key *key)
+{
+ unsigned long key_hash = hash_key_u64(&key->key, lttng_ht_seed);
+ unsigned long domain_hash = hash_key_ulong(
+ (void *) (unsigned long) key->domain, lttng_ht_seed);
+
+ return key_hash ^ domain_hash;
+}
+
static
void channel_info_destroy(struct channel_info *channel_info)
{
return NULL;
}
+/* This function must be called with the RCU read lock held. */
+static
+int evaluate_condition_for_client(struct lttng_trigger *trigger,
+ struct lttng_condition *condition,
+ struct notification_client *client,
+ struct notification_thread_state *state)
+{
+ int ret;
+ struct cds_lfht_iter iter;
+ struct cds_lfht_node *node;
+ struct channel_info *channel_info = NULL;
+ struct channel_key *channel_key = NULL;
+ struct channel_state_sample *last_sample = NULL;
+ struct lttng_channel_trigger_list *channel_trigger_list = NULL;
+ struct lttng_evaluation *evaluation = NULL;
+ struct notification_client_list client_list = { 0 };
+ struct notification_client_list_element client_list_element = { 0 };
+
+ assert(trigger);
+ assert(condition);
+ assert(client);
+ assert(state);
+
+ /* Find the channel associated with the trigger. */
+ cds_lfht_for_each_entry(state->channel_triggers_ht, &iter,
+ channel_trigger_list , channel_triggers_ht_node) {
+ struct lttng_trigger_list_element *element;
+
+ cds_list_for_each_entry(element, &channel_trigger_list->list, node) {
+ struct lttng_condition *current_condition =
+ lttng_trigger_get_condition(
+ element->trigger);
+
+ assert(current_condition);
+ if (!lttng_condition_is_equal(condition,
+ current_condition)) {
+ continue;
+ }
+
+ /* Found the trigger, save the channel key. */
+ channel_key = &channel_trigger_list->channel_key;
+ break;
+ }
+ if (channel_key) {
+ /* The channel key was found stop iteration. */
+ break;
+ }
+ }
+
+ if (!channel_key){
+ /* No channel found; normal exit. */
+ DBG("[notification-thread] No channel associated with newly subscribed-to condition");
+ ret = 0;
+ goto end;
+ }
+
+ /* Fetch channel info for the matching channel. */
+ cds_lfht_lookup(state->channels_ht,
+ hash_channel_key(channel_key),
+ match_channel_info,
+ channel_key,
+ &iter);
+ node = cds_lfht_iter_get_node(&iter);
+ assert(node);
+ channel_info = caa_container_of(node, struct channel_info,
+ channels_ht_node);
+
+ /* Retrieve the channel's last sample, if it exists. */
+ cds_lfht_lookup(state->channel_state_ht,
+ hash_channel_key(channel_key),
+ match_channel_state_sample,
+ channel_key,
+ &iter);
+ node = cds_lfht_iter_get_node(&iter);
+ if (node) {
+ last_sample = caa_container_of(node,
+ struct channel_state_sample,
+ channel_state_ht_node);
+ } else {
+ /* Nothing to evaluate, no sample was ever taken. Normal exit */
+ DBG("[notification-thread] No channel sample associated with newly subscribed-to condition");
+ ret = 0;
+ goto end;
+ }
+
+ ret = evaluate_condition(condition, &evaluation, state, NULL,
+ last_sample, channel_info->capacity);
+ if (ret) {
+ WARN("[notification-thread] Fatal error occurred while evaluating a newly subscribed-to condition");
+ goto end;
+ }
+
+ if (!evaluation) {
+ /* Evaluation yielded nothing. Normal exit. */
+ DBG("[notification-thread] Newly subscribed-to condition evaluated to false, nothing to report to client");
+ ret = 0;
+ goto end;
+ }
+
+ /*
+ * Create a temporary client list with the client currently
+ * subscribing.
+ */
+ cds_lfht_node_init(&client_list.notification_trigger_ht_node);
+ CDS_INIT_LIST_HEAD(&client_list.list);
+ client_list.trigger = trigger;
+
+ CDS_INIT_LIST_HEAD(&client_list_element.node);
+ client_list_element.client = client;
+ cds_list_add(&client_list_element.node, &client_list.list);
+
+ /* Send evaluation result to the newly-subscribed client. */
+ DBG("[notification-thread] Newly subscribed-to condition evaluated to true, notifying client");
+ ret = send_evaluation_to_clients(trigger, evaluation, &client_list,
+ state, channel_info->uid, channel_info->gid);
+
+end:
+ return ret;
+}
+
static
int notification_thread_client_subscribe(struct notification_client *client,
struct lttng_condition *condition,
condition_list_element->condition = condition;
cds_list_add(&condition_list_element->node, &client->condition_list);
- /*
- * Add the client to the list of clients interested in a given trigger
- * if a "notification" trigger with a corresponding condition was
- * added prior.
- */
cds_lfht_lookup(state->notification_trigger_clients_ht,
lttng_condition_hash(condition),
match_client_list_condition,
&iter);
node = cds_lfht_iter_get_node(&iter);
if (!node) {
+ /*
+ * No notification-emiting trigger registered with this
+ * condition. We don't evaluate the condition right away
+ * since this trigger is not registered yet.
+ */
free(client_list_element);
goto end_unlock;
}
client_list = caa_container_of(node, struct notification_client_list,
notification_trigger_ht_node);
+ /*
+ * The condition to which the client just subscribed is evaluated
+ * at this point so that conditions that are already TRUE result
+ * in a notification being sent out.
+ */
+ if (evaluate_condition_for_client(client_list->trigger, condition,
+ client, state)) {
+ WARN("[notification-thread] Evaluation of a condition on client subscription failed, aborting.");
+ ret = -1;
+ goto end_unlock;
+ }
+
+ /*
+ * Add the client to the list of clients interested in a given trigger
+ * if a "notification" trigger with a corresponding condition was
+ * added prior.
+ */
client_list_element->client = client;
CDS_INIT_LIST_HEAD(&client_list_element->node);
cds_list_add(&client_list_element->node, &client_list->list);
return applies;
}
-static
-unsigned long hash_channel_key(struct channel_key *key)
-{
- return hash_key_u64(&key->key, lttng_ht_seed) ^ hash_key_ulong(
- (void *) (unsigned long) key->domain, lttng_ht_seed);
-}
-
static
int handle_notification_thread_command_add_channel(
struct notification_thread_state *state,
return 0;
}
+static
+int condition_is_supported(struct lttng_condition *condition)
+{
+ int ret;
+
+ switch (lttng_condition_get_type(condition)) {
+ case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
+ case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
+ {
+ enum lttng_domain_type domain;
+
+ ret = lttng_condition_buffer_usage_get_domain_type(condition,
+ &domain);
+ if (ret) {
+ ret = -1;
+ goto end;
+ }
+
+ if (domain != LTTNG_DOMAIN_KERNEL) {
+ ret = 1;
+ goto end;
+ }
+
+ /*
+ * Older kernel tracers don't expose the API to monitor their
+ * buffers. Therefore, we reject triggers that require that
+ * mechanism to be available to be evaluated.
+ */
+ ret = kernel_supports_ring_buffer_snapshot_sample_positions(
+ kernel_tracer_fd);
+ break;
+ }
+ default:
+ ret = 1;
+ }
+end:
+ return ret;
+}
+
/*
* FIXME A client's credentials are not checked when registering a trigger, nor
* are they stored alongside with the trigger.
*
- * The effects of this are benign:
+ * The effects of this are benign since:
* - The client will succeed in registering the trigger, as it is valid,
* - The trigger will, internally, be bound to the channel,
* - The notifications will not be sent since the client's credentials
* are checked against the channel at that moment.
+ *
+ * If this function returns a non-zero value, it means something is
+ * fundamentally broken and the whole subsystem/thread will be torn down.
+ *
+ * If a non-fatal error occurs, just set the cmd_result to the appropriate
+ * error code.
*/
static
int handle_notification_thread_command_register_trigger(
rcu_read_lock();
condition = lttng_trigger_get_condition(trigger);
+ assert(condition);
+
+ ret = condition_is_supported(condition);
+ if (ret < 0) {
+ goto error;
+ } else if (ret == 0) {
+ *cmd_result = LTTNG_ERR_NOT_SUPPORTED;
+ goto error;
+ } else {
+ /* Feature is supported, continue. */
+ ret = 0;
+ }
+
trigger_ht_element = zmalloc(sizeof(*trigger_ht_element));
if (!trigger_ht_element) {
ret = -1;
cds_lfht_add(state->notification_trigger_clients_ht,
lttng_condition_hash(condition),
&client_list->notification_trigger_ht_node);
- /*
- * Client list ownership transferred to the
- * notification_trigger_clients_ht.
- */
- client_list = NULL;
/*
* Add the trigger to list of triggers bound to the channels currently
&iter);
node = cds_lfht_iter_get_node(&iter);
assert(node);
- /* Free the list of triggers associated with this channel. */
trigger_list = caa_container_of(node,
struct lttng_channel_trigger_list,
channel_triggers_ht_node);
CDS_INIT_LIST_HEAD(&trigger_list_element->node);
trigger_list_element->trigger = trigger;
cds_list_add(&trigger_list_element->node, &trigger_list->list);
+
/* A trigger can only apply to one channel. */
break;
}
+ /*
+ * Since there is nothing preventing clients from subscribing to a
+ * condition before the corresponding trigger is registered, we have
+ * to evaluate this new condition right away.
+ *
+ * At some point, we were waiting for the next "evaluation" (e.g. on
+ * reception of a channel sample) to evaluate this new condition, but
+ * that was broken.
+ *
+ * The reason it was broken is that waiting for the next sample
+ * does not allow us to properly handle transitions for edge-triggered
+ * conditions.
+ *
+ * Consider this example: when we handle a new channel sample, we
+ * evaluate each conditions twice: once with the previous state, and
+ * again with the newest state. We then use those two results to
+ * determine whether a state change happened: a condition was false and
+ * became true. If a state change happened, we have to notify clients.
+ *
+ * Now, if a client subscribes to a given notification and registers
+ * a trigger *after* that subscription, we have to make sure the
+ * condition is evaluated at this point while considering only the
+ * current state. Otherwise, the next evaluation cycle may only see
+ * that the evaluations remain the same (true for samples n-1 and n) and
+ * the client will never know that the condition has been met.
+ */
+ cds_list_for_each_entry_safe(client_list_element, tmp,
+ &client_list->list, node) {
+ ret = evaluate_condition_for_client(trigger, condition,
+ client_list_element->client, state);
+ if (ret) {
+ goto error_free_client_list;
+ }
+ }
+
+ /*
+ * Client list ownership transferred to the
+ * notification_trigger_clients_ht.
+ */
+ client_list = NULL;
+
*cmd_result = LTTNG_OK;
error_free_client_list:
if (client_list) {
DBG("[notification-thread] Removed trigger from channel_triggers_ht");
cds_list_del(&trigger_element->node);
+ /* A trigger can only appear once per channel */
+ break;
}
}
struct notification_thread_command *cmd;
/* Read event_fd to put it back into a quiescent state. */
- ret = read(handle->cmd_queue.event_fd, &counter, sizeof(counter));
+ ret = read(lttng_pipe_get_readfd(handle->cmd_queue.event_pipe), &counter, sizeof(counter));
if (ret == -1) {
goto error;
}
int handle_notification_thread_trigger_unregister_all(
struct notification_thread_state *state)
{
- bool error_occured = false;
+ bool error_occurred = false;
struct cds_lfht_iter iter;
struct lttng_trigger_ht_element *trigger_ht_element;
int ret = handle_notification_thread_command_unregister_trigger(
state, trigger_ht_element->trigger, NULL);
if (ret) {
- error_occured = true;
+ error_occurred = true;
}
}
- return error_occured ? -1 : 0;
+ return error_occurred ? -1 : 0;
}
static
if (client->communication.inbound.msg_type ==
LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE) {
- /*
- * FIXME The current state should be evaluated on
- * subscription.
- */
ret = notification_thread_client_subscribe(client,
condition, state, &status);
} else {