sessiond: trigger: run trigger actions through an action executor
authorJérémie Galarneau <jeremie.galarneau@efficios.com>
Tue, 11 Feb 2020 04:29:18 +0000 (23:29 -0500)
committerJérémie Galarneau <jeremie.galarneau@efficios.com>
Thu, 13 Aug 2020 22:17:48 +0000 (18:17 -0400)
The `action executor` interface allows the notification subsystem to enqueue
work items to execute on behalf of a given trigger. This allows the notification
thread to remain responsive even if the actions to execute are blocking (as
through the use of network communication).

Before this commit, the notification subsystem only handled `notify` actions;
handling code for new action types are added as part of the action executor.

The existing `notify` action is now performed through the action executor so
that all actions can be managed in the same way.

This is less efficient than sending the notifications directly, but could be
optimized trivially (if it ever becomes a problem) when:
  - the action is a group containing only a `notify` action,
  - the action is a `notify` action.

Managing the new action types requires fairly localized changes to the existing
notification subsystem code. The main code paths that are modified are the sites
where `evaluation` objects are created:
  - on an object state change (session or channel state changes, see
    handle_notification_thread_channel_sample and
    handle_notification_thread_command_session_rotation),
  - on registration of a trigger (see
    handle_notification_thread_command_register_trigger),
  - on subscription to a condition (see client_handle_message_subscription).

To understand the lifetime of most objects involved in a work deferral to the
action executor, see the paragraph in notification-thread-internal.h (line 82)
to understand the relation between clients and client lists.

Overview of changes
===

Object state changes

Change-Id: I23290e94d98e781992661f0aee88de9986ed274f
---

As hinted in the notification_client_list documentation, defering work on a
state change is straight-forward: a reference is taken on a client list and the
list is provided to the action executor as part of a work item.

Hence, very little changes are made to the the two state-change handling sites
beyond enqueuing a work item rather than directly sending a notification.

Subscription to a condition
---

A notification client can subscribe to a condition before or after a matching
trigger (same condition and containing a notify action) has been registered.

When a client subscribes to a condition, it is a added to a corresponding
"client list"

Registration of a trigger
---

When a client subscribes to a condition, the current state of
that condition is immediately evaluated. If the condition is true
(for instance, a channel's buffer are filled beyond X% of their
capacity),

TODO:

Change-Id: I7f9bc197715c9ca008a4f1fcd4c86e01b6252dce
Signed-off-by: Jérémie Galarneau <jeremie.galarneau@efficios.com>
13 files changed:
src/bin/lttng-sessiond/Makefile.am
src/bin/lttng-sessiond/action-executor.c [new file with mode: 0644]
src/bin/lttng-sessiond/action-executor.h [new file with mode: 0644]
src/bin/lttng-sessiond/health-sessiond.h
src/bin/lttng-sessiond/notification-thread-commands.c
src/bin/lttng-sessiond/notification-thread-commands.h
src/bin/lttng-sessiond/notification-thread-events.c
src/bin/lttng-sessiond/notification-thread-internal.h
src/bin/lttng-sessiond/notification-thread.c
src/bin/lttng-sessiond/notification-thread.h
src/bin/lttng-sessiond/thread.c
src/lib/lttng-ctl/lttng-ctl-health.c
tests/unit/Makefile.am

index ee53655be18f8390934c01269c537a9a339d0117..dd807125b6342e243c4720f835ba01ba94c37f27 100644 (file)
@@ -54,7 +54,8 @@ lttng_sessiond_SOURCES = utils.c utils.h \
                        manage-kernel.c manage-kernel.h \
                        manage-consumer.c manage-consumer.h \
                        clear.c clear.h \
-                       tracker.c tracker.h
+                       tracker.c tracker.h \
+                       action-executor.c action-executor.h
 
 if HAVE_LIBLTTNG_UST_CTL
 lttng_sessiond_SOURCES += trace-ust.c ust-registry.c ust-app.c \
