SoW-2019-0002: Dynamic Snapshot
[lttng-tools.git] / src / bin / lttng-sessiond / action-executor.c
diff --git a/src/bin/lttng-sessiond/action-executor.c b/src/bin/lttng-sessiond/action-executor.c
new file mode 100644 (file)
index 0000000..40686eb
--- /dev/null
@@ -0,0 +1,654 @@
+/*
+ * Copyright (C) 2020 Jérémie Galarneau <jeremie.galarneau@efficios.com>
+ *
+ * SPDX-License-Identifier: GPL-2.0-only
+ *
+ */
+
+#include "action-executor.h"
+#include "cmd.h"
+#include "health-sessiond.h"
+#include "lttng-sessiond.h"
+#include "notification-thread-internal.h"
+#include "session.h"
+#include "thread.h"
+#include <common/macros.h>
+#include <lttng/action/group.h>
+#include <lttng/action/notify.h>
+#include <lttng/action/rotate-session.h>
+#include <lttng/action/snapshot-session.h>
+#include <lttng/action/start-session.h>
+#include <lttng/action/stop-session.h>
+#include <lttng/condition/evaluation.h>
+#include <lttng/condition/event-rule-internal.h>
+#include <lttng/lttng-error.h>
+#include <lttng/trigger/trigger-internal.h>
+#include <pthread.h>
+#include <stdbool.h>
+#include <stddef.h>
+#include <urcu/list.h>
+
+#define THREAD_NAME "Action Executor"
+#define MAX_QUEUED_WORK_COUNT 8192
+
+struct action_work_item {
+       uint64_t id;
+       struct lttng_trigger *trigger;
+       struct notification_client_list *client_list;
+       struct cds_list_head list_node;
+};
+
+struct action_executor {
+       struct lttng_thread *thread;
+       struct notification_thread_handle *notification_thread_handle;
+       struct {
+               uint64_t pending_count;
+               struct cds_list_head list;
+               pthread_cond_t cond;
+               pthread_mutex_t lock;
+       } work;
+       bool should_quit;
+       uint64_t next_work_item_id;
+};
+
+typedef int (*action_executor_handler)(struct action_executor *executor,
+               const struct action_work_item *,
+               const struct lttng_action *action);
+
+static int action_executor_notify_handler(struct action_executor *executor,
+               const struct action_work_item *,
+               const struct lttng_action *);
+static int action_executor_start_session_handler(struct action_executor *executor,
+               const struct action_work_item *,
+               const struct lttng_action *);
+static int action_executor_stop_session_handler(struct action_executor *executor,
+               const struct action_work_item *,
+               const struct lttng_action *);
+static int action_executor_rotate_session_handler(struct action_executor *executor,
+               const struct action_work_item *,
+               const struct lttng_action *);
+static int action_executor_snapshot_session_handler(struct action_executor *executor,
+               const struct action_work_item *,
+               const struct lttng_action *);
+static int action_executor_group_handler(struct action_executor *executor,
+               const struct action_work_item *,
+               const struct lttng_action *);
+static int action_executor_generic_handler(struct action_executor *executor,
+               const struct action_work_item *,
+               const struct lttng_action *);
+
+static const action_executor_handler action_executors[] = {
+       [LTTNG_ACTION_TYPE_NOTIFY] = action_executor_notify_handler,
+       [LTTNG_ACTION_TYPE_START_SESSION] = action_executor_start_session_handler,
+       [LTTNG_ACTION_TYPE_STOP_SESSION] = action_executor_stop_session_handler,
+       [LTTNG_ACTION_TYPE_ROTATE_SESSION] = action_executor_rotate_session_handler,
+       [LTTNG_ACTION_TYPE_SNAPSHOT_SESSION] = action_executor_snapshot_session_handler,
+       [LTTNG_ACTION_TYPE_GROUP] = action_executor_group_handler,
+};
+
+static const char *get_action_name(const struct lttng_action *action)
+{
+       const char *action_type_names[] = {
+               [LTTNG_ACTION_TYPE_NOTIFY] = "Notify",
+               [LTTNG_ACTION_TYPE_START_SESSION] = "Start session",
+               [LTTNG_ACTION_TYPE_STOP_SESSION] = "Stop session",
+               [LTTNG_ACTION_TYPE_ROTATE_SESSION] = "Rotate session",
+               [LTTNG_ACTION_TYPE_SNAPSHOT_SESSION] = "Snapshot session",
+               [LTTNG_ACTION_TYPE_GROUP] = "Group",
+       };
+
+       return action_type_names[lttng_action_get_type(action)];
+}
+
+static const char *get_trigger_name(const struct lttng_trigger *trigger)
+{
+       const char *trigger_name;
+       enum lttng_trigger_status trigger_status;
+
+       trigger_status = lttng_trigger_get_name(trigger, &trigger_name);
+       assert(trigger_status == LTTNG_TRIGGER_STATUS_OK);
+
+       return trigger_name;
+}
+
+static int client_handle_transmission_status(
+               struct notification_client *client,
+               enum client_transmission_status status,
+               void *user_data)
+{
+       struct action_executor *executor = user_data;
+
+       switch (status) {
+       case CLIENT_TRANSMISSION_STATUS_COMPLETE:
+               DBG("Sent notification to client");
+               break;
+       default:
+               ERR("Could not send notification to client");
+       }
+
+       return 0;
+}
+
+static int action_executor_notify_handler(struct action_executor *executor,
+               const struct action_work_item *work_item,
+               const struct lttng_action *action)
+{
+       int ret = 0;
+       struct lttng_evaluation *evaluation = NULL;
+
+       assert(work_item->client_list);
+
+       evaluation = lttng_evaluation_event_rule_create(
+                       get_trigger_name(work_item->trigger));
+       if (!evaluation) {
+               ERR("Failed to create event rule hit evaluation");
+               ret = -1;
+               goto end;
+       }
+
+       ret = notification_client_list_send_evaluation(work_item->client_list,
+                       lttng_trigger_get_const_condition(work_item->trigger),
+                       evaluation,
+                       lttng_trigger_get_credentials(work_item->trigger), NULL,
+                       client_handle_transmission_status, executor);
+end:
+       lttng_evaluation_destroy(evaluation);
+       return ret;
+}
+
+static int action_executor_start_session_handler(struct action_executor *executor,
+               const struct action_work_item *work_item,
+               const struct lttng_action *action)
+{
+       int ret = 0;
+       const char *session_name;
+       enum lttng_action_status action_status;
+       struct ltt_session *session;
+
+       action_status = lttng_action_start_session_get_session_name(
+                       action, &session_name);
+       if (action_status != LTTNG_ACTION_STATUS_OK) {
+               ERR("Failed to get session name from \"%s\" action",
+                               get_action_name(action));
+               ret = -1;
+               goto end;
+       }
+
+       session_lock_list();
+       session = session_find_by_name(session_name);
+       if (session) {
+               enum lttng_error_code cmd_ret;
+
+               session_lock(session);
+               cmd_ret = cmd_start_trace(session);
+               session_unlock(session);
+
+               switch (cmd_ret) {
+               case LTTNG_OK:
+                       DBG("Successfully started session \"%s\" on behalf of trigger \"%s\"",
+                                       session_name,
+                                       get_trigger_name(work_item->trigger));
+                       break;
+               case LTTNG_ERR_TRACE_ALREADY_STARTED:
+                       DBG("Attempted to start session \"%s\" on behalf of trigger \"%s\" but it was already started",
+                                       session_name,
+                                       get_trigger_name(work_item->trigger));
+                       break;
+               default:
+                       WARN("Failed to start session \"%s\" on behalf of trigger \"%s\": %s",
+                                       session_name,
+                                       get_trigger_name(work_item->trigger),
+                                       lttng_strerror(-cmd_ret));
+                       break;
+               }
+               session_put(session);
+       } else {
+               DBG("Failed to find session \"%s\" by name while executing \"%s\" action of trigger \"%s\"",
+                               session_name, get_action_name(action),
+                               get_trigger_name(work_item->trigger));
+       }
+       session_unlock_list();
+end:
+       return ret;
+}
+
+static int action_executor_stop_session_handler(struct action_executor *executor,
+               const struct action_work_item *work_item,
+               const struct lttng_action *action)
+{
+       int ret = 0;
+       const char *session_name;
+       enum lttng_action_status action_status;
+       struct ltt_session *session;
+
+       action_status = lttng_action_stop_session_get_session_name(
+                       action, &session_name);
+       if (action_status != LTTNG_ACTION_STATUS_OK) {
+               ERR("Failed to get session name from \"%s\" action",
+                               get_action_name(action));
+               ret = -1;
+               goto end;
+       }
+
+       session_lock_list();
+       session = session_find_by_name(session_name);
+       if (session) {
+               enum lttng_error_code cmd_ret;
+
+               session_lock(session);
+               cmd_ret = cmd_stop_trace(session);
+               session_unlock(session);
+
+               switch (cmd_ret) {
+               case LTTNG_OK:
+                       DBG("Successfully stopped session \"%s\" on behalf of trigger \"%s\"",
+                                       session_name,
+                                       get_trigger_name(work_item->trigger));
+                       break;
+               case LTTNG_ERR_TRACE_ALREADY_STOPPED:
+                       DBG("Attempted to stop session \"%s\" on behalf of trigger \"%s\" but it was already stopped",
+                                       session_name,
+                                       get_trigger_name(work_item->trigger));
+                       break;
+               default:
+                       WARN("Failed to stop session \"%s\" on behalf of trigger \"%s\": %s",
+                                       session_name,
+                                       get_trigger_name(work_item->trigger),
+                                       lttng_strerror(-cmd_ret));
+                       break;
+               }
+               session_put(session);
+       } else {
+               DBG("Failed to find session \"%s\" by name while executing \"%s\" action of trigger \"%s\"",
+                               session_name, get_action_name(action),
+                               get_trigger_name(work_item->trigger));
+       }
+       session_unlock_list();
+end:
+       return ret;
+}
+
+static int action_executor_rotate_session_handler(struct action_executor *executor,
+               const struct action_work_item *work_item,
+               const struct lttng_action *action)
+{
+       int ret = 0;
+       const char *session_name;
+       enum lttng_action_status action_status;
+       struct ltt_session *session;
+
+       action_status = lttng_action_rotate_session_get_session_name(
+                       action, &session_name);
+       if (action_status != LTTNG_ACTION_STATUS_OK) {
+               ERR("Failed to get session name from \"%s\" action",
+                               get_action_name(action));
+               ret = -1;
+               goto end;
+       }
+
+       session_lock_list();
+       session = session_find_by_name(session_name);
+       if (session) {
+               enum lttng_error_code cmd_ret;
+
+               session_lock(session);
+               cmd_ret = cmd_rotate_session(session, NULL, false,
+                               LTTNG_TRACE_CHUNK_COMMAND_TYPE_MOVE_TO_COMPLETED);
+               session_unlock(session);
+
+               switch (cmd_ret) {
+               case LTTNG_OK:
+                       DBG("Successfully started rotation of session \"%s\" on behalf of trigger \"%s\"",
+                                       session_name,
+                                       get_trigger_name(work_item->trigger));
+                       break;
+               case LTTNG_ERR_ROTATION_PENDING:
+                       DBG("Attempted to start a rotation of session \"%s\" on behalf of trigger \"%s\" but a rotation is already ongoing",
+                                       session_name,
+                                       get_trigger_name(work_item->trigger));
+                       break;
+               case LTTNG_ERR_ROTATION_MULTIPLE_AFTER_STOP:
+               case LTTNG_ERR_ROTATION_AFTER_STOP_CLEAR:
+                       DBG("Attempted to start a rotation of session \"%s\" on behalf of trigger \"%s\" but a rotation has already been completed since the last stop or clear",
+                                       session_name,
+                                       get_trigger_name(work_item->trigger));
+                       break;
+               default:
+                       WARN("Failed to start a rotation of session \"%s\" on behalf of trigger \"%s\": %s",
+                                       session_name,
+                                       get_trigger_name(work_item->trigger),
+                                       lttng_strerror(-cmd_ret));
+                       break;
+               }
+               session_put(session);
+       } else {
+               DBG("Failed to find session \"%s\" by name while executing \"%s\" action of trigger \"%s\"",
+                               session_name, get_action_name(action),
+                               get_trigger_name(work_item->trigger));
+       }
+       session_unlock_list();
+end:
+       return ret;
+}
+
+static int action_executor_snapshot_session_handler(struct action_executor *executor,
+               const struct action_work_item *work_item,
+               const struct lttng_action *action)
+{
+       int ret = 0;
+       const char *session_name;
+       enum lttng_action_status action_status;
+       struct ltt_session *session;
+       const struct lttng_snapshot_output default_snapshot_output = {
+               .max_size = UINT64_MAX,
+       };
+       const struct lttng_snapshot_output *snapshot_output =
+                       &default_snapshot_output;
+
+       action_status = lttng_action_snapshot_session_get_session_name(
+                       action, &session_name);
+       if (action_status != LTTNG_ACTION_STATUS_OK) {
+               ERR("Failed to get session name from \"%s\" action",
+                               get_action_name(action));
+               ret = -1;
+               goto end;
+       }
+
+       action_status = lttng_action_snapshot_session_get_output_const(
+                       action, &snapshot_output);
+       if (action_status != LTTNG_ACTION_STATUS_OK &&
+                       action_status != LTTNG_ACTION_STATUS_UNSET) {
+               ERR("Failed to get output from \"%s\" action",
+                               get_action_name(action));
+               ret = -1;
+               goto end;
+       }
+
+       session_lock_list();
+       session = session_find_by_name(session_name);
+       if (session) {
+               enum lttng_error_code cmd_ret;
+
+               session_lock(session);
+               cmd_ret = cmd_snapshot_record(session, snapshot_output, 0);
+               session_unlock(session);
+
+               switch (cmd_ret) {
+               case LTTNG_OK:
+                       DBG("Successfully recorded snapshot of session \"%s\" on behalf of trigger \"%s\"",
+                                       session_name,
+                                       get_trigger_name(work_item->trigger));
+                       break;
+               default:
+                       WARN("Failed to record snapshot of session \"%s\" on behalf of trigger \"%s\": %s",
+                                       session_name,
+                                       get_trigger_name(work_item->trigger),
+                                       lttng_strerror(-cmd_ret));
+                       break;
+               }
+               session_put(session);
+       } else {
+               DBG("Failed to find session \"%s\" by name while executing \"%s\" action of trigger \"%s\"",
+                               session_name, get_action_name(action),
+                               get_trigger_name(work_item->trigger));
+       }
+       session_unlock_list();
+end:
+       return ret;
+}
+
+static int action_executor_group_handler(struct action_executor *executor,
+               const struct action_work_item *work_item,
+               const struct lttng_action *action_group)
+{
+       int ret = 0;
+       unsigned int i, count;
+       enum lttng_action_status action_status;
+
+       action_status = lttng_action_group_get_count(action_group, &count);
+       if (action_status != LTTNG_ACTION_STATUS_OK) {
+               /* Fatal error. */
+               ERR("Failed to get count of action in action group");
+               ret = -1;
+               goto end;
+       }
+
+       DBG("Action group has %u action%s", count, count != 1 ? "s" : "");
+       for (i = 0; i < count; i++) {
+               const struct lttng_action *action =
+                               lttng_action_group_get_at_index_const(
+                                               action_group, i);
+
+               ret = action_executor_generic_handler(
+                               executor, work_item, action);
+               if (ret) {
+                       ERR("Stopping the execution of the action group of trigger \"%s\" following a fatal error",
+                                       get_trigger_name(work_item->trigger));
+                       goto end;
+               }
+       }
+end:
+       return ret;
+}
+
+static int action_executor_generic_handler(struct action_executor *executor,
+               const struct action_work_item *work_item,
+               const struct lttng_action *action)
+{
+       DBG("Executing action \"%s\" of trigger \"%s\" action work item %" PRIu64,
+                       get_action_name(action),
+                       get_trigger_name(work_item->trigger),
+                       work_item->id);
+
+       return action_executors[lttng_action_get_type(action)](
+                       executor, work_item, action);
+}
+
+static int action_work_item_execute(struct action_executor *executor,
+               struct action_work_item *work_item)
+{
+       int ret;
+       const struct lttng_action *action =
+                       lttng_trigger_get_const_action(work_item->trigger);
+
+       DBG("Starting execution of action work item %" PRIu64 " of trigger \"%s\"",
+                       work_item->id, get_trigger_name(work_item->trigger));
+       ret = action_executor_generic_handler(executor, work_item, action);
+       DBG("Completed execution of action work item %" PRIu64 " of trigger \"%s\"",
+                       work_item->id, get_trigger_name(work_item->trigger));
+       return ret;
+}
+
+static void action_work_item_destroy(struct action_work_item *work_item)
+{
+       lttng_trigger_put(work_item->trigger);
+       notification_client_list_put(work_item->client_list);
+       free(work_item);
+}
+
+static void *action_executor_thread(void *_data)
+{
+       struct action_executor *executor = _data;
+
+       assert(executor);
+
+       health_register(health_sessiond, HEALTH_SESSIOND_TYPE_ACTION_EXECUTOR);
+
+       rcu_register_thread();
+       rcu_thread_online();
+
+       DBG("Entering work execution loop");
+       pthread_mutex_lock(&executor->work.lock);
+       while (!executor->should_quit) {
+               int ret;
+               struct action_work_item *work_item;
+
+               health_code_update();
+               if (executor->work.pending_count == 0) {
+                       health_poll_entry();
+                       DBG("No work items enqueued, entering wait");
+                       pthread_cond_wait(&executor->work.cond,
+                                       &executor->work.lock);
+                       DBG("Woke-up from wait");
+                       health_poll_exit();
+                       continue;
+               }
+
+               /* Pop item from front of the listwith work lock held. */
+               work_item = cds_list_first_entry(&executor->work.list,
+                               struct action_work_item, list_node);
+               cds_list_del(&work_item->list_node);
+               executor->work.pending_count--;
+
+               /*
+                * Work can be performed without holding the work lock,
+                * allowing new items to be queued.
+                */
+               pthread_mutex_unlock(&executor->work.lock);
+               ret = action_work_item_execute(executor, work_item);
+               action_work_item_destroy(work_item);
+               if (ret) {
+                       /* Fatal error. */
+                       break;
+               }
+               health_code_update();
+               pthread_mutex_lock(&executor->work.lock);
+       }
+       pthread_mutex_unlock(&executor->work.lock);
+       DBG("Left work execution loop");
+
+       health_code_update();
+
+       rcu_thread_offline();
+       rcu_unregister_thread();
+       health_unregister(health_sessiond);
+
+       return NULL;
+}
+
+static bool shutdown_action_executor_thread(void *_data)
+{
+       struct action_executor *executor = _data;
+
+       /* TODO. */
+       executor->should_quit = true;
+       pthread_cond_signal(&executor->work.cond);
+       return true;
+}
+
+static void clean_up_action_executor_thread(void *_data)
+{
+       struct action_executor *executor = _data;
+
+       assert(cds_list_empty(&executor->work.list));
+
+       pthread_mutex_destroy(&executor->work.lock);
+       pthread_cond_destroy(&executor->work.cond);
+       free(executor);
+}
+
+struct action_executor *action_executor_create(
+               struct notification_thread_handle *handle)
+{
+       struct action_executor *executor = zmalloc(sizeof(*executor));
+
+       if (!executor) {
+               goto end;
+       }
+
+       CDS_INIT_LIST_HEAD(&executor->work.list);
+       pthread_cond_init(&executor->work.cond, NULL);
+       pthread_mutex_init(&executor->work.lock, NULL);
+       executor->notification_thread_handle = handle;
+
+       executor->thread = lttng_thread_create(THREAD_NAME,
+                       action_executor_thread, shutdown_action_executor_thread,
+                       clean_up_action_executor_thread, executor);
+end:
+       return executor;
+}
+
+void action_executor_destroy(struct action_executor *executor)
+{
+       struct action_work_item *work_item, *tmp;
+
+       /* TODO Wait for work list to drain? */
+       lttng_thread_shutdown(executor->thread);
+       pthread_mutex_lock(&executor->work.lock);
+       if (executor->work.pending_count != 0) {
+               WARN("%" PRIu64
+                       " trigger action%s still queued for execution and will be discarded",
+                               executor->work.pending_count,
+                               executor->work.pending_count == 1 ? " is" :
+                                                                   "s are");
+       }
+
+       cds_list_for_each_entry_safe (
+                       work_item, tmp, &executor->work.list, list_node) {
+               WARN("Discarding action work item %" PRIu64
+                               " associated to trigger \"%s\"",
+                               work_item->id, get_trigger_name(work_item->trigger));
+               cds_list_del(&work_item->list_node);
+               action_work_item_destroy(work_item);
+       }
+       pthread_mutex_unlock(&executor->work.lock);
+       lttng_thread_put(executor->thread);
+}
+
+/* RCU read-lock must be held by the caller. */
+enum action_executor_status action_executor_enqueue(
+               struct action_executor *executor,
+               struct lttng_trigger *trigger,
+               struct notification_client_list *client_list)
+{
+       enum action_executor_status executor_status = ACTION_EXECUTOR_STATUS_OK;
+       const uint64_t work_item_id = executor->next_work_item_id++;
+       struct action_work_item *work_item;
+       bool signal = false;
+
+       pthread_mutex_lock(&executor->work.lock);
+       /* Check for queue overflow. */
+       if (executor->work.pending_count >= MAX_QUEUED_WORK_COUNT) {
+               /* Most likely spammy, remove if it is the case. */
+               DBG("Refusing to enqueue action for trigger \"%s\" as work item %" PRIu64
+                   " (overflow)",
+                               get_trigger_name(trigger), work_item_id);
+               executor_status = ACTION_EXECUTOR_STATUS_OVERFLOW;
+               goto error_unlock;
+       }
+
+       work_item = zmalloc(sizeof(*work_item));
+       if (!work_item) {
+               PERROR("Failed to allocate action executor work item on behalf of trigger \"%s\"",
+                               get_trigger_name(trigger));
+               executor_status = ACTION_EXECUTOR_STATUS_ERROR;
+               goto error_unlock;
+       }
+
+       lttng_trigger_get(trigger);
+       if (client_list) {
+               const bool reference_acquired =
+                               notification_client_list_get(client_list);
+
+               assert(reference_acquired);
+       }
+
+       *work_item = (typeof(*work_item)){
+                       .id = work_item_id,
+                       .trigger = trigger,
+                       .client_list = client_list,
+                       .list_node = CDS_LIST_HEAD_INIT(work_item->list_node),
+       };
+       cds_list_add_tail(&work_item->list_node, &executor->work.list);
+       executor->work.pending_count++;
+       DBG("Enqueued action for trigger \"%s\" as work item %" PRIu64,
+                       get_trigger_name(trigger), work_item_id);
+       signal = true;
+
+error_unlock:
+       pthread_mutex_unlock(&executor->work.lock);
+       if (signal) {
+               pthread_cond_signal(&executor->work.cond);
+       }
+       return executor_status;
+}
This page took 0.029967 seconds and 5 git commands to generate.