X-Git-Url: http://git.efficios.com/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-sessiond%2Faction-executor.c;fp=src%2Fbin%2Flttng-sessiond%2Faction-executor.c;h=40686eb7b0b77aea4636ad003bf3fb9ccbd08174;hp=0000000000000000000000000000000000000000;hb=1831ae68b70dece8e9b847081526495adbbf05e5;hpb=25357057de5ae4dd2a572e8f9b893c1b90cbd60a diff --git a/src/bin/lttng-sessiond/action-executor.c b/src/bin/lttng-sessiond/action-executor.c new file mode 100644 index 000000000..40686eb7b --- /dev/null +++ b/src/bin/lttng-sessiond/action-executor.c @@ -0,0 +1,654 @@ +/* + * Copyright (C) 2020 Jérémie Galarneau + * + * SPDX-License-Identifier: GPL-2.0-only + * + */ + +#include "action-executor.h" +#include "cmd.h" +#include "health-sessiond.h" +#include "lttng-sessiond.h" +#include "notification-thread-internal.h" +#include "session.h" +#include "thread.h" +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#define THREAD_NAME "Action Executor" +#define MAX_QUEUED_WORK_COUNT 8192 + +struct action_work_item { + uint64_t id; + struct lttng_trigger *trigger; + struct notification_client_list *client_list; + struct cds_list_head list_node; +}; + +struct action_executor { + struct lttng_thread *thread; + struct notification_thread_handle *notification_thread_handle; + struct { + uint64_t pending_count; + struct cds_list_head list; + pthread_cond_t cond; + pthread_mutex_t lock; + } work; + bool should_quit; + uint64_t next_work_item_id; +}; + +typedef int (*action_executor_handler)(struct action_executor *executor, + const struct action_work_item *, + const struct lttng_action *action); + +static int action_executor_notify_handler(struct action_executor *executor, + const struct action_work_item *, + const struct lttng_action *); +static int action_executor_start_session_handler(struct action_executor *executor, + const struct action_work_item *, + const struct lttng_action *); +static int action_executor_stop_session_handler(struct action_executor *executor, + const struct action_work_item *, + const struct lttng_action *); +static int action_executor_rotate_session_handler(struct action_executor *executor, + const struct action_work_item *, + const struct lttng_action *); +static int action_executor_snapshot_session_handler(struct action_executor *executor, + const struct action_work_item *, + const struct lttng_action *); +static int action_executor_group_handler(struct action_executor *executor, + const struct action_work_item *, + const struct lttng_action *); +static int action_executor_generic_handler(struct action_executor *executor, + const struct action_work_item *, + const struct lttng_action *); + +static const action_executor_handler action_executors[] = { + [LTTNG_ACTION_TYPE_NOTIFY] = action_executor_notify_handler, + [LTTNG_ACTION_TYPE_START_SESSION] = action_executor_start_session_handler, + [LTTNG_ACTION_TYPE_STOP_SESSION] = action_executor_stop_session_handler, + [LTTNG_ACTION_TYPE_ROTATE_SESSION] = action_executor_rotate_session_handler, + [LTTNG_ACTION_TYPE_SNAPSHOT_SESSION] = action_executor_snapshot_session_handler, + [LTTNG_ACTION_TYPE_GROUP] = action_executor_group_handler, +}; + +static const char *get_action_name(const struct lttng_action *action) +{ + const char *action_type_names[] = { + [LTTNG_ACTION_TYPE_NOTIFY] = "Notify", + [LTTNG_ACTION_TYPE_START_SESSION] = "Start session", + [LTTNG_ACTION_TYPE_STOP_SESSION] = "Stop session", + [LTTNG_ACTION_TYPE_ROTATE_SESSION] = "Rotate session", + [LTTNG_ACTION_TYPE_SNAPSHOT_SESSION] = "Snapshot session", + [LTTNG_ACTION_TYPE_GROUP] = "Group", + }; + + return action_type_names[lttng_action_get_type(action)]; +} + +static const char *get_trigger_name(const struct lttng_trigger *trigger) +{ + const char *trigger_name; + enum lttng_trigger_status trigger_status; + + trigger_status = lttng_trigger_get_name(trigger, &trigger_name); + assert(trigger_status == LTTNG_TRIGGER_STATUS_OK); + + return trigger_name; +} + +static int client_handle_transmission_status( + struct notification_client *client, + enum client_transmission_status status, + void *user_data) +{ + struct action_executor *executor = user_data; + + switch (status) { + case CLIENT_TRANSMISSION_STATUS_COMPLETE: + DBG("Sent notification to client"); + break; + default: + ERR("Could not send notification to client"); + } + + return 0; +} + +static int action_executor_notify_handler(struct action_executor *executor, + const struct action_work_item *work_item, + const struct lttng_action *action) +{ + int ret = 0; + struct lttng_evaluation *evaluation = NULL; + + assert(work_item->client_list); + + evaluation = lttng_evaluation_event_rule_create( + get_trigger_name(work_item->trigger)); + if (!evaluation) { + ERR("Failed to create event rule hit evaluation"); + ret = -1; + goto end; + } + + ret = notification_client_list_send_evaluation(work_item->client_list, + lttng_trigger_get_const_condition(work_item->trigger), + evaluation, + lttng_trigger_get_credentials(work_item->trigger), NULL, + client_handle_transmission_status, executor); +end: + lttng_evaluation_destroy(evaluation); + return ret; +} + +static int action_executor_start_session_handler(struct action_executor *executor, + const struct action_work_item *work_item, + const struct lttng_action *action) +{ + int ret = 0; + const char *session_name; + enum lttng_action_status action_status; + struct ltt_session *session; + + action_status = lttng_action_start_session_get_session_name( + action, &session_name); + if (action_status != LTTNG_ACTION_STATUS_OK) { + ERR("Failed to get session name from \"%s\" action", + get_action_name(action)); + ret = -1; + goto end; + } + + session_lock_list(); + session = session_find_by_name(session_name); + if (session) { + enum lttng_error_code cmd_ret; + + session_lock(session); + cmd_ret = cmd_start_trace(session); + session_unlock(session); + + switch (cmd_ret) { + case LTTNG_OK: + DBG("Successfully started session \"%s\" on behalf of trigger \"%s\"", + session_name, + get_trigger_name(work_item->trigger)); + break; + case LTTNG_ERR_TRACE_ALREADY_STARTED: + DBG("Attempted to start session \"%s\" on behalf of trigger \"%s\" but it was already started", + session_name, + get_trigger_name(work_item->trigger)); + break; + default: + WARN("Failed to start session \"%s\" on behalf of trigger \"%s\": %s", + session_name, + get_trigger_name(work_item->trigger), + lttng_strerror(-cmd_ret)); + break; + } + session_put(session); + } else { + DBG("Failed to find session \"%s\" by name while executing \"%s\" action of trigger \"%s\"", + session_name, get_action_name(action), + get_trigger_name(work_item->trigger)); + } + session_unlock_list(); +end: + return ret; +} + +static int action_executor_stop_session_handler(struct action_executor *executor, + const struct action_work_item *work_item, + const struct lttng_action *action) +{ + int ret = 0; + const char *session_name; + enum lttng_action_status action_status; + struct ltt_session *session; + + action_status = lttng_action_stop_session_get_session_name( + action, &session_name); + if (action_status != LTTNG_ACTION_STATUS_OK) { + ERR("Failed to get session name from \"%s\" action", + get_action_name(action)); + ret = -1; + goto end; + } + + session_lock_list(); + session = session_find_by_name(session_name); + if (session) { + enum lttng_error_code cmd_ret; + + session_lock(session); + cmd_ret = cmd_stop_trace(session); + session_unlock(session); + + switch (cmd_ret) { + case LTTNG_OK: + DBG("Successfully stopped session \"%s\" on behalf of trigger \"%s\"", + session_name, + get_trigger_name(work_item->trigger)); + break; + case LTTNG_ERR_TRACE_ALREADY_STOPPED: + DBG("Attempted to stop session \"%s\" on behalf of trigger \"%s\" but it was already stopped", + session_name, + get_trigger_name(work_item->trigger)); + break; + default: + WARN("Failed to stop session \"%s\" on behalf of trigger \"%s\": %s", + session_name, + get_trigger_name(work_item->trigger), + lttng_strerror(-cmd_ret)); + break; + } + session_put(session); + } else { + DBG("Failed to find session \"%s\" by name while executing \"%s\" action of trigger \"%s\"", + session_name, get_action_name(action), + get_trigger_name(work_item->trigger)); + } + session_unlock_list(); +end: + return ret; +} + +static int action_executor_rotate_session_handler(struct action_executor *executor, + const struct action_work_item *work_item, + const struct lttng_action *action) +{ + int ret = 0; + const char *session_name; + enum lttng_action_status action_status; + struct ltt_session *session; + + action_status = lttng_action_rotate_session_get_session_name( + action, &session_name); + if (action_status != LTTNG_ACTION_STATUS_OK) { + ERR("Failed to get session name from \"%s\" action", + get_action_name(action)); + ret = -1; + goto end; + } + + session_lock_list(); + session = session_find_by_name(session_name); + if (session) { + enum lttng_error_code cmd_ret; + + session_lock(session); + cmd_ret = cmd_rotate_session(session, NULL, false, + LTTNG_TRACE_CHUNK_COMMAND_TYPE_MOVE_TO_COMPLETED); + session_unlock(session); + + switch (cmd_ret) { + case LTTNG_OK: + DBG("Successfully started rotation of session \"%s\" on behalf of trigger \"%s\"", + session_name, + get_trigger_name(work_item->trigger)); + break; + case LTTNG_ERR_ROTATION_PENDING: + DBG("Attempted to start a rotation of session \"%s\" on behalf of trigger \"%s\" but a rotation is already ongoing", + session_name, + get_trigger_name(work_item->trigger)); + break; + case LTTNG_ERR_ROTATION_MULTIPLE_AFTER_STOP: + case LTTNG_ERR_ROTATION_AFTER_STOP_CLEAR: + DBG("Attempted to start a rotation of session \"%s\" on behalf of trigger \"%s\" but a rotation has already been completed since the last stop or clear", + session_name, + get_trigger_name(work_item->trigger)); + break; + default: + WARN("Failed to start a rotation of session \"%s\" on behalf of trigger \"%s\": %s", + session_name, + get_trigger_name(work_item->trigger), + lttng_strerror(-cmd_ret)); + break; + } + session_put(session); + } else { + DBG("Failed to find session \"%s\" by name while executing \"%s\" action of trigger \"%s\"", + session_name, get_action_name(action), + get_trigger_name(work_item->trigger)); + } + session_unlock_list(); +end: + return ret; +} + +static int action_executor_snapshot_session_handler(struct action_executor *executor, + const struct action_work_item *work_item, + const struct lttng_action *action) +{ + int ret = 0; + const char *session_name; + enum lttng_action_status action_status; + struct ltt_session *session; + const struct lttng_snapshot_output default_snapshot_output = { + .max_size = UINT64_MAX, + }; + const struct lttng_snapshot_output *snapshot_output = + &default_snapshot_output; + + action_status = lttng_action_snapshot_session_get_session_name( + action, &session_name); + if (action_status != LTTNG_ACTION_STATUS_OK) { + ERR("Failed to get session name from \"%s\" action", + get_action_name(action)); + ret = -1; + goto end; + } + + action_status = lttng_action_snapshot_session_get_output_const( + action, &snapshot_output); + if (action_status != LTTNG_ACTION_STATUS_OK && + action_status != LTTNG_ACTION_STATUS_UNSET) { + ERR("Failed to get output from \"%s\" action", + get_action_name(action)); + ret = -1; + goto end; + } + + session_lock_list(); + session = session_find_by_name(session_name); + if (session) { + enum lttng_error_code cmd_ret; + + session_lock(session); + cmd_ret = cmd_snapshot_record(session, snapshot_output, 0); + session_unlock(session); + + switch (cmd_ret) { + case LTTNG_OK: + DBG("Successfully recorded snapshot of session \"%s\" on behalf of trigger \"%s\"", + session_name, + get_trigger_name(work_item->trigger)); + break; + default: + WARN("Failed to record snapshot of session \"%s\" on behalf of trigger \"%s\": %s", + session_name, + get_trigger_name(work_item->trigger), + lttng_strerror(-cmd_ret)); + break; + } + session_put(session); + } else { + DBG("Failed to find session \"%s\" by name while executing \"%s\" action of trigger \"%s\"", + session_name, get_action_name(action), + get_trigger_name(work_item->trigger)); + } + session_unlock_list(); +end: + return ret; +} + +static int action_executor_group_handler(struct action_executor *executor, + const struct action_work_item *work_item, + const struct lttng_action *action_group) +{ + int ret = 0; + unsigned int i, count; + enum lttng_action_status action_status; + + action_status = lttng_action_group_get_count(action_group, &count); + if (action_status != LTTNG_ACTION_STATUS_OK) { + /* Fatal error. */ + ERR("Failed to get count of action in action group"); + ret = -1; + goto end; + } + + DBG("Action group has %u action%s", count, count != 1 ? "s" : ""); + for (i = 0; i < count; i++) { + const struct lttng_action *action = + lttng_action_group_get_at_index_const( + action_group, i); + + ret = action_executor_generic_handler( + executor, work_item, action); + if (ret) { + ERR("Stopping the execution of the action group of trigger \"%s\" following a fatal error", + get_trigger_name(work_item->trigger)); + goto end; + } + } +end: + return ret; +} + +static int action_executor_generic_handler(struct action_executor *executor, + const struct action_work_item *work_item, + const struct lttng_action *action) +{ + DBG("Executing action \"%s\" of trigger \"%s\" action work item %" PRIu64, + get_action_name(action), + get_trigger_name(work_item->trigger), + work_item->id); + + return action_executors[lttng_action_get_type(action)]( + executor, work_item, action); +} + +static int action_work_item_execute(struct action_executor *executor, + struct action_work_item *work_item) +{ + int ret; + const struct lttng_action *action = + lttng_trigger_get_const_action(work_item->trigger); + + DBG("Starting execution of action work item %" PRIu64 " of trigger \"%s\"", + work_item->id, get_trigger_name(work_item->trigger)); + ret = action_executor_generic_handler(executor, work_item, action); + DBG("Completed execution of action work item %" PRIu64 " of trigger \"%s\"", + work_item->id, get_trigger_name(work_item->trigger)); + return ret; +} + +static void action_work_item_destroy(struct action_work_item *work_item) +{ + lttng_trigger_put(work_item->trigger); + notification_client_list_put(work_item->client_list); + free(work_item); +} + +static void *action_executor_thread(void *_data) +{ + struct action_executor *executor = _data; + + assert(executor); + + health_register(health_sessiond, HEALTH_SESSIOND_TYPE_ACTION_EXECUTOR); + + rcu_register_thread(); + rcu_thread_online(); + + DBG("Entering work execution loop"); + pthread_mutex_lock(&executor->work.lock); + while (!executor->should_quit) { + int ret; + struct action_work_item *work_item; + + health_code_update(); + if (executor->work.pending_count == 0) { + health_poll_entry(); + DBG("No work items enqueued, entering wait"); + pthread_cond_wait(&executor->work.cond, + &executor->work.lock); + DBG("Woke-up from wait"); + health_poll_exit(); + continue; + } + + /* Pop item from front of the listwith work lock held. */ + work_item = cds_list_first_entry(&executor->work.list, + struct action_work_item, list_node); + cds_list_del(&work_item->list_node); + executor->work.pending_count--; + + /* + * Work can be performed without holding the work lock, + * allowing new items to be queued. + */ + pthread_mutex_unlock(&executor->work.lock); + ret = action_work_item_execute(executor, work_item); + action_work_item_destroy(work_item); + if (ret) { + /* Fatal error. */ + break; + } + health_code_update(); + pthread_mutex_lock(&executor->work.lock); + } + pthread_mutex_unlock(&executor->work.lock); + DBG("Left work execution loop"); + + health_code_update(); + + rcu_thread_offline(); + rcu_unregister_thread(); + health_unregister(health_sessiond); + + return NULL; +} + +static bool shutdown_action_executor_thread(void *_data) +{ + struct action_executor *executor = _data; + + /* TODO. */ + executor->should_quit = true; + pthread_cond_signal(&executor->work.cond); + return true; +} + +static void clean_up_action_executor_thread(void *_data) +{ + struct action_executor *executor = _data; + + assert(cds_list_empty(&executor->work.list)); + + pthread_mutex_destroy(&executor->work.lock); + pthread_cond_destroy(&executor->work.cond); + free(executor); +} + +struct action_executor *action_executor_create( + struct notification_thread_handle *handle) +{ + struct action_executor *executor = zmalloc(sizeof(*executor)); + + if (!executor) { + goto end; + } + + CDS_INIT_LIST_HEAD(&executor->work.list); + pthread_cond_init(&executor->work.cond, NULL); + pthread_mutex_init(&executor->work.lock, NULL); + executor->notification_thread_handle = handle; + + executor->thread = lttng_thread_create(THREAD_NAME, + action_executor_thread, shutdown_action_executor_thread, + clean_up_action_executor_thread, executor); +end: + return executor; +} + +void action_executor_destroy(struct action_executor *executor) +{ + struct action_work_item *work_item, *tmp; + + /* TODO Wait for work list to drain? */ + lttng_thread_shutdown(executor->thread); + pthread_mutex_lock(&executor->work.lock); + if (executor->work.pending_count != 0) { + WARN("%" PRIu64 + " trigger action%s still queued for execution and will be discarded", + executor->work.pending_count, + executor->work.pending_count == 1 ? " is" : + "s are"); + } + + cds_list_for_each_entry_safe ( + work_item, tmp, &executor->work.list, list_node) { + WARN("Discarding action work item %" PRIu64 + " associated to trigger \"%s\"", + work_item->id, get_trigger_name(work_item->trigger)); + cds_list_del(&work_item->list_node); + action_work_item_destroy(work_item); + } + pthread_mutex_unlock(&executor->work.lock); + lttng_thread_put(executor->thread); +} + +/* RCU read-lock must be held by the caller. */ +enum action_executor_status action_executor_enqueue( + struct action_executor *executor, + struct lttng_trigger *trigger, + struct notification_client_list *client_list) +{ + enum action_executor_status executor_status = ACTION_EXECUTOR_STATUS_OK; + const uint64_t work_item_id = executor->next_work_item_id++; + struct action_work_item *work_item; + bool signal = false; + + pthread_mutex_lock(&executor->work.lock); + /* Check for queue overflow. */ + if (executor->work.pending_count >= MAX_QUEUED_WORK_COUNT) { + /* Most likely spammy, remove if it is the case. */ + DBG("Refusing to enqueue action for trigger \"%s\" as work item %" PRIu64 + " (overflow)", + get_trigger_name(trigger), work_item_id); + executor_status = ACTION_EXECUTOR_STATUS_OVERFLOW; + goto error_unlock; + } + + work_item = zmalloc(sizeof(*work_item)); + if (!work_item) { + PERROR("Failed to allocate action executor work item on behalf of trigger \"%s\"", + get_trigger_name(trigger)); + executor_status = ACTION_EXECUTOR_STATUS_ERROR; + goto error_unlock; + } + + lttng_trigger_get(trigger); + if (client_list) { + const bool reference_acquired = + notification_client_list_get(client_list); + + assert(reference_acquired); + } + + *work_item = (typeof(*work_item)){ + .id = work_item_id, + .trigger = trigger, + .client_list = client_list, + .list_node = CDS_LIST_HEAD_INIT(work_item->list_node), + }; + cds_list_add_tail(&work_item->list_node, &executor->work.list); + executor->work.pending_count++; + DBG("Enqueued action for trigger \"%s\" as work item %" PRIu64, + get_trigger_name(trigger), work_item_id); + signal = true; + +error_unlock: + pthread_mutex_unlock(&executor->work.lock); + if (signal) { + pthread_cond_signal(&executor->work.cond); + } + return executor_status; +}