+int perform_event_action_notify(struct notification_thread_state *state,
+ const struct lttng_trigger *trigger,
+ const struct lttng_trigger_notification *notification,
+ const struct lttng_action *action)
+{
+ int ret;
+ struct notification_client_list *client_list;
+ struct lttng_evaluation *evaluation = NULL;
+ const struct lttng_credentials *creds = lttng_trigger_get_credentials(trigger);
+
+ /*
+ * Check if any client is subscribed to the result of this
+ * evaluation.
+ */
+ client_list = get_client_list_from_condition(state, trigger->condition);
+ assert(client_list);
+ if (cds_list_empty(&client_list->list)) {
+ ret = 0;
+ goto end;
+ }
+
+ evaluation = lttng_evaluation_event_rule_create(trigger->name);
+ if (!evaluation) {
+ ERR("Failed to create event rule hit evaluation");
+ ret = -1;
+ goto end;
+ }
+
+ /*
+ * Dispatch the evaluation result to all subscribed clients.
+ * Note that the credentials used here are those of the trigger itself:
+ * there is no internal object bound to the trigger per se, and the
+ * credential validation for event rule based triggers is performed at
+ * registration time. For a channel, credential validation can only be
+ * done on notify since the trigger can be registered before the
+ * channel/session is created.
+ */
+ ret = send_evaluation_to_clients(trigger,
+ evaluation, client_list, state,
+ creds->uid,
+ creds->gid);
+ lttng_evaluation_destroy(evaluation);
+end:
+ return ret;
+}
+
+/* This can be called recursively; pass NULL as the action on the first call. */
+int perform_event_action(struct notification_thread_state *state,
+ const struct lttng_trigger *trigger,
+ const struct lttng_trigger_notification *notification,
+ const struct lttng_action *action)
+{
+ int ret = 0;
+ enum lttng_action_type action_type;
+
+ assert(trigger);
+
+ if (!action) {
+ action = lttng_trigger_get_const_action(trigger);
+ }
+
+ action_type = lttng_action_get_type_const(action);
+ DBG("Handling action %s for trigger id %s (%" PRIu64 ")",
+ lttng_action_type_string(action_type), trigger->name,
+ trigger->key.value);
+
+ switch (action_type) {
+ case LTTNG_ACTION_TYPE_GROUP:
+ {
+ /* Recurse into the group */
+ const struct lttng_action *tmp = NULL;
+ unsigned int i, count = 0;
+
+ (void) lttng_action_group_get_count(action, &count);
+ for (i = 0; i < count; i++) {
+ tmp = lttng_action_group_get_at_index_const(action, i);
+ assert(tmp);
+ ret = perform_event_action(state, trigger, notification, tmp);
+ if (ret < 0) {
+ goto end;
+ }
+ }
+ break;
+ }
+ case LTTNG_ACTION_TYPE_NOTIFY:
+ {
+ ret = perform_event_action_notify(state, trigger, notification, action);
+ break;
+ }
+ default:
+ break;
+ }
+end:
+ return ret;
+}
+
+int handle_notification_thread_event(struct notification_thread_state *state,
+ int pipe,
+ enum lttng_domain_type domain)
+{
+ int ret;
+ struct lttng_ust_trigger_notification ust_notification;
+ uint64_t kernel_notification;
+ struct cds_lfht_node *node;
+ struct cds_lfht_iter iter;
+ struct notification_trigger_tokens_ht_element *element;
+ struct lttng_trigger_notification notification;
+ void *reception_buffer;
+ size_t reception_size;
+ enum action_executor_status executor_status;
+ struct notification_client_list *client_list = NULL;
+
+ notification.type = domain;
+
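+ /* Set up the reception buffer according to the domain of the event source. */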
+ switch (domain) {
+ case LTTNG_DOMAIN_UST:
+ reception_buffer = (void *) &ust_notification;
+ reception_size = sizeof(ust_notification);
+ notification.u.ust = &ust_notification;
+ break;
+ case LTTNG_DOMAIN_KERNEL:
+ reception_buffer = (void *) &kernel_notification;
+ reception_size = sizeof(kernel_notification);
+ notification.u.kernel = &kernel_notification;
+ break;
+ default:
+ assert(0);
+ }
+
+ /*
+ * The monitoring pipe only holds messages smaller than PIPE_BUF,
+ * ensuring that read/write of sampling messages are atomic.
+ */
+ /* TODO: should we read as much as we can? Handle EWOULDBLOCK? */
+
+ ret = lttng_read(pipe, reception_buffer, reception_size);
+ if (ret != reception_size) {
+ ERR("[notification-thread] Failed to read from event source pipe (fd = %i)",
+ pipe);
+ /*
+ * TODO: Should this be a fatal error? This can happen when an
+ * application is killed; as of today, returning -1 causes the
+ * whole thread to exit and breaks everything.
+ */
+ goto end;
+ }
+
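+ /* Extract the trigger token from the domain-specific notification. */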
+ switch (domain) {
+ case LTTNG_DOMAIN_UST:
+ notification.id = ust_notification.id;
+ break;
+ case LTTNG_DOMAIN_KERNEL:
+ notification.id = kernel_notification;
+ break;
+ default:
+ assert(0);
+ }
+
+ /* Find triggers associated with this token. */
+ rcu_read_lock();
+ cds_lfht_lookup(state->trigger_tokens_ht,
+ hash_key_u64(&notification.id, lttng_ht_seed), match_trigger_token,
+ &notification.id, &iter);
+ node = cds_lfht_iter_get_node(&iter);
+ if (caa_unlikely(!node)) {
+ /*
+ * TODO: Is this an error? This can happen if the receiving side
+ * is slow to process events from the source and the trigger was
+ * removed while the application is still emitting events. This
+ * raises the question of trigger lifetime: when can a trigger be
+ * removed, and how do we guarantee that all events carrying its
+ * token have been processed? Do we even want to provide that
+ * guarantee?
+ *
+ * Update: this was encountered when using a trigger on
+ * sched_switch and then removing it. The event frequency is high
+ * enough that we end up exactly in the scenario described above.
+ * Silently ignoring the notification appears to be the best way
+ * to handle it for now.
+ */
+ ret = 0;
+ goto end_unlock;
+ }
+ element = caa_container_of(node,
+ struct notification_trigger_tokens_ht_element,
+ node);
+
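+ /* Ignore the notification if the trigger is not yet ready to fire. */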
+ if (!lttng_trigger_is_ready_to_fire(element->trigger)) {
+ ret = 0;
+ goto end_unlock;
+ }
+
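+ /*
+ * Delegate execution of the trigger's actions to the action executor,
+ * along with the list of clients subscribed to the trigger's condition.
+ */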
+ client_list = get_client_list_from_condition(state,
+ lttng_trigger_get_const_condition(element->trigger));
+ executor_status = action_executor_enqueue(
+ state->executor, element->trigger, client_list);
+ switch (executor_status) {
+ case ACTION_EXECUTOR_STATUS_OK:
+ ret = 0;
+ break;
+ case ACTION_EXECUTOR_STATUS_OVERFLOW:
+ {
+ struct notification_client_list_element *client_list_element,
+ *tmp;
+
+ /*
+ * Not a fatal error; this is expected and simply means the
+ * executor has too much work queued already.
+ */
+ ret = 0;
+
+ if (!client_list) {
+ break;
+ }
+
+ /* Warn clients that a notification (or more) was dropped. */
+ pthread_mutex_lock(&client_list->lock);
+ cds_list_for_each_entry_safe(client_list_element, tmp,
+ &client_list->list, node) {
+ enum client_transmission_status transmission_status;
+ struct notification_client *client =
+ client_list_element->client;
+
+ pthread_mutex_lock(&client->lock);
+ ret = client_notification_overflow(client);
+ if (ret) {
+ /* Fatal error. */
+ goto next_client;
+ }
+
+ transmission_status =
+ client_flush_outgoing_queue(client);
+ ret = client_handle_transmission_status(
+ client, transmission_status, state);
+ if (ret) {
+ /* Fatal error. */
+ goto next_client;
+ }
+next_client:
+ pthread_mutex_unlock(&client->lock);
+ if (ret) {
+ break;
+ }
+ }
+ pthread_mutex_unlock(&client_list->lock);
+ break;
+ }
+ case ACTION_EXECUTOR_STATUS_ERROR:
+ /* Fatal error, shut down everything. */
+ ERR("Fatal error encoutered while enqueuing action");
+ ret = -1;
+ goto end_unlock;
+ default:
+ /* Unhandled executor status. */
+ abort();
+ }
+
+end_unlock:
+ notification_client_list_put(client_list);
+ rcu_read_unlock();
+end:
+ return ret;
+}
+