diff --git a/src/bin/lttng-sessiond/action-executor.c b/src/bin/lttng-sessiond/action-executor.c
new file mode 100644 (file)
index 0000000..40ca2bd
--- /dev/null
@@ -0,0 +1,674 @@
+/*
+ * Copyright (C) 2020 Jérémie Galarneau <jeremie.galarneau@efficios.com>
+ *
+ * SPDX-License-Identifier: GPL-2.0-only
+ *
+ */
+
+#include "action-executor.h"
+#include "cmd.h"
+#include "health-sessiond.h"
+#include "lttng-sessiond.h"
+#include "notification-thread-internal.h"
+#include "session.h"
+#include "thread.h"
+#include <common/macros.h>
+#include <common/optional.h>
+#include <lttng/action/action-internal.h>
+#include <lttng/action/group.h>
+#include <lttng/action/notify.h>
+#include <lttng/action/rotate-session.h>
+#include <lttng/action/snapshot-session.h>
+#include <lttng/action/start-session.h>
+#include <lttng/action/stop-session.h>
+#include <lttng/condition/evaluation.h>
+#include <lttng/lttng-error.h>
+#include <lttng/trigger/trigger-internal.h>
+#include <pthread.h>
+#include <stdbool.h>
+#include <stddef.h>
+#include <urcu/list.h>
+
+#define THREAD_NAME "Action Executor"
+#define MAX_QUEUED_WORK_COUNT 8192
+
+struct action_work_item {
+       uint64_t id;
+       struct lttng_trigger *trigger;
+       struct lttng_evaluation *evaluation;
+       struct notification_client_list *client_list;
+       LTTNG_OPTIONAL(struct lttng_credentials) object_creds;
+       struct cds_list_head list_node;
+};
+
+struct action_executor {
+       struct lttng_thread *thread;
+       struct notification_thread_handle *notification_thread_handle;
+       struct {
+               uint64_t pending_count;
+               struct cds_list_head list;
+               pthread_cond_t cond;
+               pthread_mutex_t lock;
+       } work;
+       bool should_quit;
+       uint64_t next_work_item_id;
+};
+
+typedef int (*action_executor_handler)(struct action_executor *executor,
+               const struct action_work_item *,
+               const struct lttng_action *action);
+
+static int action_executor_notify_handler(struct action_executor *executor,
+               const struct action_work_item *,
+               const struct lttng_action *);
+static int action_executor_start_session_handler(struct action_executor *executor,
+               const struct action_work_item *,
+               const struct lttng_action *);
+static int action_executor_stop_session_handler(struct action_executor *executor,
+               const struct action_work_item *,
+               const struct lttng_action *);
+static int action_executor_rotate_session_handler(struct action_executor *executor,
+               const struct action_work_item *,
+               const struct lttng_action *);
+static int action_executor_snapshot_session_handler(struct action_executor *executor,
+               const struct action_work_item *,
+               const struct lttng_action *);
+static int action_executor_group_handler(struct action_executor *executor,
+               const struct action_work_item *,
+               const struct lttng_action *);
+static int action_executor_generic_handler(struct action_executor *executor,
+               const struct action_work_item *,
+               const struct lttng_action *);
+
+static const action_executor_handler action_executors[] = {
+       [LTTNG_ACTION_TYPE_NOTIFY] = action_executor_notify_handler,
+       [LTTNG_ACTION_TYPE_START_SESSION] = action_executor_start_session_handler,
+       [LTTNG_ACTION_TYPE_STOP_SESSION] = action_executor_stop_session_handler,
+       [LTTNG_ACTION_TYPE_ROTATE_SESSION] = action_executor_rotate_session_handler,
+       [LTTNG_ACTION_TYPE_SNAPSHOT_SESSION] = action_executor_snapshot_session_handler,
+       [LTTNG_ACTION_TYPE_GROUP] = action_executor_group_handler,
+};
+
+static const char *action_type_names[] = {
+       [LTTNG_ACTION_TYPE_NOTIFY] = "Notify",
+       [LTTNG_ACTION_TYPE_START_SESSION] = "Start session",
+       [LTTNG_ACTION_TYPE_STOP_SESSION] = "Stop session",
+       [LTTNG_ACTION_TYPE_ROTATE_SESSION] = "Rotate session",
+       [LTTNG_ACTION_TYPE_SNAPSHOT_SESSION] = "Snapshot session",
+       [LTTNG_ACTION_TYPE_GROUP] = "Group",
+};
+
+static const char *get_action_name(const struct lttng_action *action)
+{
+       return action_type_names[lttng_action_get_type_const(action)];
+}
+
+static int client_handle_transmission_status(
+               struct notification_client *client,
+               enum client_transmission_status status,
+               void *user_data)
+{
+       int ret = 0;
+       struct action_executor *executor = user_data;
+       bool update_communication = true;
+
+       ASSERT_LOCKED(client->lock);
+
+       switch (status) {
+       case CLIENT_TRANSMISSION_STATUS_COMPLETE:
+               DBG("Successfully sent full notification to client, client_id = %" PRIu64,
+                               client->id);
+               update_communication = false;
+               break;
+       case CLIENT_TRANSMISSION_STATUS_QUEUED:
+               DBG("Queued notification in client outgoing buffer, client_id = %" PRIu64,
+                               client->id);
+               break;
+       case CLIENT_TRANSMISSION_STATUS_FAIL:
+               DBG("Communication error occurred while sending notification to client, client_id = %" PRIu64,
+                               client->id);
+               client->communication.active = false;
+               break;
+       default:
+               ERR("Fatal error encoutered while sending notification to client, client_id = %" PRIu64,
+                               client->id);
+               client->communication.active = false;
+               ret = -1;
+               goto end;
+       }
+
+       if (!update_communication) {
+               goto end;
+       }
+
+       ret = notification_thread_client_communication_update(
+                       executor->notification_thread_handle, client->id,
+                       status);
+end:
+       return ret;
+}
+
+static int action_executor_notify_handler(struct action_executor *executor,
+               const struct action_work_item *work_item,
+               const struct lttng_action *action)
+{
+       return notification_client_list_send_evaluation(work_item->client_list,
+                       lttng_trigger_get_const_condition(work_item->trigger),
+                       work_item->evaluation,
+                       lttng_trigger_get_credentials(work_item->trigger),
+                       LTTNG_OPTIONAL_GET_PTR(work_item->object_creds),
+                       client_handle_transmission_status,
+                       executor);
+}
+
+static int action_executor_start_session_handler(struct action_executor *executor,
+               const struct action_work_item *work_item,
+               const struct lttng_action *action)
+{
+       int ret = 0;
+       const char *session_name;
+       enum lttng_action_status action_status;
+       struct ltt_session *session;
+
+       action_status = lttng_action_start_session_get_session_name(
+                       action, &session_name);
+       if (action_status != LTTNG_ACTION_STATUS_OK) {
+               ERR("Failed to get session name from \"%s\" action",
+                               get_action_name(action));
+               ret = -1;
+               goto end;
+       }
+
+       session_lock_list();
+       session = session_find_by_name(session_name);
+       if (session) {
+               enum lttng_error_code cmd_ret;
+
+               session_lock(session);
+               cmd_ret = cmd_start_trace(session);
+               session_unlock(session);
+
+               switch (cmd_ret) {
+               case LTTNG_OK:
+                       DBG("Successfully started session \"%s\" on behalf of trigger \"%p\"",
+                                       session_name,
+                                       work_item->trigger);
+                       break;
+               case LTTNG_ERR_TRACE_ALREADY_STARTED:
+                       DBG("Attempted to start session \"%s\" on behalf of trigger \"%p\" but it was already started",
+                                       session_name,
+                                       work_item->trigger);
+                       break;
+               default:
+                       WARN("Failed to start session \"%s\" on behalf of trigger \"%p\": %s",
+                                       session_name,
+                                       work_item->trigger,
+                                       lttng_strerror(-cmd_ret));
+                       break;
+               }
+               session_put(session);
+       } else {
+               DBG("Failed to find session \"%s\" by name while executing \"%s\" action of trigger \"%p\"",
+                               session_name, get_action_name(action),
+                               work_item->trigger);
+       }
+       session_unlock_list();
+end:
+       return ret;
+}
+
+static int action_executor_stop_session_handler(struct action_executor *executor,
+               const struct action_work_item *work_item,
+               const struct lttng_action *action)
+{
+       int ret = 0;
+       const char *session_name;
+       enum lttng_action_status action_status;
+       struct ltt_session *session;
+
+       action_status = lttng_action_stop_session_get_session_name(
+                       action, &session_name);
+       if (action_status != LTTNG_ACTION_STATUS_OK) {
+               ERR("Failed to get session name from \"%s\" action",
+                               get_action_name(action));
+               ret = -1;
+               goto end;
+       }
+
+       session_lock_list();
+       session = session_find_by_name(session_name);
+       if (session) {
+               enum lttng_error_code cmd_ret;
+
+               session_lock(session);
+               cmd_ret = cmd_stop_trace(session);
+               session_unlock(session);
+
+               switch (cmd_ret) {
+               case LTTNG_OK:
+                       DBG("Successfully stopped session \"%s\" on behalf of trigger \"%p\"",
+                                       session_name,
+                                       work_item->trigger);
+                       break;
+               case LTTNG_ERR_TRACE_ALREADY_STOPPED:
+                       DBG("Attempted to stop session \"%s\" on behalf of trigger \"%p\" but it was already stopped",
+                                       session_name,
+                                       work_item->trigger);
+                       break;
+               default:
+                       WARN("Failed to stop session \"%s\" on behalf of trigger \"%p\": %s",
+                                       session_name,
+                                       work_item->trigger,
+                                       lttng_strerror(-cmd_ret));
+                       break;
+               }
+               session_put(session);
+       } else {
+               DBG("Failed to find session \"%s\" by name while executing \"%s\" action of trigger \"%p\"",
+                               session_name, get_action_name(action),
+                               work_item->trigger);
+       }
+       session_unlock_list();
+end:
+       return ret;
+}
+
+static int action_executor_rotate_session_handler(struct action_executor *executor,
+               const struct action_work_item *work_item,
+               const struct lttng_action *action)
+{
+       int ret = 0;
+       const char *session_name;
+       enum lttng_action_status action_status;
+       struct ltt_session *session;
+
+       action_status = lttng_action_rotate_session_get_session_name(
+                       action, &session_name);
+       if (action_status != LTTNG_ACTION_STATUS_OK) {
+               ERR("Failed to get session name from \"%s\" action",
+                               get_action_name(action));
+               ret = -1;
+               goto end;
+       }
+
+       session_lock_list();
+       session = session_find_by_name(session_name);
+       if (session) {
+               enum lttng_error_code cmd_ret;
+
+               session_lock(session);
+               cmd_ret = cmd_rotate_session(session, NULL, false,
+                               LTTNG_TRACE_CHUNK_COMMAND_TYPE_MOVE_TO_COMPLETED);
+               session_unlock(session);
+
+               switch (cmd_ret) {
+               case LTTNG_OK:
+                       DBG("Successfully started rotation of session \"%s\" on behalf of trigger \"%p\"",
+                                       session_name,
+                                       work_item->trigger);
+                       break;
+               case LTTNG_ERR_ROTATION_PENDING:
+                       DBG("Attempted to start a rotation of session \"%s\" on behalf of trigger \"%p\" but a rotation is already ongoing",
+                                       session_name,
+                                       work_item->trigger);
+                       break;
+               case LTTNG_ERR_ROTATION_MULTIPLE_AFTER_STOP:
+               case LTTNG_ERR_ROTATION_AFTER_STOP_CLEAR:
+                       DBG("Attempted to start a rotation of session \"%s\" on behalf of trigger \"%p\" but a rotation has already been completed since the last stop or clear",
+                                       session_name,
+                                       work_item->trigger);
+                       break;
+               default:
+                       WARN("Failed to start a rotation of session \"%s\" on behalf of trigger \"%p\": %s",
+                                       session_name,
+                                       work_item->trigger,
+                                       lttng_strerror(-cmd_ret));
+                       break;
+               }
+               session_put(session);
+       } else {
+               DBG("Failed to find session \"%s\" by name while executing \"%s\" action of trigger \"%p\"",
+                               session_name, get_action_name(action),
+                               work_item->trigger);
+       }
+       session_unlock_list();
+end:
+       return ret;
+}
+
+static int action_executor_snapshot_session_handler(struct action_executor *executor,
+               const struct action_work_item *work_item,
+               const struct lttng_action *action)
+{
+       int ret = 0;
+       const char *session_name;
+       enum lttng_action_status action_status;
+       struct ltt_session *session;
+       const struct lttng_snapshot_output default_snapshot_output = {
+               .max_size = UINT64_MAX,
+       };
+       const struct lttng_snapshot_output *snapshot_output =
+                       &default_snapshot_output;
+
+       action_status = lttng_action_snapshot_session_get_session_name(
+                       action, &session_name);
+       if (action_status != LTTNG_ACTION_STATUS_OK) {
+               ERR("Failed to get session name from \"%s\" action",
+                               get_action_name(action));
+               ret = -1;
+               goto end;
+       }
+
+       action_status = lttng_action_snapshot_session_get_output(
+                       action, &snapshot_output);
+       if (action_status != LTTNG_ACTION_STATUS_OK &&
+                       action_status != LTTNG_ACTION_STATUS_UNSET) {
+               ERR("Failed to get output from \"%s\" action",
+                               get_action_name(action));
+               ret = -1;
+               goto end;
+       }
+
+       session_lock_list();
+       session = session_find_by_name(session_name);
+       if (session) {
+               enum lttng_error_code cmd_ret;
+
+               session_lock(session);
+               cmd_ret = cmd_snapshot_record(session, snapshot_output, 0);
+               session_unlock(session);
+
+               switch (cmd_ret) {
+               case LTTNG_OK:
+                       DBG("Successfully recorded snapshot of session \"%s\" on behalf of trigger \"%p\"",
+                                       session_name,
+                                       work_item->trigger);
+                       break;
+               default:
+                       WARN("Failed to record snapshot of session \"%s\" on behalf of trigger \"%p\": %s",
+                                       session_name,
+                                       work_item->trigger,
+                                       lttng_strerror(-cmd_ret));
+                       break;
+               }
+               session_put(session);
+       } else {
+               DBG("Failed to find session \"%s\" by name while executing \"%s\" action of trigger \"%p\"",
+                               session_name, get_action_name(action),
+                               work_item->trigger);
+       }
+       session_unlock_list();
+end:
+       return ret;
+}
+
+static int action_executor_group_handler(struct action_executor *executor,
+               const struct action_work_item *work_item,
+               const struct lttng_action *action_group)
+{
+       int ret = 0;
+       unsigned int i, count;
+       enum lttng_action_status action_status;
+
+       action_status = lttng_action_group_get_count(action_group, &count);
+       if (action_status != LTTNG_ACTION_STATUS_OK) {
+               /* Fatal error. */
+               ERR("Failed to get count of action in action group");
+               ret = -1;
+               goto end;
+       }
+
+       DBG("Action group has %u action%s", count, count != 1 ? "s" : "");
+       for (i = 0; i < count; i++) {
+               const struct lttng_action *action =
+                               lttng_action_group_get_at_index(
+                                               action_group, i);
+
+               ret = action_executor_generic_handler(
+                               executor, work_item, action);
+               if (ret) {
+                       ERR("Stopping the execution of the action group of trigger \"%p\" following a fatal error",
+                                       work_item->trigger);
+                       goto end;
+               }
+       }
+end:
+       return ret;
+}
+
+static int action_executor_generic_handler(struct action_executor *executor,
+               const struct action_work_item *work_item,
+               const struct lttng_action *action)
+{
+       DBG("Executing action \"%s\" of trigger \"%p\" action work item %" PRIu64,
+                       get_action_name(action),
+                       work_item->trigger,
+                       work_item->id);
+
+       return action_executors[lttng_action_get_type_const(action)](
+                       executor, work_item, action);
+}
+
+static int action_work_item_execute(struct action_executor *executor,
+               struct action_work_item *work_item)
+{
+       int ret;
+       const struct lttng_action *action =
+                       lttng_trigger_get_const_action(work_item->trigger);
+
+       DBG("Starting execution of action work item %" PRIu64 " of trigger \"%p\"",
+                       work_item->id, work_item->trigger);
+       ret = action_executor_generic_handler(executor, work_item, action);
+       DBG("Completed execution of action work item %" PRIu64 " of trigger \"%p\"",
+                       work_item->id, work_item->trigger);
+       return ret;
+}
+
+static void action_work_item_destroy(struct action_work_item *work_item)
+{
+       lttng_trigger_put(work_item->trigger);
+       lttng_evaluation_destroy(work_item->evaluation);
+       notification_client_list_put(work_item->client_list);
+       free(work_item);
+}
+
+static void *action_executor_thread(void *_data)
+{
+       struct action_executor *executor = _data;
+
+       assert(executor);
+
+       health_register(health_sessiond, HEALTH_SESSIOND_TYPE_ACTION_EXECUTOR);
+
+       rcu_register_thread();
+       rcu_thread_online();
+
+       DBG("Entering work execution loop");
+       pthread_mutex_lock(&executor->work.lock);
+       while (!executor->should_quit) {
+               int ret;
+               struct action_work_item *work_item;
+
+               health_code_update();
+               if (executor->work.pending_count == 0) {
+                       health_poll_entry();
+                       DBG("No work items enqueued, entering wait");
+                       pthread_cond_wait(&executor->work.cond,
+                                       &executor->work.lock);
+                       DBG("Woke-up from wait");
+                       health_poll_exit();
+                       continue;
+               }
+
+               /* Pop item from front of the listwith work lock held. */
+               work_item = cds_list_first_entry(&executor->work.list,
+                               struct action_work_item, list_node);
+               cds_list_del(&work_item->list_node);
+               executor->work.pending_count--;
+
+               /*
+                * Work can be performed without holding the work lock,
+                * allowing new items to be queued.
+                */
+               pthread_mutex_unlock(&executor->work.lock);
+               ret = action_work_item_execute(executor, work_item);
+               action_work_item_destroy(work_item);
+               if (ret) {
+                       /* Fatal error. */
+                       break;
+               }
+
+               health_code_update();
+               pthread_mutex_lock(&executor->work.lock);
+       }
+
+       pthread_mutex_unlock(&executor->work.lock);
+       DBG("Left work execution loop");
+
+       health_code_update();
+
+       rcu_thread_offline();
+       rcu_unregister_thread();
+       health_unregister(health_sessiond);
+
+       return NULL;
+}
+
+static bool shutdown_action_executor_thread(void *_data)
+{
+       struct action_executor *executor = _data;
+
+       executor->should_quit = true;
+       pthread_cond_signal(&executor->work.cond);
+       return true;
+}
+
+static void clean_up_action_executor_thread(void *_data)
+{
+       struct action_executor *executor = _data;
+
+       assert(cds_list_empty(&executor->work.list));
+
+       pthread_mutex_destroy(&executor->work.lock);
+       pthread_cond_destroy(&executor->work.cond);
+       free(executor);
+}
+
+struct action_executor *action_executor_create(
+               struct notification_thread_handle *handle)
+{
+       struct action_executor *executor = zmalloc(sizeof(*executor));
+
+       if (!executor) {
+               goto end;
+       }
+
+       CDS_INIT_LIST_HEAD(&executor->work.list);
+       pthread_cond_init(&executor->work.cond, NULL);
+       pthread_mutex_init(&executor->work.lock, NULL);
+       executor->notification_thread_handle = handle;
+
+       executor->thread = lttng_thread_create(THREAD_NAME,
+                       action_executor_thread, shutdown_action_executor_thread,
+                       clean_up_action_executor_thread, executor);
+end:
+       return executor;
+}
+
+void action_executor_destroy(struct action_executor *executor)
+{
+       struct action_work_item *work_item, *tmp;
+
+       /* TODO Wait for work list to drain? */
+       lttng_thread_shutdown(executor->thread);
+       pthread_mutex_lock(&executor->work.lock);
+       if (executor->work.pending_count != 0) {
+               WARN("%" PRIu64
+                       " trigger action%s still queued for execution and will be discarded",
+                               executor->work.pending_count,
+                               executor->work.pending_count == 1 ? " is" :
+                                                                   "s are");
+       }
+
+       cds_list_for_each_entry_safe (
+                       work_item, tmp, &executor->work.list, list_node) {
+               WARN("Discarding action work item %" PRIu64
+                               " associated to trigger \"%p\"",
+                               work_item->id, work_item->trigger);
+               cds_list_del(&work_item->list_node);
+               action_work_item_destroy(work_item);
+       }
+       pthread_mutex_unlock(&executor->work.lock);
+       lttng_thread_put(executor->thread);
+}
+
+/* RCU read-lock must be held by the caller. */
+enum action_executor_status action_executor_enqueue(
+               struct action_executor *executor,
+               struct lttng_trigger *trigger,
+               struct lttng_evaluation *evaluation,
+               const struct lttng_credentials *object_creds,
+               struct notification_client_list *client_list)
+{
+       enum action_executor_status executor_status = ACTION_EXECUTOR_STATUS_OK;
+       const uint64_t work_item_id = executor->next_work_item_id++;
+       struct action_work_item *work_item;
+       bool signal = false;
+
+       pthread_mutex_lock(&executor->work.lock);
+       /* Check for queue overflow. */
+       if (executor->work.pending_count >= MAX_QUEUED_WORK_COUNT) {
+               /* Most likely spammy, remove if it is the case. */
+               DBG("Refusing to enqueue action for trigger \"%p\" as work item %" PRIu64
+                   " (overflow)",
+                               trigger, work_item_id);
+               executor_status = ACTION_EXECUTOR_STATUS_OVERFLOW;
+               goto error_unlock;
+       }
+
+       work_item = zmalloc(sizeof(*work_item));
+       if (!work_item) {
+               PERROR("Failed to allocate action executor work item on behalf of trigger \"%p\"",
+                               trigger);
+               executor_status = ACTION_EXECUTOR_STATUS_ERROR;
+               goto error_unlock;
+       }
+
+       lttng_trigger_get(trigger);
+       if (client_list) {
+               const bool reference_acquired =
+                               notification_client_list_get(client_list);
+
+               assert(reference_acquired);
+       }
+
+       *work_item = (typeof(*work_item)){
+                       .id = work_item_id,
+                       .trigger = trigger,
+                       /* Ownership transferred to the work item. */
+                       .evaluation = evaluation,
+                       .object_creds = {
+                               .is_set = !!object_creds,
+                               .value = object_creds ? *object_creds :
+                                       (typeof(work_item->object_creds.value)) {},
+                       },
+                       .client_list = client_list,
+                       .list_node = CDS_LIST_HEAD_INIT(work_item->list_node),
+       };
+
+       evaluation = NULL;
+       cds_list_add_tail(&work_item->list_node, &executor->work.list);
+       executor->work.pending_count++;
+       DBG("Enqueued action for trigger \"%p\" as work item %" PRIu64,
+                       trigger, work_item_id);
+       signal = true;
+
+error_unlock:
+       pthread_mutex_unlock(&executor->work.lock);
+       if (signal) {
+               pthread_cond_signal(&executor->work.cond);
+       }
+
+       lttng_evaluation_destroy(evaluation);
+       return executor_status;
+}
diff --git a/src/bin/lttng-sessiond/action-executor.h b/src/bin/lttng-sessiond/action-executor.h
new file mode 100644 (file)
index 0000000..e01a377
--- /dev/null
@@ -0,0 +1,37 @@
+/*
+ * Copyright (C) 2020 Jérémie Galarneau <jeremie.galarneau@efficios.com>
+ *
+ * SPDX-License-Identifier: GPL-2.0-only
+ *
+ */
+
+#ifndef ACTION_EXECUTOR_H
+#define ACTION_EXECUTOR_H
+
+struct action_executor;
+struct notification_thread_handle;
+struct lttng_evaluation;
+struct lttng_trigger;
+struct notification_client_list;
+struct lttng_credentials;
+
+enum action_executor_status {
+       ACTION_EXECUTOR_STATUS_OK,
+       ACTION_EXECUTOR_STATUS_OVERFLOW,
+       ACTION_EXECUTOR_STATUS_ERROR,
+       ACTION_EXECUTOR_STATUS_INVALID,
+};
+
+struct action_executor *action_executor_create(
+               struct notification_thread_handle *handle);
+
+void action_executor_destroy(struct action_executor *executor);
+
+enum action_executor_status action_executor_enqueue(
+               struct action_executor *executor,
+               struct lttng_trigger *trigger,
+               struct lttng_evaluation *evaluation,
+               const struct lttng_credentials *object_creds,
+               struct notification_client_list *list);
+
+#endif /* ACTION_EXECUTOR_H */
index 7c9dbd0b3218d6c0f3cbc7c9f24d9060ed106390..b541822f87081ed47b8d7d551cabdb728d516992 100644 (file)
@@ -23,6 +23,7 @@ enum health_type_sessiond {
        HEALTH_SESSIOND_TYPE_NOTIFICATION       = 8,
        HEALTH_SESSIOND_TYPE_ROTATION           = 9,
        HEALTH_SESSIOND_TYPE_TIMER              = 10,
+       HEALTH_SESSIOND_TYPE_ACTION_EXECUTOR    = 11,
 
        NR_HEALTH_SESSIOND_TYPES,
 };
