+static struct lttng_trigger_notification *receive_notification(int pipe,
+ enum lttng_domain_type domain)
+{
+ int ret;
+ uint64_t id;
+ struct lttng_trigger_notification *notification = NULL;
+ char *capture_buffer = NULL;
+ size_t capture_buffer_size;
+ void *reception_buffer;
+ size_t reception_size;
+
+ struct lttng_ust_trigger_notification ust_notification;
+ struct lttng_kernel_trigger_notification kernel_notification;
+
+ /* Select the domain-specific reception buffer and size. */
+ switch (domain) {
+ case LTTNG_DOMAIN_UST:
+ reception_buffer = (void *) &ust_notification;
+ reception_size = sizeof(ust_notification);
+ break;
+ case LTTNG_DOMAIN_KERNEL:
+ reception_buffer = (void *) &kernel_notification;
+ reception_size = sizeof(kernel_notification);
+ break;
+ default:
+ assert(0);
+ }
+
+ /*
+ * The monitoring pipe only holds messages smaller than PIPE_BUF,
+ * ensuring that reads and writes of sampling messages are atomic.
+ *
+ * TODO: should we read as much as we can? EWOULDBLOCK?
+ */
+
+ ret = lttng_read(pipe, reception_buffer, reception_size);
+ if (ret != reception_size) {
+ PERROR("Failed to read from event source pipe (fd = %i, size to read=%zu, ret=%d)",
+ pipe, reception_size, ret);
+ /* TODO: Should this error out completly.
+ * This can happen when an app is killed as of today
+ * ret = -1 cause the whole thread to die and fuck up
+ * everything.
+ */
+ goto end;
+ }
+
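+ /*
+ * Extract the trigger token id and the size of the optional capture
+ * payload from the domain-specific notification.
+ */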
+ switch (domain) {
+ case LTTNG_DOMAIN_UST:
+ id = ust_notification.id;
+ capture_buffer_size =
+ ust_notification.capture_buf_size;
+ break;
+ case LTTNG_DOMAIN_KERNEL:
+ id = kernel_notification.id;
+ capture_buffer_size =
+ kernel_notification.capture_buf_size;
+ break;
+ default:
+ assert(0);
+ }
+
+ if (capture_buffer_size == 0) {
+ /* No capture payload accompanies this notification. */
+ goto skip_capture;
+ }
+
+ capture_buffer = zmalloc(capture_buffer_size);
+ if (!capture_buffer) {
+ ERR("[notification-thread] Failed to allocate capture buffer");
+ goto end;
+ }
+
+ /*
+ * Fetch additional payload (capture).
+ */
+ ret = lttng_read(pipe, capture_buffer, capture_buffer_size);
+ if (ret != capture_buffer_size) {
+ ERR("[notification-thread] Failed to read from event source pipe (fd = %i)",
+ pipe);
+ /* TODO: Should this error out completly.
+ * This can happen when an app is killed as of today
+ * ret = -1 cause the whole thread to die and fuck up
+ * everything.
+ */
+ goto end;
+ }
+
+skip_capture:
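+ /*
+ * Create the notification; on success, it assumes ownership of
+ * capture_buffer.
+ */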
+ notification = lttng_trigger_notification_create(
+ id, domain, capture_buffer, capture_buffer_size);
+ if (notification == NULL) {
+ goto end;
+ }
+
+ /* Ownership transferred to the lttng_trigger_notification object. */
+ capture_buffer = NULL;
+
+end:
+ free(capture_buffer);
+ return notification;
+}
+
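+/*
+ * Handle a notification emitted by a tracer on the event source pipe:
+ * look up the trigger bound to the notification's token, evaluate its
+ * condition, and enqueue its actions for execution.
+ *
+ * Returns 0 on success (including recoverable situations such as an
+ * unknown token or a saturated action executor), -1 on fatal error.
+ */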
+int handle_notification_thread_event(struct notification_thread_state *state,
+ int pipe,
+ enum lttng_domain_type domain)
+{
+ int ret;
+ enum lttng_trigger_status trigger_status;
+ struct cds_lfht_node *node;
+ struct cds_lfht_iter iter;
+ struct notification_trigger_tokens_ht_element *element;
+ struct lttng_evaluation *evaluation = NULL;
+ struct lttng_trigger_notification *notification = NULL;
+ enum action_executor_status executor_status;
+ struct notification_client_list *client_list = NULL;
+ const char *trigger_name;
+ unsigned int capture_count = 0;
+
+ notification = receive_notification(pipe, domain);
+ if (notification == NULL) {
+ ERR("[notification-thread] Error receiving notification from tracer (fd = %i, domain = %s)",
+ pipe, lttng_domain_type_str(domain));
+ ret = -1;
+ goto end;
+ }
+
+ /* Find triggers associated with this token. */
+ rcu_read_lock();
+ cds_lfht_lookup(state->trigger_tokens_ht,
+ hash_key_u64(&notification->id, lttng_ht_seed),
+ match_trigger_token, &notification->id, &iter);
+ node = cds_lfht_iter_get_node(&iter);
+ if (caa_unlikely(!node)) {
+ /*
+ * TODO: Is this an error? This can happen when the receiving
+ * side is slow to process events from the source and the
+ * trigger was removed while the application is still running.
+ * This raises the broader question of trigger lifetime: when
+ * can a trigger be removed, and how do we guarantee that all
+ * events bearing its token have been processed? Do we want to
+ * provide that guarantee?
+ *
+ * Update: this was encountered with a trigger on sched_switch
+ * that was then removed. The event frequency is high enough
+ * that we end up in exactly the scenario described above.
+ * Ignoring the notification might be the best way to handle it.
+ */
+ ret = 0;
+ goto end_unlock;
+ }
+ element = caa_container_of(node,
+ struct notification_trigger_tokens_ht_element,
+ node);
+
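+ /*
+ * Honor the trigger's firing policy: only proceed if the trigger
+ * should fire for this occurrence, and record that it fired.
+ */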
+ if (!lttng_trigger_should_fire(element->trigger)) {
+ ret = 0;
+ goto end_unlock;
+ }
+
+ lttng_trigger_fire(element->trigger);
+
+ trigger_status = lttng_trigger_get_name(element->trigger, &trigger_name);
+ assert(trigger_status == LTTNG_TRIGGER_STATUS_OK);
+
+ if (lttng_condition_event_rule_get_capture_descriptor_count(
+ lttng_trigger_get_const_condition(element->trigger),
+ &capture_count) != LTTNG_CONDITION_STATUS_OK) {
+ ERR("Failed to get capture descriptor count from trigger condition");
+ ret = -1;
+ goto end_unlock;
+ }
+
+ if (!notification->capture_buffer && capture_count != 0) {
+ ERR("Expected capture but capture buffer is null");
+ ret = -1;
+ goto end;
+ }
+
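+ /*
+ * Build the evaluation object from the trigger's condition and the
+ * captured payload.
+ */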
+ evaluation = lttng_evaluation_event_rule_create(
+ container_of(lttng_trigger_get_const_condition(
+ element->trigger),
+ struct lttng_condition_event_rule,
+ parent),
+ trigger_name,
+ notification->capture_buffer,
+ notification->capture_buf_size, false);
+
+ if (evaluation == NULL) {
+ ERR("[notification-thread] Failed to create event rule hit evaluation");
+ ret = -1;
+ goto end_unlock;
+ }
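+ /*
+ * The client list is used both when enqueuing the actions and, on
+ * executor overflow, to warn subscribed clients that a notification
+ * was dropped.
+ */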
+ client_list = get_client_list_from_condition(state,
+ lttng_trigger_get_const_condition(element->trigger));
+ executor_status = action_executor_enqueue(state->executor,
+ element->trigger, evaluation, NULL, client_list);
+ switch (executor_status) {
+ case ACTION_EXECUTOR_STATUS_OK:
+ ret = 0;
+ break;
+ case ACTION_EXECUTOR_STATUS_OVERFLOW:
+ {
+ struct notification_client_list_element *client_list_element,
+ *tmp;
+
+ /*
+ * Not a fatal error; this is expected and simply means the
+ * executor has too much work queued already.
+ */
+ ret = 0;
+
+ if (!client_list) {
+ break;
+ }
+
+ /* Warn clients that a notification (or more) was dropped. */
+ pthread_mutex_lock(&client_list->lock);
+ cds_list_for_each_entry_safe(client_list_element, tmp,
+ &client_list->list, node) {
+ enum client_transmission_status transmission_status;
+ struct notification_client *client =
+ client_list_element->client;
+
+ pthread_mutex_lock(&client->lock);
+ ret = client_notification_overflow(client);
+ if (ret) {
+ /* Fatal error. */
+ goto next_client;
+ }
+
+ transmission_status =
+ client_flush_outgoing_queue(client);
+ ret = client_handle_transmission_status(
+ client, transmission_status, state);
+ if (ret) {
+ /* Fatal error. */
+ goto next_client;
+ }
+next_client:
+ pthread_mutex_unlock(&client->lock);
+ if (ret) {
+ break;
+ }
+ }
+ pthread_mutex_unlock(&client_list->lock);
+ break;
+ }
+ case ACTION_EXECUTOR_STATUS_INVALID:
+ case ACTION_EXECUTOR_STATUS_ERROR:
+ /* Fatal error, shut down everything. */
+ ERR("Fatal error encoutered while enqueuing action");
+ ret = -1;
+ goto end_unlock;
+ default:
+ /* Unhandled error. */
+ abort();
+ }
+
+end_unlock:
+ lttng_trigger_notification_destroy(notification);
+ notification_client_list_put(client_list);
+ rcu_read_unlock();
+end:
+ return ret;
+}
+