From c254fb11e36408b13c3e5dc51e5dbefa742dbe57 Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=A9mie=20Galarneau?= Date: Fri, 3 Mar 2017 11:05:58 -0500 Subject: [PATCH] Simulate buffer usage in notification thread MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérémie Galarneau --- include/Makefile.am | 3 +- include/lttng/condition/condition-internal.h | 6 + include/lttng/condition/evaluation-internal.h | 5 + include/lttng/lttng-error.h | 1 + include/lttng/notification/channel.h | 11 +- include/lttng/trigger/trigger-internal.h | 3 + src/bin/lttng-sessiond/cmd.c | 20 +- src/bin/lttng-sessiond/cmd.h | 5 +- src/bin/lttng-sessiond/main.c | 3 +- src/bin/lttng-sessiond/notification-thread.c | 563 +++++++++++++++++- src/bin/lttng-sessiond/notification-thread.h | 9 +- src/common/buffer-usage.c | 2 +- src/common/condition.c | 8 + src/common/error.c | 1 + src/common/trigger.c | 1 + src/lib/lttng-ctl/Makefile.am | 3 +- 16 files changed, 624 insertions(+), 20 deletions(-) diff --git a/include/Makefile.am b/include/Makefile.am index da9dc06b5..bc105b064 100644 --- a/include/Makefile.am +++ b/include/Makefile.am @@ -108,4 +108,5 @@ noinst_HEADERS = \ lttng/condition/evaluation-internal.h \ lttng/notification/notification-internal.h \ lttng/trigger/trigger-internal.h \ - lttng/endpoint-internal.h + lttng/endpoint-internal.h \ + lttng/notification/channel-internal.h diff --git a/include/lttng/condition/condition-internal.h b/include/lttng/condition/condition-internal.h index 1c8f100e9..652b4369c 100644 --- a/include/lttng/condition/condition-internal.h +++ b/include/lttng/condition/condition-internal.h @@ -21,6 +21,7 @@ #include #include #include +#include typedef void (*condition_destroy_cb)(struct lttng_condition *condition); typedef bool (*condition_validate_cb)(struct lttng_condition *condition); @@ -32,6 +33,7 @@ struct lttng_condition { condition_validate_cb validate; condition_serialize_cb serialize; condition_destroy_cb destroy; + struct cds_list_head list_node; }; struct lttng_condition_comm { @@ -39,6 +41,10 @@ struct lttng_condition_comm { int8_t condition_type; }; +LTTNG_HIDDEN +void lttng_condition_init(struct lttng_condition *condition, + enum lttng_condition_type type); + LTTNG_HIDDEN bool lttng_condition_validate(struct lttng_condition *condition); diff --git a/include/lttng/condition/evaluation-internal.h b/include/lttng/condition/evaluation-internal.h index c17049f1c..e73411d5c 100644 --- a/include/lttng/condition/evaluation-internal.h +++ b/include/lttng/condition/evaluation-internal.h @@ -24,6 +24,11 @@ typedef void (*evaluation_destroy_cb)(struct lttng_evaluation *evaluation); +struct lttng_evaluation_comm { + /* enum lttng_condition_type type */ + int8_t type; +} LTTNG_PACKED; + struct lttng_evaluation { enum lttng_condition_type type; evaluation_destroy_cb destroy; diff --git a/include/lttng/lttng-error.h b/include/lttng/lttng-error.h index 69a82a643..8fdf1a35d 100644 --- a/include/lttng/lttng-error.h +++ b/include/lttng/lttng-error.h @@ -146,6 +146,7 @@ enum lttng_error_code { LTTNG_ERR_REGEN_STATEDUMP_NOMEM = 123, /* Failed to regenerate the state dump, not enough memory */ LTTNG_ERR_NOT_SNAPSHOT_SESSION = 124, /* Session is not in snapshot mode. */ LTTNG_ERR_INVALID_TRIGGER = 125, /* Invalid trigger provided. */ + LTTNG_ERR_COMMAND_CANCELLED = 126, /* Command cancelled. */ /* MUST be last element */ LTTNG_ERR_NR, /* Last element */ diff --git a/include/lttng/notification/channel.h b/include/lttng/notification/channel.h index 77a9e6e92..b1c2d6af5 100644 --- a/include/lttng/notification/channel.h +++ b/include/lttng/notification/channel.h @@ -23,21 +23,22 @@ extern "C" { #endif struct lttng_endpoint; +struct lttng_condition; struct lttng_notification; struct lttng_notification_channel; -struct lttng_notification_channel_endpoint; /* LTTng Notification channel */ enum lttng_notification_channel_status { LTTNG_NOTIFICATION_CHANNEL_STATUS_TIMEOUT = 2, LTTNG_NOTIFICATION_CHANNEL_STATUS_NOTIFICATIONS_DROPPED = 1, LTTNG_NOTIFICATION_CHANNEL_STATUS_OK = 0, - LTTNG_NOTIFICATION_CHANNEL_STATUS_CLOSED = -1, - LTTNG_NOTIFICATION_CHANNEL_STATUS_NOT_FOUND = -2, - LTTNG_NOTIFICATION_CHANNEL_STATUS_UNSUPPORTED = -3, + LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR = -1, + LTTNG_NOTIFICATION_CHANNEL_STATUS_CLOSED = -2, + LTTNG_NOTIFICATION_CHANNEL_STATUS_NOT_FOUND = -3, + LTTNG_NOTIFICATION_CHANNEL_STATUS_UNSUPPORTED = -4, + LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID = -5, }; -/* TODO Add target parameter (sessiond, relayd, etc.). */ extern struct lttng_notification_channel *lttng_notification_channel_create( struct lttng_endpoint *endpoint); diff --git a/include/lttng/trigger/trigger-internal.h b/include/lttng/trigger/trigger-internal.h index eab0de7ab..4d90fe84f 100644 --- a/include/lttng/trigger/trigger-internal.h +++ b/include/lttng/trigger/trigger-internal.h @@ -22,10 +22,13 @@ #include #include #include +#include +#include struct lttng_trigger { struct lttng_condition *condition; struct lttng_action *action; + struct cds_list_head list_node; }; struct lttng_trigger_comm { diff --git a/src/bin/lttng-sessiond/cmd.c b/src/bin/lttng-sessiond/cmd.c index 3f2de9e7e..a3c680aff 100644 --- a/src/bin/lttng-sessiond/cmd.c +++ b/src/bin/lttng-sessiond/cmd.c @@ -42,6 +42,7 @@ #include "syscall.h" #include "agent.h" #include "buffer-registry.h" +#include "notification-thread.h" #include "cmd.h" @@ -3567,13 +3568,15 @@ end: return ret; } -int cmd_register_trigger(struct command_ctx *cmd_ctx, int sock) +int cmd_register_trigger(struct command_ctx *cmd_ctx, int sock, + struct notification_thread_data *notification_thread) { int ret; size_t trigger_len; ssize_t sock_recv_len; char *trigger_buffer = NULL; struct lttng_trigger *trigger = NULL; + uint64_t notification_counter = 1; trigger_len = (size_t) cmd_ctx->lsm->u.trigger.header.len; trigger_buffer = zmalloc(trigger_len); @@ -3595,11 +3598,22 @@ int cmd_register_trigger(struct command_ctx *cmd_ctx, int sock) ret = LTTNG_ERR_INVALID_TRIGGER; goto end; } + + /* Enqueue and signal. */ + pthread_mutex_lock(¬ification_thread->cmd_queue.lock); + cds_list_add(&trigger->list_node, ¬ification_thread->cmd_queue.list); + trigger = NULL; + pthread_mutex_unlock(¬ification_thread->cmd_queue.lock); + ret = write(notification_thread->cmd_queue.event_fd, + ¬ification_counter, sizeof(notification_counter)); + if (ret < 0) { + PERROR("write to notification thread's queue event fd"); + } else { + ret = LTTNG_OK; + } DBG("Command register trigger succeeded"); - ret = LTTNG_OK; end: free(trigger_buffer); - lttng_trigger_destroy(trigger); return ret; } diff --git a/src/bin/lttng-sessiond/cmd.h b/src/bin/lttng-sessiond/cmd.h index 3be9adba7..567ec078c 100644 --- a/src/bin/lttng-sessiond/cmd.h +++ b/src/bin/lttng-sessiond/cmd.h @@ -21,6 +21,8 @@ #include "context.h" #include "session.h" +struct notification_thread_data; + /* * Init the command subsystem. Must be called before using any of the functions * above. This is called in the main() of the session daemon. @@ -111,6 +113,7 @@ int cmd_set_session_shm_path(struct ltt_session *session, int cmd_regenerate_metadata(struct ltt_session *session); int cmd_regenerate_statedump(struct ltt_session *session); -int cmd_register_trigger(struct command_ctx *cmd_ctx, int sock); +int cmd_register_trigger(struct command_ctx *cmd_ctx, int sock, + struct notification_thread_data *notification_thread); #endif /* CMD_H */ diff --git a/src/bin/lttng-sessiond/main.c b/src/bin/lttng-sessiond/main.c index db3f8851b..935a18c99 100644 --- a/src/bin/lttng-sessiond/main.c +++ b/src/bin/lttng-sessiond/main.c @@ -4130,7 +4130,8 @@ error_add_context: } case LTTNG_REGISTER_TRIGGER: { - ret = cmd_register_trigger(cmd_ctx, sock); + ret = cmd_register_trigger(cmd_ctx, sock, + notification_thread_data); break; } default: diff --git a/src/bin/lttng-sessiond/notification-thread.c b/src/bin/lttng-sessiond/notification-thread.c index 5ec27b614..9e112fdb2 100644 --- a/src/bin/lttng-sessiond/notification-thread.c +++ b/src/bin/lttng-sessiond/notification-thread.c @@ -17,17 +17,56 @@ #define _LGPL_SOURCE #include +#include +#include +#include +#include #include #include #include #include +#include +#include #include #include +#include +#include #include "notification-thread.h" #include "lttng-sessiond.h" #include "health-sessiond.h" +#include + +#define CLIENT_RECEPTION_BUFFER_SIZE (4 * PAGE_SIZE) +#define SIMULATION_TIMER_INTERVAL_NS 2 * NSEC_PER_SEC +#define SIMULATION_TIMER_SIGNAL SIGRTMIN + 10 + +static int simulation_timer_event_fd = -1; +static timer_t simulation_timer; + +struct client { + int socket; + struct cds_list_head list_node; + /* + * Conditions to which the client is registered. + */ + struct cds_list_head condition_list; +}; + +static struct cds_list_head client_list; +static struct cds_list_head trigger_list; +static char *client_reception_buffer; + +/* + * The simulation timer will alternate between "buffers" between full and + * empty values, firing all low/high usage triggers in alternance. + */ +static pthread_mutex_t simulation_lock = PTHREAD_MUTEX_INITIALIZER; +static uint64_t simulation_buffer_use_bytes; +static double simulation_buffer_use_ratio = 0.0; +static uint64_t simulation_buffer_capacity = UINT32_MAX; + /* * Destroy the thread data previously created by the init function. */ @@ -47,6 +86,7 @@ void notification_destroy_data(struct notification_thread_data *data) PERROR("close notification command queue event_fd"); } + pthread_mutex_destroy(&data->cmd_queue.lock); /* TODO: purge queue and mark commands as cancelled. */ end: free(data); @@ -58,6 +98,7 @@ end: */ struct notification_thread_data *notification_init_data(void) { + int ret; struct notification_thread_data *data; data = zmalloc(sizeof(*data)); @@ -70,7 +111,11 @@ struct notification_thread_data *notification_init_data(void) PERROR("eventfd notification command queue"); goto error; } - cds_wfcq_init(&data->cmd_queue.head, &data->cmd_queue.tail); + CDS_INIT_LIST_HEAD(&data->cmd_queue.list); + ret = pthread_mutex_init(&data->cmd_queue.lock, NULL); + if (ret) { + goto error; + } end: return data; error: @@ -78,6 +123,84 @@ error: return NULL; } +static +void simulation_timer_thread(union sigval val) +{ + int ret; + uint64_t counter = 1; + + pthread_mutex_lock(&simulation_lock); + if (simulation_buffer_use_bytes == 0) { + simulation_buffer_use_bytes = UINT32_MAX; + simulation_buffer_use_ratio = 1.0; + } else { + simulation_buffer_use_bytes = 0; + simulation_buffer_use_ratio = 0.0; + } + pthread_mutex_unlock(&simulation_lock); + ret = write(simulation_timer_event_fd, &counter, sizeof(counter)); + if (ret < 0) { + PERROR("writer simulation timer event fd"); + } +} + +static +int simulation_timer_start(void) +{ + int ret; + struct sigevent sev; + struct itimerspec its; + + ret = eventfd(0, EFD_CLOEXEC); + if (ret < 0) { + PERROR("eventfd simulation timer event fd"); + goto error; + } + simulation_timer_event_fd = ret; + + sev.sigev_notify = SIGEV_THREAD; + sev.sigev_value.sival_ptr = NULL; + sev.sigev_notify_function = simulation_timer_thread; + sev.sigev_notify_attributes = NULL; + + /* + * Valgrind indicates a leak when timer_create() is used + * in the "SIGEV_THREAD" mode. This bug has been known to upstream glibc + * since 2009, but no fix has been implemented so far. + */ + ret = timer_create(CLOCK_MONOTONIC, &sev, &simulation_timer); + if (ret < 0) { + PERROR("timer_create simulation timer"); + goto error; + } + + its.it_value.tv_sec = SIMULATION_TIMER_INTERVAL_NS / NSEC_PER_SEC; + its.it_value.tv_nsec = (SIMULATION_TIMER_INTERVAL_NS % NSEC_PER_SEC); + its.it_interval.tv_sec = its.it_value.tv_sec; + its.it_interval.tv_nsec = its.it_value.tv_nsec; + + ret = timer_settime(simulation_timer, 0, &its, NULL); + if (ret < 0) { + PERROR("timer_settime simulation timer"); + goto error; + } + + return 0; +error: + return -1; +} + +static +void simulation_timer_stop(void) +{ + int ret; + + ret = timer_delete(simulation_timer); + if (ret == -1) { + PERROR("timer_delete simulation timer"); + } +} + static char *get_notification_channel_sock_path(void) { @@ -103,7 +226,6 @@ char *get_notification_channel_sock_path(void) ERR("Can't get HOME directory for socket creation"); goto error; } - puts(home_path); ret = snprintf(sock_path, LTTNG_PATH_MAX, DEFAULT_HOME_NOTIFICATION_CHANNEL_UNIX_SOCK, @@ -176,6 +298,332 @@ error: return ret; } +static +int handle_new_connection(int socket) +{ + int ret; + struct client *client; + + DBG("[notification-thread] Handling new notification channel client connection"); + + client = zmalloc(sizeof(*client)); + if (!client) { + goto error; + } + + ret = lttcomm_accept_unix_sock(socket); + if (ret < 0) { + ERR("[notification-thread] Failed to accept new notification channel client connection"); + goto error; + } + + client->socket = ret; + CDS_INIT_LIST_HEAD(&client->condition_list); + + /* FIXME handle creds. */ + ret = lttcomm_setsockopt_creds_unix_sock(socket); + if (ret < 0) { + ERR("[notification-thread] Failed to set socket options on new notification channel client socket"); + goto error; + } + + cds_list_add(&client->list_node, &client_list); + return client->socket; +error: + free(client); + return -1; +} + +static +int send_command_reply(int socket, + enum lttng_notification_channel_status status) +{ + ssize_t ret; + struct lttng_notification_channel_command_reply reply = { + .status = (int8_t) status, + }; + + DBG("[notification-thread] Send command reply (%i)", (int) status); + + ret = lttcomm_send_unix_sock(socket, &reply, sizeof(reply)); + if (ret < 0) { + ERR("[notification-thread] Failed to send command reply"); + goto error; + } + return 0; +error: + return -1; +} + +static +struct client *get_client_from_fd(int fd) +{ + struct client *client; + + cds_list_for_each_entry(client, &client_list, list_node) { + if (client->socket == fd) { + return client; + } + } + return NULL; +} + +static +int handle_notification_channel_client(int socket) +{ + ssize_t ret; + size_t received = 0; + struct client *client = get_client_from_fd(socket); + struct lttng_condition *condition; + struct lttng_notification_channel_command command; + enum lttng_notification_channel_status status = + LTTNG_NOTIFICATION_CHANNEL_STATUS_OK; + struct lttng_trigger *trigger; + + assert(client); + + /* Receive command header. */ + do + { + ret = lttcomm_recv_unix_sock(socket, ((char *) &command) + received, + sizeof(command) - received); + if (ret <= 0) { + ERR("[notification-thread] Failed to receive channel command from client (received %zu bytes)", received); + goto error_no_reply; + } + received += ret; + } while (received < sizeof(command)); + + received = 0; + if (command.size >= CLIENT_RECEPTION_BUFFER_SIZE) { + ERR("[notification-thread] Notification channel client attempted to send condition larger (%u bytes) than client reception buffer (%u bytes)", + command.size, + (unsigned int) CLIENT_RECEPTION_BUFFER_SIZE); + goto error_no_reply; + } + + do + { + ret = lttcomm_recv_unix_sock(socket, + client_reception_buffer + received, + command.size - received); + if (ret <= 0) { + ERR("[notification-thread] Failed to receive condition from client"); + goto error_no_reply; + } + received += ret; + } while (received < sizeof(command)); + + ret = lttng_condition_create_from_buffer(client_reception_buffer, + &condition); + if (ret < 0 || ret < command.size) { + ERR("[notification-thread] Malformed condition received from client"); + goto error_no_reply; + } + + DBG("[notification-thread] Successfully received condition from notification channel client"); + + /* + * A client may only listen for a condition that is currently associated + * with a trigger known to the system. + */ + DBG("[notification-thread] Comparing registered condition to known trigger conditions"); + cds_list_for_each_entry(trigger, &trigger_list, list_node) { + struct lttng_condition *trigger_condition = + lttng_trigger_get_condition(trigger); + + if (!trigger_condition) { + ERR("[notification-thread] lttng_trigger_get_condition returned NULL"); + status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR; + goto end; + } + + if (lttng_condition_is_equal(trigger_condition, condition)) { + /* Matching condition found. */ + DBG("[notification-thread] Found a matching condition, accepting client subscription request"); + cds_list_add(&condition->list_node, + &client->condition_list); + goto end; + } + } + + /* No match found, refuse the subscription. */ + DBG("[notification-thread] No matching condition found, refusing client subscription request"); + status = LTTNG_NOTIFICATION_CHANNEL_STATUS_UNKNOWN; +end: + if (send_command_reply(socket, status)) { + goto error_no_reply; + } + return 0; +error_no_reply: + return -1; +} + +static +void client_destroy(struct client *client) +{ + struct lttng_condition *condition, *tmp; + + cds_list_for_each_entry_safe(condition, tmp, &client->condition_list, + list_node) { + cds_list_del(&condition->list_node); + lttng_condition_destroy(condition); + } + + (void) lttcomm_close_unix_sock(client->socket); + free(client); +} + +static +void clean_up_notification_channel_client(int socket) +{ + struct client *client; + + DBG("[notification-thread] Searching for client data for clean-up"); + cds_list_for_each_entry(client, &client_list, list_node) { + if (client->socket == socket) { + DBG("[notification-thread] Client data found for clean-up"); + cds_list_del(&client->list_node); + client_destroy(client); + return; + } + } + ERR("[notification-thread] Failed to clean-up client data"); +} + +static +void activate_triggers(struct cds_list_head *new_triggers_list) +{ + struct lttng_trigger *trigger, *tmp; + + DBG("[notification-thread] Moving triggers from new list to activated trigger set"); + cds_list_for_each_entry_safe(trigger, tmp, new_triggers_list, list_node) { + cds_list_del(&trigger->list_node); + cds_list_add(&trigger->list_node, &trigger_list); + } +} + +static +void clean_up_triggers(void) +{ + struct lttng_trigger *trigger, *tmp; + + DBG("[notification-thread] Cleaning up triggers"); + cds_list_for_each_entry_safe(trigger, tmp, &trigger_list, list_node) { + DBG("[notification-thread] Destroying trigger"); + cds_list_del(&trigger->list_node); + lttng_trigger_destroy(trigger); + } +} + +static +struct lttng_evaluation *evaluate_buffer_usage_condition( + struct lttng_condition *_condition) +{ + uint64_t threshold; + struct lttng_evaluation *evaluation = NULL; + struct lttng_condition_buffer_usage *condition = container_of( + _condition, struct lttng_condition_buffer_usage, + parent); + + if (condition->threshold_bytes.set) { + threshold = condition->threshold_bytes.value; + } else { + /* Threshold was expressed as a ratio. */ + threshold = (uint64_t) (condition->threshold_ratio.value * + (double) simulation_buffer_capacity); + } + + if (condition->parent.type == + LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW) { + if (simulation_buffer_use_bytes <= threshold) { + evaluation = lttng_evaluation_buffer_usage_create( + condition->parent.type, + simulation_buffer_use_bytes, + simulation_buffer_capacity); + } + } else { + if (simulation_buffer_use_bytes >= threshold) { + evaluation = lttng_evaluation_buffer_usage_create( + condition->parent.type, + simulation_buffer_use_bytes, + simulation_buffer_capacity); + } + } + return evaluation; +} + +static +void notify_client(struct client *client, struct lttng_condition *condition, + struct lttng_evaluation *evaluation) +{ + ssize_t notification_size, ret; + char *notification_buffer; + struct lttng_notification *notification; + + notification = lttng_notification_create(condition, evaluation); + if (!notification) { + ERR("[notification-thread] Failed to create client notification"); + return; + } + + notification_size = lttng_notification_serialize(notification, NULL); + if (notification_size < 0) { + ERR("[notification-thread] Failed to get size of serialized notification"); + return; + } + + notification_buffer = zmalloc(notification_size); + if (!notification_buffer) { + ERR("[notification-thread] Failed to allocate notification serialization buffer"); + } + + ret = lttng_notification_serialize(notification, notification_buffer); + if (ret != notification_size) { + ERR("[notification-thread] Failed to serialize notification"); + return; + } + + ret = lttcomm_send_unix_sock(client->socket, notification_buffer, + notification_size); + if (ret < 0) { + ERR("[notification-thread] Failed to send notification to client"); + return; + } +} + +static +void evaluate_client_conditions(void) +{ + struct client *client; + + DBG("[notification-thread] Evaluating client conditions"); + cds_list_for_each_entry(client, &client_list, list_node) { + struct lttng_condition *condition; + cds_list_for_each_entry(condition, &client->condition_list, + list_node) { + struct lttng_evaluation *evaluation = NULL; + switch (lttng_condition_get_type(condition)) { + case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW: + case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH: + evaluation = evaluate_buffer_usage_condition( + condition); + break; + default: + ERR("[notification-thread] Unknown condition type encountered in evaluation"); + abort(); + } + + if (evaluation) { + DBG("[notification-thread] Condition evaluated to true"); + notify_client(client, condition, evaluation); + lttng_evaluation_destroy(evaluation); + } + } + } + DBG("[notification-thread] Client conditions evaluated"); +} + /* * This thread services notification channel clients and received notifications * from various lttng-sessiond components over a command queue. @@ -189,6 +637,11 @@ void *thread_notification(void *data) DBG("[notification-thread] Started notification thread"); + CDS_INIT_LIST_HEAD(&client_list); + CDS_INIT_LIST_HEAD(&trigger_list); + + simulation_timer_start(); + if (!ctx) { ERR("[notification-thread] Invalid thread context provided"); goto end; @@ -200,7 +653,17 @@ void *thread_notification(void *data) health_register(health_sessiond, HEALTH_SESSIOND_TYPE_NOTIFICATION); health_code_update(); - notification_channel_socket = notification_channel_socket_create(); + client_reception_buffer = zmalloc(CLIENT_RECEPTION_BUFFER_SIZE); + if (!client_reception_buffer) { + ERR("[notification-thread] Failed to allocate client reception buffer"); + goto end; + } + + ret = notification_channel_socket_create(); + if (ret < 0) { + goto end; + } + notification_channel_socket = ret; /* * Create pollset with size 2, quit pipe and notification channel @@ -226,6 +689,13 @@ void *thread_notification(void *data) goto error; } + ret = lttng_poll_add(&events, simulation_timer_event_fd, + LPOLLIN | LPOLLERR); + if (ret < 0) { + ERR("[notification-thread] Failed to add timer event fd to pollset"); + goto error; + } + DBG("[notification-thread] Listening on notification channel socket"); ret = lttcomm_listen_unix_sock(notification_channel_socket); if (ret < 0) { @@ -257,11 +727,95 @@ void *thread_notification(void *data) int fd = LTTNG_POLL_GETFD(&events, i); uint32_t revents = LTTNG_POLL_GETEV(&events, i); + DBG("[notification-thread] Handling fd (%i) activity (%u)", fd, revents); + /* Thread quit pipe has been closed. Killing thread. */ if (sessiond_check_thread_quit_pipe(fd, revents)) { DBG("[notification-thread] Quit pipe signaled, exiting."); goto exit; } + + if (fd == notification_channel_socket) { + if (revents & LPOLLIN) { + int new_socket; + + ret = handle_new_connection( + notification_channel_socket); + if (ret < 0) { + continue; + } + new_socket = ret; + + ret = lttng_poll_add(&events, new_socket, + LPOLLIN | LPOLLERR | + LPOLLHUP | LPOLLRDHUP); + if (ret < 0) { + ERR("[notification-thread] Failed to add notification channel client socket to pollset"); + goto error; + } + DBG("[notification-thread] Added new notification channel client socket to poll set"); + } else if (revents & + (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { + ERR("[notification-thread] Notification socket poll error"); + goto error; + } else { + ERR("[notification-thread] Unexpected poll events %u for notification socket %i", revents, fd); + goto error; + } + } else if (fd == ctx->cmd_queue.event_fd) { + /* + * Handling of internaly-generated events to + * evaluate against the set of active + * conditions. + */ + uint64_t counter; + + DBG("[notification-thread] Event received on command queue event fd"); + ret = read(fd, &counter, sizeof(counter)); + if (ret < 0) { + ERR("read on command queue event fd"); + } + + pthread_mutex_lock(&ctx->cmd_queue.lock); + activate_triggers(&ctx->cmd_queue.list); + pthread_mutex_unlock(&ctx->cmd_queue.lock); + } else if (fd == simulation_timer_event_fd) { + /* + * Place-holder timer to simulate activity in + * the system. + */ + uint64_t counter; + + DBG("[notification-thread] Simulation timer fired"); + ret = read(fd, &counter, sizeof(counter)); + if (ret < 0) { + ERR("read on simulation timer event fd"); + } + + pthread_mutex_lock(&simulation_lock); + evaluate_client_conditions(); + pthread_mutex_unlock(&simulation_lock); + } else { + if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { + /* + * It doesn't matter if a command was + * pending on the client socket at this + * point since it now has now way to + * receive the notifications to which + * it was subscribing or unsubscribing. + */ + DBG("[notification-thread] Closing client connection (fd = %i)", fd); + clean_up_notification_channel_client(fd); + } else if (revents & LPOLLIN) { + ret = handle_notification_channel_client(fd); + if (ret) { + DBG("[notification-thread] Closing client connection following error"); + clean_up_notification_channel_client(fd); + } + } else { + DBG("[notification-thread] Unexpected poll events %u for notification socket %i", revents, fd); + } + } } } exit: @@ -273,6 +827,9 @@ error_poll_create: health_unregister(health_sessiond); rcu_thread_offline(); rcu_unregister_thread(); + free(client_reception_buffer); + clean_up_triggers(); end: + simulation_timer_stop(); return NULL; } diff --git a/src/bin/lttng-sessiond/notification-thread.h b/src/bin/lttng-sessiond/notification-thread.h index 7f797e28e..f5dcd3b54 100644 --- a/src/bin/lttng-sessiond/notification-thread.h +++ b/src/bin/lttng-sessiond/notification-thread.h @@ -20,6 +20,7 @@ #include #include +#include enum notification_command_type { NOTIFICATION_COMMAND_TYPE_NEW_TRIGGER, @@ -47,8 +48,8 @@ struct notification_thread_data { */ struct notification_cmd_queue { int event_fd; - struct cds_wfcq_head head; - struct cds_wfcq_tail tail; + struct cds_list_head list; + pthread_mutex_t lock; } cmd_queue; }; @@ -59,7 +60,7 @@ struct notification_command *notification_new_trigger_command_destroy( void *thread_notification(void *data); -int notification_thread_init_data(struct notification_thread_data **data); -void notification_thread_destroy_data(struct notification_thread_data *data); +struct notification_thread_data *notification_init_data(void); +void notification_destroy_data(struct notification_thread_data *data); #endif /* NOTIFICATION_THREAD_H */ diff --git a/src/common/buffer-usage.c b/src/common/buffer-usage.c index 697967d8c..66449c313 100644 --- a/src/common/buffer-usage.c +++ b/src/common/buffer-usage.c @@ -140,7 +140,7 @@ struct lttng_condition *lttng_condition_buffer_usage_create( goto end; } - condition->parent.type = type; + lttng_condition_init(&condition->parent, type); condition->parent.validate = lttng_condition_buffer_usage_validate; condition->parent.serialize = lttng_condition_buffer_usage_serialize; condition->parent.destroy = lttng_condition_buffer_usage_destroy; diff --git a/src/common/condition.c b/src/common/condition.c index 5c3c11c23..4a24202fe 100644 --- a/src/common/condition.c +++ b/src/common/condition.c @@ -131,3 +131,11 @@ ssize_t lttng_condition_create_from_buffer(const char *buf, end: return ret; } + +LTTNG_HIDDEN +void lttng_condition_init(struct lttng_condition *condition, + enum lttng_condition_type type) +{ + condition->type = type; + CDS_INIT_LIST_HEAD(&condition->list_node); +} diff --git a/src/common/error.c b/src/common/error.c index 018fd0901..f7b2e1e37 100644 --- a/src/common/error.c +++ b/src/common/error.c @@ -187,6 +187,7 @@ static const char *error_string_array[] = { [ ERROR_INDEX(LTTNG_ERR_REGEN_STATEDUMP_NOMEM) ] = "Failed to regenerate the state dump, not enough memory", [ ERROR_INDEX(LTTNG_ERR_NOT_SNAPSHOT_SESSION) ] = "Snapshot command can't be applied to a non-snapshot session", [ ERROR_INDEX(LTTNG_ERR_INVALID_TRIGGER) ] = "Invalid trigger", + [ ERROR_INDEX(LTTNG_ERR_COMMAND_CANCELLED) ] = "Command cancelled", /* Last element */ [ ERROR_INDEX(LTTNG_ERR_NR) ] = "Unknown error code" diff --git a/src/common/trigger.c b/src/common/trigger.c index 12a7329f4..ede1ac567 100644 --- a/src/common/trigger.c +++ b/src/common/trigger.c @@ -54,6 +54,7 @@ struct lttng_trigger *lttng_trigger_create( trigger->condition = condition; trigger->action = action; + CDS_INIT_LIST_HEAD(&trigger->list_node); end: return trigger; } diff --git a/src/lib/lttng-ctl/Makefile.am b/src/lib/lttng-ctl/Makefile.am index 6b6a6eed1..b5156caf9 100644 --- a/src/lib/lttng-ctl/Makefile.am +++ b/src/lib/lttng-ctl/Makefile.am @@ -5,7 +5,8 @@ SUBDIRS = filter lib_LTLIBRARIES = liblttng-ctl.la liblttng_ctl_la_SOURCES = lttng-ctl.c snapshot.c lttng-ctl-helper.h \ - lttng-ctl-health.c save.c load.c deprecated-symbols.c + lttng-ctl-health.c save.c load.c deprecated-symbols.c \ + channel.c liblttng_ctl_la_LDFLAGS = \ $(LT_NO_UNDEFINED) -- 2.34.1