index a3b41f989e72d5de6e116af5f5730a03f8f05c33..dc03e6a2df2fd9e7eab8634b5ece6ebb557c91de 100644 (file)
@@ -280,3 +280,18 @@ void notification_thread_command_quit(
        ret = run_command_wait(handle, &cmd);
        assert(!ret && cmd.reply_code == LTTNG_OK);
 }
+
+int notification_thread_client_communication_update(
+               struct notification_thread_handle *handle,
+               notification_client_id id,
+               enum client_transmission_status transmission_status)
+{
+       struct notification_thread_command cmd = {};
+
+       init_notification_thread_command(&cmd);
+
+       cmd.type = NOTIFICATION_COMMAND_TYPE_CLIENT_COMMUNICATION_UPDATE;
+       cmd.parameters.client_communication_update.id = id;
+       cmd.parameters.client_communication_update.status = transmission_status;
+       return run_command_no_wait(handle, &cmd);
+}
index 11889f934f120f2e056a03cce29f3e224179d180..c09ebea46f0c72a0e0acc1e42da590dd3b096423 100644 (file)
@@ -28,6 +28,7 @@ enum notification_thread_command_type {
        NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING,
        NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_COMPLETED,
        NOTIFICATION_COMMAND_TYPE_QUIT,
+       NOTIFICATION_COMMAND_TYPE_CLIENT_COMMUNICATION_UPDATE,
 };
 
 struct notification_thread_command {
@@ -63,6 +64,12 @@ struct notification_thread_command {
                        uint64_t trace_archive_chunk_id;
                        struct lttng_trace_archive_location *location;
                } session_rotation;
+               /* Client communication update. */
+               struct {
+                       notification_client_id id;
+                       enum client_transmission_status status;
+               } client_communication_update;
+
        } parameters;
 
        /* lttng_waiter on which to wait for command reply (optional). */
index 6ba6c415b65b74bf2c6571fb6261d6f3f3a34f80..8d1290d38a64c64765f223c6516f24b6bcff5c23 100644 (file)
@@ -50,7 +50,7 @@ enum lttng_object_type {
 
 struct lttng_trigger_list_element {
        /* No ownership of the trigger object is assumed. */
-       const struct lttng_trigger *trigger;
+       struct lttng_trigger *trigger;
        struct cds_list_head node;
 };
 
@@ -117,122 +117,6 @@ struct lttng_condition_list_element {
        struct cds_list_head node;
 };
 
-struct notification_client_list_element {
-       struct notification_client *client;
-       struct cds_list_head node;
-};
-
-/*
- * Thread safety of notification_client and notification_client_list.
- *
- * The notification thread (main thread) and the action executor
- * interact through client lists. Hence, when the action executor
- * thread looks-up the list of clients subscribed to a given
- * condition, it will acquire a reference to the list and lock it
- * while attempting to communicate with the various clients.
- *
- * It is not necessary to reference-count clients as they are guaranteed
- * to be 'alive' if they are present in a list and that list is locked. Indeed,
- * removing references to the client from those subscription lists is part of
- * the work performed on destruction of a client.
- *
- * No provision for other access scenarios are taken into account;
- * this is the bare minimum to make these accesses safe and the
- * notification thread's state is _not_ "thread-safe" in any general
- * sense.
- */
-struct notification_client_list {
-       pthread_mutex_t lock;
-       struct urcu_ref ref;
-       const struct lttng_trigger *trigger;
-       struct cds_list_head list;
-       /* Weak reference to container. */
-       struct cds_lfht *notification_trigger_clients_ht;
-       struct cds_lfht_node notification_trigger_clients_ht_node;
-       /* call_rcu delayed reclaim. */
-       struct rcu_head rcu_node;
-};
-
-struct notification_client {
-       /* Nests within the notification_client_list lock. */
-       pthread_mutex_t lock;
-       notification_client_id id;
-       int socket;
-       /* Client protocol version. */
-       uint8_t major, minor;
-       uid_t uid;
-       gid_t gid;
-       /*
-        * Indicates if the credentials and versions of the client have been
-        * checked.
-        */
-       bool validated;
-       /*
-        * Conditions to which the client's notification channel is subscribed.
-        * List of struct lttng_condition_list_node. The condition member is
-        * owned by the client.
-        */
-       struct cds_list_head condition_list;
-       struct cds_lfht_node client_socket_ht_node;
-       struct cds_lfht_node client_id_ht_node;
-       struct {
-               /*
-                * If a client's communication is inactive, it means that a
-                * fatal error has occurred (could be either a protocol error or
-                * the socket API returned a fatal error). No further
-                * communication should be attempted; the client is queued for
-                * clean-up.
-                */
-               bool active;
-               struct {
-                       /*
-                        * During the reception of a message, the reception
-                        * buffers' "size" is set to contain the current
-                        * message's complete payload.
-                        */
-                       struct lttng_dynamic_buffer buffer;
-                       /* Bytes left to receive for the current message. */
-                       size_t bytes_to_receive;
-                       /* Type of the message being received. */
-                       enum lttng_notification_channel_message_type msg_type;
-                       /*
-                        * Indicates whether or not credentials are expected
-                        * from the client.
-                        */
-                       bool expect_creds;
-                       /*
-                        * Indicates whether or not credentials were received
-                        * from the client.
-                        */
-                       bool creds_received;
-                       /* Only used during credentials reception. */
-                       lttng_sock_cred creds;
-               } inbound;
-               struct {
-                       /*
-                        * Indicates whether or not a notification addressed to
-                        * this client was dropped because a command reply was
-                        * already buffered.
-                        *
-                        * A notification is dropped whenever the buffer is not
-                        * empty.
-                        */
-                       bool dropped_notification;
-                       /*
-                        * Indicates whether or not a command reply is already
-                        * buffered. In this case, it means that the client is
-                        * not consuming command replies before emitting a new
-                        * one. This could be caused by a protocol error or a
-                        * misbehaving/malicious client.
-                        */
-                       bool queued_command_reply;
-                       struct lttng_dynamic_buffer buffer;
-               } outbound;
-       } communication;
-       /* call_rcu delayed reclaim. */
-       struct rcu_head rcu_node;
-};
-
 struct channel_state_sample {
        struct channel_key key;
        struct cds_lfht_node channel_state_ht_node;
@@ -293,8 +177,13 @@ void lttng_session_trigger_list_destroy(
                struct lttng_session_trigger_list *list);
 static
 int lttng_session_trigger_list_add(struct lttng_session_trigger_list *list,
-               const struct lttng_trigger *trigger);
+               struct lttng_trigger *trigger);
 
+static
+int client_handle_transmission_status(
+               struct notification_client *client,
+               enum client_transmission_status transmission_status,
+               struct notification_thread_state *state);
 
 static
 int match_client_socket(struct cds_lfht_node *node, const void *key)
@@ -722,7 +611,7 @@ error:
        return NULL;
 }
 
-static
+LTTNG_HIDDEN
 bool notification_client_list_get(struct notification_client_list *list)
 {
        return urcu_ref_get_unless_zero(&list->ref);
@@ -796,7 +685,7 @@ void publish_notification_client_list(
        rcu_read_unlock();
 }
 
-static
+LTTNG_HIDDEN
 void notification_client_list_put(struct notification_client_list *list)
 {
        if (!list) {
@@ -1326,6 +1215,34 @@ end:
        return client;
 }
 
+/*
+ * Call with rcu_read_lock held (and hold for the lifetime of the returned
+ * client pointer).
+ */
+static
+struct notification_client *get_client_from_id(notification_client_id id,
+               struct notification_thread_state *state)
+{
+       struct cds_lfht_iter iter;
+       struct cds_lfht_node *node;
+       struct notification_client *client = NULL;
+
+       cds_lfht_lookup(state->client_id_ht,
+                       hash_client_id(id),
+                       match_client_id,
+                       &id,
+                       &iter);
+       node = cds_lfht_iter_get_node(&iter);
+       if (!node) {
+               goto end;
+       }
+
+       client = caa_container_of(node, struct notification_client,
+                       client_id_ht_node);
+end:
+       return client;
+}
+
 static
 bool buffer_usage_condition_applies_to_channel(
                const struct lttng_condition *condition,
@@ -1529,7 +1446,7 @@ void lttng_session_trigger_list_destroy(struct lttng_session_trigger_list *list)
 
 static
 int lttng_session_trigger_list_add(struct lttng_session_trigger_list *list,
-               const struct lttng_trigger *trigger)
+               struct lttng_trigger *trigger)
 {
        int ret = 0;
        struct lttng_trigger_list_element *new_element =
@@ -1886,6 +1803,10 @@ int handle_notification_thread_command_session_rotation(
        struct lttng_session_trigger_list *trigger_list;
        struct lttng_trigger_list_element *trigger_list_element;
        struct session_info *session_info;
+       const struct lttng_credentials session_creds = {
+               .uid = session_uid,
+               .gid = session_gid,
+       };
 
        rcu_read_lock();
 
@@ -1912,11 +1833,12 @@ int handle_notification_thread_command_session_rotation(
                        node) {
                const struct lttng_condition *condition;
                const struct lttng_action *action;
-               const struct lttng_trigger *trigger;
+               struct lttng_trigger *trigger;
                struct notification_client_list *client_list;
                struct lttng_evaluation *evaluation = NULL;
                enum lttng_condition_type condition_type;
                bool client_list_is_empty;
+               enum action_executor_status executor_status;
 
                trigger = trigger_list_element->trigger;
                condition = lttng_trigger_get_const_condition(trigger);
@@ -1966,12 +1888,40 @@ int handle_notification_thread_command_session_rotation(
                        goto put_list;
                }
 
-               /* Dispatch evaluation result to all clients. */
-               ret = send_evaluation_to_clients(trigger_list_element->trigger,
-                               evaluation, client_list, state,
-                               session_info->uid,
-                               session_info->gid);
-               lttng_evaluation_destroy(evaluation);
+               /*
+                * Ownership of `evaluation` transferred to the action executor
+                * no matter the result.
+                */
+               executor_status = action_executor_enqueue(state->executor,
+                               trigger, evaluation, &session_creds,
+                               client_list);
+               evaluation = NULL;
+               switch (executor_status) {
+               case ACTION_EXECUTOR_STATUS_OK:
+                       break;
+               case ACTION_EXECUTOR_STATUS_ERROR:
+               case ACTION_EXECUTOR_STATUS_INVALID:
+                       /*
+                        * TODO Add trigger identification (name/id) when
+                        * it is added to the API.
+                        */
+                       ERR("Fatal error occurred while enqueuing action associated with session rotation trigger");
+                       ret = -1;
+                       goto put_list;
+               case ACTION_EXECUTOR_STATUS_OVERFLOW:
+                       /*
+                        * TODO Add trigger identification (name/id) when
+                        * it is added to the API.
+                        *
+                        * Not a fatal error.
+                        */
+                       WARN("No space left when enqueuing action associated with session rotation trigger");
+                       ret = 0;
+                       goto put_list;
+               default:
+                       abort();
+               }
+
 put_list:
                notification_client_list_put(client_list);
                if (caa_unlikely(ret)) {
@@ -2025,7 +1975,7 @@ end:
 
 /* Must be called with RCU read lock held. */
 static
-int bind_trigger_to_matching_session(const struct lttng_trigger *trigger,
+int bind_trigger_to_matching_session(struct lttng_trigger *trigger,
                struct notification_thread_state *state)
 {
        int ret = 0;
@@ -2071,7 +2021,7 @@ end:
 
 /* Must be called with RCU read lock held. */
 static
-int bind_trigger_to_matching_channels(const struct lttng_trigger *trigger,
+int bind_trigger_to_matching_channels(struct lttng_trigger *trigger,
                struct notification_thread_state *state)
 {
        int ret = 0;
@@ -2467,6 +2417,36 @@ int handle_notification_thread_command(
                cmd->reply_code = LTTNG_OK;
                ret = 1;
                goto end;
+       case NOTIFICATION_COMMAND_TYPE_CLIENT_COMMUNICATION_UPDATE:
+       {
+               const enum client_transmission_status client_status =
+                               cmd->parameters.client_communication_update
+                                               .status;
+               const notification_client_id client_id =
+                               cmd->parameters.client_communication_update.id;
+               struct notification_client *client;
+
+               rcu_read_lock();
+               client = get_client_from_id(client_id, state);
+
+               if (!client) {
+                       /*
+                        * Client error was probably already picked-up by the
+                        * notification thread or it has disconnected
+                        * gracefully while this command was queued.
+                        */
+                       DBG("Failed to find notification client to update communication status, client id = %" PRIu64,
+                                       client_id);
+                       ret = 0;
+               } else {
+                       pthread_mutex_lock(&client->lock);
+                       ret = client_handle_transmission_status(
+                                       client, client_status, state);
+                       pthread_mutex_unlock(&client->lock);
+               }
+               rcu_read_unlock();
+               break;
+       }
        default:
                ERR("[notification-thread] Unknown internal command received");
                goto error_unlock;
@@ -2773,8 +2753,7 @@ end:
 /* Client lock must be acquired by caller. */
 static
 enum client_transmission_status client_flush_outgoing_queue(
-               struct notification_client *client,
-               struct notification_thread_state *state)
+               struct notification_client *client)
 {
        ssize_t ret;
        size_t to_send_count;
@@ -2782,6 +2761,11 @@ enum client_transmission_status client_flush_outgoing_queue(
 
        ASSERT_LOCKED(client->lock);
 
+       if (!client->communication.active) {
+               status = CLIENT_TRANSMISSION_STATUS_FAIL;
+               goto end;
+       }
+
        assert(client->communication.outbound.buffer.size != 0);
        to_send_count = client->communication.outbound.buffer.size;
        DBG("[notification-thread] Flushing client (socket fd = %i) outgoing queue",
@@ -2803,7 +2787,6 @@ enum client_transmission_status client_flush_outgoing_queue(
                                &client->communication.outbound.buffer,
                                to_send_count);
                if (ret) {
-                       status = CLIENT_TRANSMISSION_STATUS_ERROR;
                        goto error;
                }
                status = CLIENT_TRANSMISSION_STATUS_QUEUED;
@@ -2817,20 +2800,14 @@ enum client_transmission_status client_flush_outgoing_queue(
                ret = lttng_dynamic_buffer_set_size(
                                &client->communication.outbound.buffer, 0);
                if (ret) {
-                       status = CLIENT_TRANSMISSION_STATUS_ERROR;
                        goto error;
                }
                status = CLIENT_TRANSMISSION_STATUS_COMPLETE;
        }
-
-       ret = client_handle_transmission_status(client, status, state);
-       if (ret) {
-               goto error;
-       }
-
-       return 0;
+end:
+       return status;
 error:
-       return -1;
+       return CLIENT_TRANSMISSION_STATUS_ERROR;
 }
 
 /* Client lock must be acquired by caller. */
@@ -2848,6 +2825,7 @@ int client_send_command_reply(struct notification_client *client,
                .size = sizeof(reply),
        };
        char buffer[sizeof(msg) + sizeof(reply)];
+       enum client_transmission_status transmission_status;
 
        ASSERT_LOCKED(client->lock);
 
@@ -2868,7 +2846,9 @@ int client_send_command_reply(struct notification_client *client,
                goto error;
        }
 
-       ret = client_flush_outgoing_queue(client, state);
+       transmission_status = client_flush_outgoing_queue(client);
+       ret = client_handle_transmission_status(
+                       client, transmission_status, state);
        if (ret) {
                goto error;
        }
@@ -2948,6 +2928,7 @@ int client_handle_message_handshake(struct notification_client *client,
        enum lttng_notification_channel_status status =
                        LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
        char send_buffer[sizeof(msg_header) + sizeof(handshake_reply)];
+       enum client_transmission_status transmission_status;
 
        pthread_mutex_lock(&client->lock);
 
@@ -2991,7 +2972,9 @@ int client_handle_message_handshake(struct notification_client *client,
        client->validated = true;
        client->communication.active = true;
 
-       ret = client_flush_outgoing_queue(client, state);
+       transmission_status = client_flush_outgoing_queue(client);
+       ret = client_handle_transmission_status(
+                       client, transmission_status, state);
        if (ret) {
                goto end;
        }
@@ -3181,6 +3164,7 @@ int handle_notification_thread_client_out(
 {
        int ret;
        struct notification_client *client;
+       enum client_transmission_status transmission_status;
 
        client = get_client_from_socket(socket, state);
        if (!client) {
@@ -3190,7 +3174,9 @@ int handle_notification_thread_client_out(
        }
 
        pthread_mutex_lock(&client->lock);
-       ret = client_flush_outgoing_queue(client, state);
+       transmission_status = client_flush_outgoing_queue(client);
+       ret = client_handle_transmission_status(
+                       client, transmission_status, state);
        pthread_mutex_unlock(&client->lock);
        if (ret) {
                goto end;
@@ -3362,22 +3348,62 @@ end:
 }
 
 static
-int client_enqueue_dropped_notification(struct notification_client *client)
+int client_notification_overflow(struct notification_client *client)
 {
-       int ret;
-       struct lttng_notification_channel_message msg = {
+       int ret = 0;
+       const struct lttng_notification_channel_message msg = {
                .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED,
-               .size = 0,
        };
 
        ASSERT_LOCKED(client->lock);
 
+       DBG("Dropping notification addressed to client (socket fd = %i)",
+                       client->socket);
+       if (client->communication.outbound.dropped_notification) {
+               /*
+                * The client already has a "notification dropped" message
+                * in its outgoing queue. Nothing to do since all
+                * of those messages are coalesced.
+                */
+               goto end;
+       }
+
+       client->communication.outbound.dropped_notification = true;
        ret = lttng_dynamic_buffer_append(
                        &client->communication.outbound.buffer, &msg,
                        sizeof(msg));
+       if (ret) {
+               PERROR("Failed to enqueue \"dropped notification\" message in client's (socket fd = %i) outgoing queue",
+                               client->socket);
+       }
+end:
        return ret;
 }
 
+static int client_handle_transmission_status_wrapper(
+               struct notification_client *client,
+               enum client_transmission_status status,
+               void *user_data)
+{
+       return client_handle_transmission_status(client, status,
+                       (struct notification_thread_state *) user_data);
+}
+
+static
+int send_evaluation_to_clients(const struct lttng_trigger *trigger,
+               const struct lttng_evaluation *evaluation,
+               struct notification_client_list* client_list,
+               struct notification_thread_state *state,
+               uid_t object_uid, gid_t object_gid)
+{
+       return notification_client_list_send_evaluation(client_list,
+                       lttng_trigger_get_const_condition(trigger), evaluation,
+                       lttng_trigger_get_credentials(trigger),
+                       &(struct lttng_credentials){
+                                       .uid = object_uid, .gid = object_gid},
+                       client_handle_transmission_status_wrapper, state);
+}
+
 /*
  * Permission checks relative to notification channel clients are performed
  * here. Notice how object, client, and trigger credentials are involved in
@@ -3411,24 +3437,26 @@ int client_enqueue_dropped_notification(struct notification_client *client)
  * interference from external users (those could, for instance, unregister
  * their triggers).
  */
-static
-int send_evaluation_to_clients(const struct lttng_trigger *trigger,
+LTTNG_HIDDEN
+int notification_client_list_send_evaluation(
+               struct notification_client_list *client_list,
+               const struct lttng_condition *condition,
                const struct lttng_evaluation *evaluation,
-               struct notification_client_list* client_list,
-               struct notification_thread_state *state,
-               uid_t object_uid, gid_t object_gid)
+               const struct lttng_credentials *trigger_creds,
+               const struct lttng_credentials *source_object_creds,
+               report_client_transmission_result_cb client_report,
+               void *user_data)
 {
        int ret = 0;
        struct lttng_payload msg_payload;
        struct notification_client_list_element *client_list_element, *tmp;
        const struct lttng_notification notification = {
-               .condition = (struct lttng_condition *) lttng_trigger_get_const_condition(trigger),
+               .condition = (struct lttng_condition *) condition,
                .evaluation = (struct lttng_evaluation *) evaluation,
        };
        struct lttng_notification_channel_message msg_header = {
                .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION,
        };
-       const struct lttng_credentials *trigger_creds = lttng_trigger_get_credentials(trigger);
 
        lttng_payload_init(&msg_payload);
 
@@ -3453,16 +3481,23 @@ int send_evaluation_to_clients(const struct lttng_trigger *trigger,
        pthread_mutex_lock(&client_list->lock);
        cds_list_for_each_entry_safe(client_list_element, tmp,
                        &client_list->list, node) {
+               enum client_transmission_status transmission_status;
                struct notification_client *client =
                                client_list_element->client;
 
                ret = 0;
                pthread_mutex_lock(&client->lock);
-               if (client->uid != object_uid && client->gid != object_gid &&
-                               client->uid != 0) {
-                       /* Client is not allowed to monitor this channel. */
-                       DBG("[notification-thread] Skipping client at it does not have the object permission to receive notification for this trigger");
-                       goto unlock_client;
+               if (source_object_creds) {
+                       if (client->uid != source_object_creds->uid &&
+                                       client->gid != source_object_creds->gid &&
+                                       client->uid != 0) {
+                               /*
+                                * Client is not allowed to monitor this
+                                * object.
+                                */
+                               DBG("[notification-thread] Skipping client at it does not have the object permission to receive notification for this trigger");
+                               goto unlock_client;
+                       }
                }
 
                if (client->uid != trigger_creds->uid && client->gid != trigger_creds->gid) {
@@ -3480,17 +3515,10 @@ int send_evaluation_to_clients(const struct lttng_trigger *trigger,
                         * notification since the socket spilled-over to the
                         * queue.
                         */
-                       DBG("[notification-thread] Dropping notification addressed to client (socket fd = %i)",
-                                       client->socket);
-                       if (!client->communication.outbound.dropped_notification) {
-                               client->communication.outbound.dropped_notification = true;
-                               ret = client_enqueue_dropped_notification(
-                                               client);
-                               if (ret) {
-                                       goto unlock_client;
-                               }
+                       ret = client_notification_overflow(client);
+                       if (ret) {
+                               goto unlock_client;
                        }
-                       goto unlock_client;
                }
 
                ret = lttng_dynamic_buffer_append_buffer(
@@ -3500,7 +3528,8 @@ int send_evaluation_to_clients(const struct lttng_trigger *trigger,
                        goto unlock_client;
                }
 
-               ret = client_flush_outgoing_queue(client, state);
+               transmission_status = client_flush_outgoing_queue(client);
+               ret = client_report(client, transmission_status, user_data);
                if (ret) {
                        goto unlock_client;
                }
@@ -3533,6 +3562,7 @@ int handle_notification_thread_channel_sample(
        bool previous_sample_available = false;
        struct channel_state_sample previous_sample, latest_sample;
        uint64_t previous_session_consumed_total, latest_session_consumed_total;
+       struct lttng_credentials channel_creds;
 
        /*
         * The monitoring pipe only holds messages smaller than PIPE_BUF,
@@ -3651,16 +3681,22 @@ int handle_notification_thread_channel_sample(
                goto end_unlock;
        }
 
+       channel_creds = (typeof(channel_creds)) {
+               .uid = channel_info->session_info->uid,
+               .gid = channel_info->session_info->gid,
+       };
+
        trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
                        channel_triggers_ht_node);
        cds_list_for_each_entry(trigger_list_element, &trigger_list->list,
                        node) {
                const struct lttng_condition *condition;
                const struct lttng_action *action;
-               const struct lttng_trigger *trigger;
+               struct lttng_trigger *trigger;
                struct notification_client_list *client_list = NULL;
                struct lttng_evaluation *evaluation = NULL;
                bool client_list_is_empty;
+               enum action_executor_status executor_status;
 
                ret = 0;
                trigger = trigger_list_element->trigger;
@@ -3701,12 +3737,40 @@ int handle_notification_thread_channel_sample(
                        goto put_list;
                }
 
-               /* Dispatch evaluation result to all clients. */
-               ret = send_evaluation_to_clients(trigger_list_element->trigger,
-                               evaluation, client_list, state,
-                               channel_info->session_info->uid,
-                               channel_info->session_info->gid);
-               lttng_evaluation_destroy(evaluation);
+               /*
+                * Ownership of `evaluation` transferred to the action executor
+                * no matter the result.
+                */
+               executor_status = action_executor_enqueue(state->executor,
+                               trigger, evaluation, &channel_creds,
+                               client_list);
+               evaluation = NULL;
+               switch (executor_status) {
+               case ACTION_EXECUTOR_STATUS_OK:
+                       break;
+               case ACTION_EXECUTOR_STATUS_ERROR:
+               case ACTION_EXECUTOR_STATUS_INVALID:
+                       /*
+                        * TODO Add trigger identification (name/id) when
+                        * it is added to the API.
+                        */
+                       ERR("Fatal error occurred while enqueuing action associated with buffer-condition trigger");
+                       ret = -1;
+                       goto put_list;
+               case ACTION_EXECUTOR_STATUS_OVERFLOW:
+                       /*
+                        * TODO Add trigger identification (name/id) when
+                        * it is added to the API.
+                        *
+                        * Not a fatal error.
+                        */
+                       WARN("No space left when enqueuing action associated with buffer-condition trigger");
+                       ret = 0;
+                       goto put_list;
+               default:
+                       abort();
+               }
+
 put_list:
                notification_client_list_put(client_list);
                if (caa_unlikely(ret)) {
index b278f83d32fbf5fcaaaec646b77075bb6b59df30..5aaac9d23fb6821e9a24634741b7d9760f04b665 100644 (file)
@@ -8,9 +8,19 @@
 #ifndef NOTIFICATION_THREAD_INTERNAL_H
 #define NOTIFICATION_THREAD_INTERNAL_H
 
+#include <common/compat/socket.h>
+#include <common/credentials.h>
+#include <lttng/notification/channel-internal.h>
 #include <lttng/ref-internal.h>
-#include <urcu/rculfhash.h>
+#include <stdbool.h>
 #include <unistd.h>
+#include <urcu/rculfhash.h>
+#include <urcu/ref.h>
+#include <urcu/call-rcu.h>
+#include "notification-thread.h"
+
+struct lttng_evaluation;
+struct notification_thread_handle;
 
 struct channel_key {
        uint64_t key;
@@ -64,6 +74,122 @@ struct channel_info {
        struct rcu_head rcu_node;
 };
 
+struct notification_client_list_element {
+       struct notification_client *client;
+       struct cds_list_head node;
+};
+
+/*
+ * Thread safety of notification_client and notification_client_list.
+ *
+ * The notification thread (main thread) and the action executor
+ * interact through client lists. Hence, when the action executor
+ * thread looks-up the list of clients subscribed to a given
+ * condition, it will acquire a reference to the list and lock it
+ * while attempting to communicate with the various clients.
+ *
+ * It is not necessary to reference-count clients as they are guaranteed
+ * to be 'alive' if they are present in a list and that list is locked. Indeed,
+ * removing references to the client from those subscription lists is part of
+ * the work performed on destruction of a client.
+ *
+ * No provision for other access scenarios are taken into account;
+ * this is the bare minimum to make these accesses safe and the
+ * notification thread's state is _not_ "thread-safe" in any general
+ * sense.
+ */
+struct notification_client_list {
+       pthread_mutex_t lock;
+       struct urcu_ref ref;
+       const struct lttng_trigger *trigger;
+       struct cds_list_head list;
+       /* Weak reference to container. */
+       struct cds_lfht *notification_trigger_clients_ht;
+       struct cds_lfht_node notification_trigger_clients_ht_node;
+       /* call_rcu delayed reclaim. */
+       struct rcu_head rcu_node;
+};
+
+struct notification_client {
+       /* Nests within the notification_client_list lock. */
+       pthread_mutex_t lock;
+       notification_client_id id;
+       int socket;
+       /* Client protocol version. */
+       uint8_t major, minor;
+       uid_t uid;
+       gid_t gid;
+       /*
+        * Indicates if the credentials and versions of the client have been
+        * checked.
+        */
+       bool validated;
+       /*
+        * Conditions to which the client's notification channel is subscribed.
+        * List of struct lttng_condition_list_node. The condition member is
+        * owned by the client.
+        */
+       struct cds_list_head condition_list;
+       struct cds_lfht_node client_socket_ht_node;
+       struct cds_lfht_node client_id_ht_node;
+       struct {
+               /*
+                * If a client's communication is inactive, it means that a
+                * fatal error has occurred (could be either a protocol error or
+                * the socket API returned a fatal error). No further
+                * communication should be attempted; the client is queued for
+                * clean-up.
+                */
+               bool active;
+               struct {
+                       /*
+                        * During the reception of a message, the reception
+                        * buffers' "size" is set to contain the current
+                        * message's complete payload.
+                        */
+                       struct lttng_dynamic_buffer buffer;
+                       /* Bytes left to receive for the current message. */
+                       size_t bytes_to_receive;
+                       /* Type of the message being received. */
+                       enum lttng_notification_channel_message_type msg_type;
+                       /*
+                        * Indicates whether or not credentials are expected
+                        * from the client.
+                        */
+                       bool expect_creds;
+                       /*
+                        * Indicates whether or not credentials were received
+                        * from the client.
+                        */
+                       bool creds_received;
+                       /* Only used during credentials reception. */
+                       lttng_sock_cred creds;
+               } inbound;
+               struct {
+                       /*
+                        * Indicates whether or not a notification addressed to
+                        * this client was dropped because a command reply was
+                        * already buffered.
+                        *
+                        * A notification is dropped whenever the buffer is not
+                        * empty.
+                        */
+                       bool dropped_notification;
+                       /*
+                        * Indicates whether or not a command reply is already
+                        * buffered. In this case, it means that the client is
+                        * not consuming command replies before emitting a new
+                        * one. This could be caused by a protocol error or a
+                        * misbehaving/malicious client.
+                        */
+                       bool queued_command_reply;
+                       struct lttng_dynamic_buffer buffer;
+               } outbound;
+       } communication;
+       /* call_rcu delayed reclaim. */
+       struct rcu_head rcu_node;
+};
+
 enum client_transmission_status {
        CLIENT_TRANSMISSION_STATUS_COMPLETE,
        CLIENT_TRANSMISSION_STATUS_QUEUED,
@@ -72,4 +198,32 @@ enum client_transmission_status {
        /* Fatal error. */
        CLIENT_TRANSMISSION_STATUS_ERROR,
 };
+
+LTTNG_HIDDEN
+bool notification_client_list_get(struct notification_client_list *list);
+
+LTTNG_HIDDEN
+void notification_client_list_put(struct notification_client_list *list);
+
+typedef int (*report_client_transmission_result_cb)(
+               struct notification_client *client,
+               enum client_transmission_status status,
+               void *user_data);
+
+LTTNG_HIDDEN
+int notification_client_list_send_evaluation(
+               struct notification_client_list *list,
+               const struct lttng_condition *condition,
+               const struct lttng_evaluation *evaluation,
+               const struct lttng_credentials *trigger_creds,
+               const struct lttng_credentials *source_object_creds,
+               report_client_transmission_result_cb client_report,
+               void *user_data);
+
+LTTNG_HIDDEN
+int notification_thread_client_communication_update(
+               struct notification_thread_handle *handle,
+               notification_client_id id,
+               enum client_transmission_status transmission_status);
+
 #endif /* NOTIFICATION_THREAD_INTERNAL_H */
index c964e7c910b062a82753c50c58b78d728a9454c1..3ae8741d65b10a346bec2c965c1581873c8ea155 100644 (file)
@@ -375,6 +375,9 @@ void fini_thread_state(struct notification_thread_state *state)
                notification_channel_socket_destroy(
                                state->notification_channel_socket);
        }
+       if (state->executor) {
+               action_executor_destroy(state->executor);
+       }
        lttng_poll_clean(&state->events);
 }
 
@@ -473,6 +476,11 @@ int init_thread_state(struct notification_thread_handle *handle,
        if (!state->triggers_ht) {
                goto error;
        }
+
+       state->executor = action_executor_create(handle);
+       if (!state->executor) {
+               goto error;
+       }
        mark_thread_as_ready(handle);
 end:
        return 0;
index 21cc086c6ee8ec9c2db4b7b9c0daceee28ab1b99..134804f9d0718a76415e898f556e612921c95f9f 100644 (file)
@@ -8,16 +8,17 @@
 #ifndef NOTIFICATION_THREAD_H
 #define NOTIFICATION_THREAD_H
 
-#include <urcu/list.h>
-#include <urcu.h>
-#include <urcu/rculfhash.h>
-#include <lttng/trigger/trigger.h>
-#include <common/pipe.h>
+#include "action-executor.h"
+#include "thread.h"
 #include <common/compat/poll.h>
 #include <common/hashtable/hashtable.h>
+#include <common/pipe.h>
+#include <lttng/trigger/trigger.h>
 #include <pthread.h>
 #include <semaphore.h>
-#include "thread.h"
+#include <urcu.h>
+#include <urcu/list.h>
+#include <urcu/rculfhash.h>
 
 
 typedef uint64_t notification_client_id;
@@ -210,6 +211,7 @@ struct notification_thread_state {
        struct cds_lfht *sessions_ht;
        struct cds_lfht *triggers_ht;
        notification_client_id next_notification_client_id;
+       struct action_executor *executor;
 };
 
 /* notification_thread_data takes ownership of the channel monitor pipes. */
index 26661a36f2aae08c0b21426b9fa343ac9f820a61..ae7f45fd97f8257b645d37513e80c3274410d938 100644 (file)
@@ -165,20 +165,22 @@ bool _lttng_thread_shutdown(struct lttng_thread *thread)
                result = false;
                goto end;
        }
-       /* Release the list's reference to the thread. */
-       cds_list_del(&thread->node);
-       lttng_thread_put(thread);
+       DBG("Joined thread \"%s\"", thread->name);
 end:
        return result;
 }
 
 bool lttng_thread_shutdown(struct lttng_thread *thread)
 {
-       bool result;
-
-       pthread_mutex_lock(&thread_list.lock);
-       result = _lttng_thread_shutdown(thread);
-       pthread_mutex_unlock(&thread_list.lock);
+       const bool result = _lttng_thread_shutdown(thread);
+
+       if (result) {
+               /* Release the list's reference to the thread. */
+               pthread_mutex_lock(&thread_list.lock);
+               cds_list_del(&thread->node);
+               lttng_thread_put(thread);
+               pthread_mutex_unlock(&thread_list.lock);
+       }
        return result;
 }
 
index d6a3e4f135bee8696fbada9be6840e34f1df8109..91108c166b9825227248fa1e7bb00ce59b7ea7c5 100644 (file)
@@ -62,6 +62,7 @@ const char *sessiond_thread_name[NR_HEALTH_SESSIOND_TYPES] = {
        [ HEALTH_SESSIOND_TYPE_APP_REG_DISPATCH ] = "Session daemon application registration dispatcher",
        [ HEALTH_SESSIOND_TYPE_ROTATION ] = "Session daemon rotation manager",
        [ HEALTH_SESSIOND_TYPE_TIMER ] = "Session daemon timer manager",
+       [ HEALTH_SESSIOND_TYPE_ACTION_EXECUTOR ] = "Session daemon trigger action executor",
 };
 
 static
index 4ecc7a8c86f411af0bab90f21ae0506067da1d55..3387720683aa7acf31cbebe93ac7d960f28d47f9 100644 (file)
@@ -65,6 +65,7 @@ SESSIOND_OBJS = $(top_builddir)/src/bin/lttng-sessiond/buffer-registry.$(OBJEXT)
         $(top_builddir)/src/bin/lttng-sessiond/kernel.$(OBJEXT) \
         $(top_builddir)/src/bin/lttng-sessiond/ht-cleanup.$(OBJEXT) \
         $(top_builddir)/src/bin/lttng-sessiond/notification-thread.$(OBJEXT) \
+        $(top_builddir)/src/bin/lttng-sessiond/action-executor.$(OBJEXT) \
         $(top_builddir)/src/bin/lttng-sessiond/lttng-syscall.$(OBJEXT) \
         $(top_builddir)/src/bin/lttng-sessiond/channel.$(OBJEXT) \
         $(top_builddir)/src/bin/lttng-sessiond/agent.$(OBJEXT) \
This page took 0.050231 seconds and 5 git commands to generate.