Simulate buffer usage in notification thread
authorJérémie Galarneau <jeremie.galarneau@efficios.com>
Fri, 3 Mar 2017 16:05:58 +0000 (11:05 -0500)
committerJérémie Galarneau <jeremie.galarneau@efficios.com>
Thu, 9 Mar 2017 03:32:37 +0000 (22:32 -0500)
Signed-off-by: Jérémie Galarneau <jeremie.galarneau@efficios.com>
16 files changed:
include/Makefile.am
include/lttng/condition/condition-internal.h
include/lttng/condition/evaluation-internal.h
include/lttng/lttng-error.h
include/lttng/notification/channel.h
include/lttng/trigger/trigger-internal.h
src/bin/lttng-sessiond/cmd.c
src/bin/lttng-sessiond/cmd.h
src/bin/lttng-sessiond/main.c
src/bin/lttng-sessiond/notification-thread.c
src/bin/lttng-sessiond/notification-thread.h
src/common/buffer-usage.c
src/common/condition.c
src/common/error.c
src/common/trigger.c
src/lib/lttng-ctl/Makefile.am

index da9dc06b5ae7771ba5a75243438855b684602af1..bc105b0649c482d0959aa35325c8cdc8909f3733 100644 (file)
@@ -108,4 +108,5 @@ noinst_HEADERS = \
        lttng/condition/evaluation-internal.h \
        lttng/notification/notification-internal.h \
        lttng/trigger/trigger-internal.h \
-       lttng/endpoint-internal.h
+       lttng/endpoint-internal.h \
+       lttng/notification/channel-internal.h
index 1c8f100e91a700b1768d598fc8e62a2edd3428e2..652b4369cceeb8847fc66ccb19fc88b2c409a7f5 100644 (file)
@@ -21,6 +21,7 @@
 #include <lttng/condition/condition.h>
 #include <common/macros.h>
 #include <stdbool.h>
+#include <urcu/list.h>
 
 typedef void (*condition_destroy_cb)(struct lttng_condition *condition);
 typedef bool (*condition_validate_cb)(struct lttng_condition *condition);
@@ -32,6 +33,7 @@ struct lttng_condition {
        condition_validate_cb validate;
        condition_serialize_cb serialize;
        condition_destroy_cb destroy;
+       struct cds_list_head list_node;
 };
 
 struct lttng_condition_comm {
@@ -39,6 +41,10 @@ struct lttng_condition_comm {
        int8_t condition_type;
 };
 
+LTTNG_HIDDEN
+void lttng_condition_init(struct lttng_condition *condition,
+               enum lttng_condition_type type);
+
 LTTNG_HIDDEN
 bool lttng_condition_validate(struct lttng_condition *condition);
 
index c17049f1caf725b4a95c692e71c60a3c389d79c0..e73411d5c844e162602b6174d0378ae902afdeb6 100644 (file)
 
 typedef void (*evaluation_destroy_cb)(struct lttng_evaluation *evaluation);
 
+struct lttng_evaluation_comm {
+       /* enum lttng_condition_type type */
+       int8_t type;
+} LTTNG_PACKED;
+
 struct lttng_evaluation {
        enum lttng_condition_type type;
        evaluation_destroy_cb destroy;
index 69a82a6431ebc6da7e43a57a52ce8ac13532b70f..8fdf1a35d87968e2bfd8bc9f265d727f7826e933 100644 (file)
@@ -146,6 +146,7 @@ enum lttng_error_code {
        LTTNG_ERR_REGEN_STATEDUMP_NOMEM  = 123, /* Failed to regenerate the state dump, not enough memory */
        LTTNG_ERR_NOT_SNAPSHOT_SESSION   = 124, /* Session is not in snapshot mode. */
        LTTNG_ERR_INVALID_TRIGGER        = 125, /* Invalid trigger provided. */
+       LTTNG_ERR_COMMAND_CANCELLED      = 126, /* Command cancelled. */
 
        /* MUST be last element */
        LTTNG_ERR_NR,                           /* Last element */
index 77a9e6e92cd51ce7584e5c1f68c27b2e67d3cf29..b1c2d6af50e790e25dfd0af52f4a9253d84599ca 100644 (file)
@@ -23,21 +23,22 @@ extern "C" {
 #endif
 
 struct lttng_endpoint;
+struct lttng_condition;
 struct lttng_notification;
 struct lttng_notification_channel;
-struct lttng_notification_channel_endpoint;
 
 /* LTTng Notification channel */
 enum lttng_notification_channel_status {
        LTTNG_NOTIFICATION_CHANNEL_STATUS_TIMEOUT = 2,
        LTTNG_NOTIFICATION_CHANNEL_STATUS_NOTIFICATIONS_DROPPED = 1,
        LTTNG_NOTIFICATION_CHANNEL_STATUS_OK = 0,
-       LTTNG_NOTIFICATION_CHANNEL_STATUS_CLOSED = -1,
-       LTTNG_NOTIFICATION_CHANNEL_STATUS_NOT_FOUND = -2,
-       LTTNG_NOTIFICATION_CHANNEL_STATUS_UNSUPPORTED = -3,
+       LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR = -1,
+       LTTNG_NOTIFICATION_CHANNEL_STATUS_CLOSED = -2,
+       LTTNG_NOTIFICATION_CHANNEL_STATUS_NOT_FOUND = -3,
+       LTTNG_NOTIFICATION_CHANNEL_STATUS_UNSUPPORTED = -4,
+       LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID = -5,
 };
 
-/* TODO Add target parameter (sessiond, relayd, etc.). */
 extern struct lttng_notification_channel *lttng_notification_channel_create(
                struct lttng_endpoint *endpoint);
 
index eab0de7ab26256426eb901e7b1f7ab401becb993..4d90fe84fcdfb6322b34342cf768d70bdf32a332 100644 (file)
 #include <common/macros.h>
 #include <stdint.h>
 #include <stdbool.h>
+#include <urcu/list.h>
+#include <urcu/wfcqueue.h>
 
 struct lttng_trigger {
        struct lttng_condition *condition;
        struct lttng_action *action;
+       struct cds_list_head list_node;
 };
 
 struct lttng_trigger_comm {
index 3f2de9e7e021cb7fbd57276db4c71420e7b62a3b..a3c680affd21e7361c209c76921b39735a956200 100644 (file)
@@ -42,6 +42,7 @@
 #include "syscall.h"
 #include "agent.h"
 #include "buffer-registry.h"
+#include "notification-thread.h"
 
 #include "cmd.h"
 
@@ -3567,13 +3568,15 @@ end:
        return ret;
 }
 
-int cmd_register_trigger(struct command_ctx *cmd_ctx, int sock)
+int cmd_register_trigger(struct command_ctx *cmd_ctx, int sock,
+               struct notification_thread_data *notification_thread)
 {
        int ret;
        size_t trigger_len;
        ssize_t sock_recv_len;
        char *trigger_buffer = NULL;
        struct lttng_trigger *trigger = NULL;
+       uint64_t notification_counter = 1;
 
        trigger_len = (size_t) cmd_ctx->lsm->u.trigger.header.len;
        trigger_buffer = zmalloc(trigger_len);
@@ -3595,11 +3598,22 @@ int cmd_register_trigger(struct command_ctx *cmd_ctx, int sock)
                ret = LTTNG_ERR_INVALID_TRIGGER;
                goto end;
        }
+
+       /* Enqueue and signal. */
+       pthread_mutex_lock(&notification_thread->cmd_queue.lock);
+       cds_list_add(&trigger->list_node, &notification_thread->cmd_queue.list);
+       trigger = NULL;
+       pthread_mutex_unlock(&notification_thread->cmd_queue.lock);
+       ret = write(notification_thread->cmd_queue.event_fd,
+                       &notification_counter, sizeof(notification_counter));
+       if (ret < 0) {
+               PERROR("write to notification thread's queue event fd");
+       } else {
+               ret = LTTNG_OK;
+       }
        DBG("Command register trigger succeeded");
-       ret = LTTNG_OK;
 end:
        free(trigger_buffer);
-       lttng_trigger_destroy(trigger);
        return ret;
 }
 
index 3be9adba7cdb2c9b5efa09ef64598a78483898ef..567ec078cb233109131bb727cf0b8baff917350b 100644 (file)
@@ -21,6 +21,8 @@
 #include "context.h"
 #include "session.h"
 
+struct notification_thread_data;
+
 /*
  * Init the command subsystem. Must be called before using any of the functions
  * above. This is called in the main() of the session daemon.
@@ -111,6 +113,7 @@ int cmd_set_session_shm_path(struct ltt_session *session,
 int cmd_regenerate_metadata(struct ltt_session *session);
 int cmd_regenerate_statedump(struct ltt_session *session);
 
-int cmd_register_trigger(struct command_ctx *cmd_ctx, int sock);
+int cmd_register_trigger(struct command_ctx *cmd_ctx, int sock,
+               struct notification_thread_data *notification_thread);
 
 #endif /* CMD_H */
index db3f8851b7a6dfc2aa270860be8b7acea12d773c..935a18c9943388c12eee6d5938e393cf0a5bb416 100644 (file)
@@ -4130,7 +4130,8 @@ error_add_context:
        }
        case LTTNG_REGISTER_TRIGGER:
        {
-               ret = cmd_register_trigger(cmd_ctx, sock);
+               ret = cmd_register_trigger(cmd_ctx, sock,
+                               notification_thread_data);
                break;
        }
        default:
index 5ec27b614b31f17b9a6175a7be04bb2a0f7335f1..9e112fdb2d89fb09647e7ad25649bc9eaefaf4d1 100644 (file)
 
 #define _LGPL_SOURCE
 #include <lttng/trigger/trigger.h>
+#include <lttng/notification/channel-internal.h>
+#include <lttng/notification/notification-internal.h>
+#include <lttng/condition/condition-internal.h>
+#include <lttng/condition/buffer-usage-internal.h>
 #include <common/error.h>
 #include <common/config/session-config.h>
 #include <common/defaults.h>
 #include <common/utils.h>
+#include <common/align.h>
+#include <common/time.h>
 #include <sys/eventfd.h>
 #include <sys/stat.h>
+#include <time.h>
+#include <signal.h>
 
 #include "notification-thread.h"
 #include "lttng-sessiond.h"
 #include "health-sessiond.h"
 
+#include <urcu/list.h>
+
+#define CLIENT_RECEPTION_BUFFER_SIZE   (4 * PAGE_SIZE)
+#define SIMULATION_TIMER_INTERVAL_NS   2 * NSEC_PER_SEC
+#define SIMULATION_TIMER_SIGNAL                SIGRTMIN + 10
+
+static int simulation_timer_event_fd = -1;
+static timer_t simulation_timer;
+
+struct client {
+       int socket;
+       struct cds_list_head list_node;
+       /*
+        * Conditions to which the client is registered.
+        */
+       struct cds_list_head condition_list;
+};
+
+static struct cds_list_head client_list;
+static struct cds_list_head trigger_list;
+static char *client_reception_buffer;
+
+/*
+ * The simulation timer will alternate between "buffers" between full and
+ * empty values, firing all low/high usage triggers in alternance.
+ */
+static pthread_mutex_t simulation_lock = PTHREAD_MUTEX_INITIALIZER;
+static uint64_t simulation_buffer_use_bytes;
+static double simulation_buffer_use_ratio = 0.0;
+static uint64_t simulation_buffer_capacity = UINT32_MAX;
+
 /*
  * Destroy the thread data previously created by the init function.
  */
@@ -47,6 +86,7 @@ void notification_destroy_data(struct notification_thread_data *data)
                PERROR("close notification command queue event_fd");
        }
 
+       pthread_mutex_destroy(&data->cmd_queue.lock);
        /* TODO: purge queue and mark commands as cancelled. */
 end:
        free(data);
@@ -58,6 +98,7 @@ end:
  */
 struct notification_thread_data *notification_init_data(void)
 {
+       int ret;
        struct notification_thread_data *data;
 
        data = zmalloc(sizeof(*data));
@@ -70,7 +111,11 @@ struct notification_thread_data *notification_init_data(void)
                PERROR("eventfd notification command queue");
                goto error;
        }
-       cds_wfcq_init(&data->cmd_queue.head, &data->cmd_queue.tail);
+       CDS_INIT_LIST_HEAD(&data->cmd_queue.list);
+       ret = pthread_mutex_init(&data->cmd_queue.lock, NULL);
+       if (ret) {
+               goto error;
+       }
 end:
        return data;
 error:
@@ -78,6 +123,84 @@ error:
        return NULL;
 }
 
+static
+void simulation_timer_thread(union sigval val)
+{
+       int ret;
+       uint64_t counter = 1;
+
+       pthread_mutex_lock(&simulation_lock);
+       if (simulation_buffer_use_bytes == 0) {
+               simulation_buffer_use_bytes = UINT32_MAX;
+               simulation_buffer_use_ratio = 1.0;
+       } else {
+               simulation_buffer_use_bytes = 0;
+               simulation_buffer_use_ratio = 0.0;
+       }
+       pthread_mutex_unlock(&simulation_lock);
+       ret = write(simulation_timer_event_fd, &counter, sizeof(counter));
+       if (ret < 0) {
+               PERROR("writer simulation timer event fd");
+       }
+}
+
+static
+int simulation_timer_start(void)
+{
+       int ret;
+       struct sigevent sev;
+       struct itimerspec its;
+
+       ret = eventfd(0, EFD_CLOEXEC);
+       if (ret < 0) {
+               PERROR("eventfd simulation timer event fd");
+               goto error;
+       }
+       simulation_timer_event_fd = ret;
+
+       sev.sigev_notify = SIGEV_THREAD;
+       sev.sigev_value.sival_ptr = NULL;
+       sev.sigev_notify_function = simulation_timer_thread;
+       sev.sigev_notify_attributes = NULL;
+
+       /*
+        * Valgrind indicates a leak when timer_create() is used
+        * in the "SIGEV_THREAD" mode. This bug has been known to upstream glibc
+        * since 2009, but no fix has been implemented so far.
+        */
+       ret = timer_create(CLOCK_MONOTONIC, &sev, &simulation_timer);
+       if (ret < 0) {
+               PERROR("timer_create simulation timer");
+               goto error;
+       }
+
+       its.it_value.tv_sec = SIMULATION_TIMER_INTERVAL_NS / NSEC_PER_SEC;
+       its.it_value.tv_nsec = (SIMULATION_TIMER_INTERVAL_NS % NSEC_PER_SEC);
+       its.it_interval.tv_sec = its.it_value.tv_sec;
+       its.it_interval.tv_nsec = its.it_value.tv_nsec;
+
+       ret = timer_settime(simulation_timer, 0, &its, NULL);
+       if (ret < 0) {
+               PERROR("timer_settime simulation timer");
+               goto error;
+       }
+
+       return 0;
+error:
+       return -1;
+}
+
+static
+void simulation_timer_stop(void)
+{
+       int ret;
+
+       ret = timer_delete(simulation_timer);
+       if (ret == -1) {
+               PERROR("timer_delete simulation timer");
+       }
+}
+
 static
 char *get_notification_channel_sock_path(void)
 {
@@ -103,7 +226,6 @@ char *get_notification_channel_sock_path(void)
                        ERR("Can't get HOME directory for socket creation");
                        goto error;
                }
-               puts(home_path);
 
                ret = snprintf(sock_path, LTTNG_PATH_MAX,
                                DEFAULT_HOME_NOTIFICATION_CHANNEL_UNIX_SOCK,
@@ -176,6 +298,332 @@ error:
        return ret;
 }
 
+static
+int handle_new_connection(int socket)
+{
+       int ret;
+       struct client *client;
+
+       DBG("[notification-thread] Handling new notification channel client connection");
+
+       client = zmalloc(sizeof(*client));
+       if (!client) {
+               goto error;
+       }
+
+       ret = lttcomm_accept_unix_sock(socket);
+       if (ret < 0) {
+               ERR("[notification-thread] Failed to accept new notification channel client connection");
+               goto error;
+       }
+
+       client->socket = ret;
+       CDS_INIT_LIST_HEAD(&client->condition_list);
+
+       /* FIXME handle creds. */
+       ret = lttcomm_setsockopt_creds_unix_sock(socket);
+       if (ret < 0) {
+               ERR("[notification-thread] Failed to set socket options on new notification channel client socket");
+               goto error;
+       }
+
+       cds_list_add(&client->list_node, &client_list);
+       return client->socket;
+error:
+       free(client);
+       return -1;
+}
+
+static
+int send_command_reply(int socket,
+               enum lttng_notification_channel_status status)
+{
+       ssize_t ret;
+       struct lttng_notification_channel_command_reply reply = {
+               .status = (int8_t) status,
+       };
+
+       DBG("[notification-thread] Send command reply (%i)", (int) status);
+
+       ret = lttcomm_send_unix_sock(socket, &reply, sizeof(reply));
+       if (ret < 0) {
+               ERR("[notification-thread] Failed to send command reply");
+               goto error;
+       }
+       return 0;
+error:
+       return -1;
+}
+
+static
+struct client *get_client_from_fd(int fd)
+{
+       struct client *client;
+
+       cds_list_for_each_entry(client, &client_list, list_node) {
+               if (client->socket == fd) {
+                       return client;
+               }
+       }
+       return NULL;
+}
+
+static
+int handle_notification_channel_client(int socket)
+{
+       ssize_t ret;
+       size_t received = 0;
+       struct client *client = get_client_from_fd(socket);
+       struct lttng_condition *condition;
+       struct lttng_notification_channel_command command;
+       enum lttng_notification_channel_status status =
+                       LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
+       struct lttng_trigger *trigger;
+
+       assert(client);
+
+       /* Receive command header. */
+       do
+       {
+               ret = lttcomm_recv_unix_sock(socket, ((char *) &command) + received,
+                               sizeof(command) - received);
+               if (ret <= 0) {
+                       ERR("[notification-thread] Failed to receive channel command from client (received %zu bytes)", received);
+                       goto error_no_reply;
+               }
+               received += ret;
+       } while (received < sizeof(command));
+
+       received = 0;
+       if (command.size >= CLIENT_RECEPTION_BUFFER_SIZE) {
+               ERR("[notification-thread] Notification channel client attempted to send condition larger (%u bytes) than client reception buffer (%u bytes)",
+                               command.size,
+                               (unsigned int) CLIENT_RECEPTION_BUFFER_SIZE);
+               goto error_no_reply;
+       }
+
+       do
+       {
+               ret = lttcomm_recv_unix_sock(socket,
+                               client_reception_buffer + received,
+                               command.size - received);
+               if (ret <= 0) {
+                       ERR("[notification-thread] Failed to receive condition from client");
+                       goto error_no_reply;
+               }
+               received += ret;
+       } while (received < sizeof(command));
+
+       ret = lttng_condition_create_from_buffer(client_reception_buffer,
+                       &condition);
+       if (ret < 0 || ret < command.size) {
+               ERR("[notification-thread] Malformed condition received from client");
+               goto error_no_reply;
+       }
+
+       DBG("[notification-thread] Successfully received condition from notification channel client");
+
+       /*
+        * A client may only listen for a condition that is currently associated
+        * with a trigger known to the system.
+        */
+       DBG("[notification-thread] Comparing registered condition to known trigger conditions");
+       cds_list_for_each_entry(trigger, &trigger_list, list_node) {
+               struct lttng_condition *trigger_condition =
+                               lttng_trigger_get_condition(trigger);
+
+               if (!trigger_condition) {
+                       ERR("[notification-thread] lttng_trigger_get_condition returned NULL");
+                       status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
+                       goto end;
+               }
+
+               if (lttng_condition_is_equal(trigger_condition, condition)) {
+                       /* Matching condition found. */
+                       DBG("[notification-thread] Found a matching condition, accepting client subscription request");
+                       cds_list_add(&condition->list_node,
+                                       &client->condition_list);
+                       goto end;
+               }
+       }
+
+       /* No match found, refuse the subscription. */
+       DBG("[notification-thread] No matching condition found, refusing client subscription request");
+       status = LTTNG_NOTIFICATION_CHANNEL_STATUS_UNKNOWN;
+end:
+       if (send_command_reply(socket, status)) {
+               goto error_no_reply;
+       }
+       return 0;
+error_no_reply:
+       return -1;
+}
+
+static
+void client_destroy(struct client *client)
+{
+       struct lttng_condition *condition, *tmp;
+
+       cds_list_for_each_entry_safe(condition, tmp, &client->condition_list,
+                       list_node) {
+               cds_list_del(&condition->list_node);
+               lttng_condition_destroy(condition);
+       }
+
+       (void) lttcomm_close_unix_sock(client->socket);
+       free(client);
+}
+
+static
+void clean_up_notification_channel_client(int socket)
+{
+       struct client *client;
+
+       DBG("[notification-thread] Searching for client data for clean-up");
+       cds_list_for_each_entry(client, &client_list, list_node) {
+               if (client->socket == socket) {
+                       DBG("[notification-thread] Client data found for clean-up");
+                       cds_list_del(&client->list_node);
+                       client_destroy(client);
+                       return;
+               }
+       }
+       ERR("[notification-thread] Failed to clean-up client data");
+}
+
+static
+void activate_triggers(struct cds_list_head *new_triggers_list)
+{
+       struct lttng_trigger *trigger, *tmp;
+
+       DBG("[notification-thread] Moving triggers from new list to activated trigger set");
+       cds_list_for_each_entry_safe(trigger, tmp, new_triggers_list, list_node) {
+               cds_list_del(&trigger->list_node);
+               cds_list_add(&trigger->list_node, &trigger_list);
+       }
+}
+
+static
+void clean_up_triggers(void)
+{
+       struct lttng_trigger *trigger, *tmp;
+
+       DBG("[notification-thread] Cleaning up triggers");
+       cds_list_for_each_entry_safe(trigger, tmp, &trigger_list, list_node) {
+               DBG("[notification-thread] Destroying trigger");
+               cds_list_del(&trigger->list_node);
+               lttng_trigger_destroy(trigger);
+       }
+}
+
+static
+struct lttng_evaluation *evaluate_buffer_usage_condition(
+               struct lttng_condition *_condition)
+{
+       uint64_t threshold;
+       struct lttng_evaluation *evaluation = NULL;
+       struct lttng_condition_buffer_usage *condition = container_of(
+                       _condition, struct lttng_condition_buffer_usage,
+                       parent);
+
+       if (condition->threshold_bytes.set) {
+               threshold = condition->threshold_bytes.value;
+       } else {
+               /* Threshold was expressed as a ratio. */
+               threshold = (uint64_t) (condition->threshold_ratio.value *
+                               (double) simulation_buffer_capacity);
+       }
+
+       if (condition->parent.type ==
+                       LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW) {
+               if (simulation_buffer_use_bytes <= threshold) {
+                       evaluation = lttng_evaluation_buffer_usage_create(
+                                       condition->parent.type,
+                                       simulation_buffer_use_bytes,
+                                       simulation_buffer_capacity);
+               }
+       } else {
+               if (simulation_buffer_use_bytes >= threshold) {
+                       evaluation = lttng_evaluation_buffer_usage_create(
+                                       condition->parent.type,
+                                       simulation_buffer_use_bytes,
+                                       simulation_buffer_capacity);
+               }
+       }
+       return evaluation;
+}
+
+static
+void notify_client(struct client *client, struct lttng_condition *condition,
+               struct lttng_evaluation *evaluation)
+{
+       ssize_t notification_size, ret;
+       char *notification_buffer;
+       struct lttng_notification *notification;
+
+       notification = lttng_notification_create(condition, evaluation);
+       if (!notification) {
+               ERR("[notification-thread] Failed to create client notification");
+               return;
+       }
+
+       notification_size = lttng_notification_serialize(notification, NULL);
+       if (notification_size < 0) {
+               ERR("[notification-thread] Failed to get size of serialized notification");
+               return;
+       }
+
+       notification_buffer = zmalloc(notification_size);
+       if (!notification_buffer) {
+               ERR("[notification-thread] Failed to allocate notification serialization buffer");
+       }
+
+       ret = lttng_notification_serialize(notification, notification_buffer);
+       if (ret != notification_size) {
+               ERR("[notification-thread] Failed to serialize notification");
+               return;
+       }
+
+       ret = lttcomm_send_unix_sock(client->socket, notification_buffer,
+                       notification_size);
+       if (ret < 0) {
+               ERR("[notification-thread] Failed to send notification to client");
+               return;
+       }
+}
+
+static
+void evaluate_client_conditions(void)
+{
+       struct client *client;
+
+       DBG("[notification-thread] Evaluating client conditions");
+       cds_list_for_each_entry(client, &client_list, list_node) {
+               struct lttng_condition *condition;
+               cds_list_for_each_entry(condition, &client->condition_list,
+                               list_node) {
+                       struct lttng_evaluation *evaluation = NULL;
+                       switch (lttng_condition_get_type(condition)) {
+                       case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
+                       case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
+                               evaluation = evaluate_buffer_usage_condition(
+                                               condition);
+                               break;
+                       default:
+                               ERR("[notification-thread] Unknown condition type encountered in evaluation");
+                               abort();
+                       }
+
+                       if (evaluation) {
+                               DBG("[notification-thread] Condition evaluated to true");
+                               notify_client(client, condition, evaluation);
+                               lttng_evaluation_destroy(evaluation);
+                       }
+               }
+       }
+       DBG("[notification-thread] Client conditions evaluated");
+}
+
 /*
  * This thread services notification channel clients and received notifications
  * from various lttng-sessiond components over a command queue.
@@ -189,6 +637,11 @@ void *thread_notification(void *data)
 
        DBG("[notification-thread] Started notification thread");
 
+       CDS_INIT_LIST_HEAD(&client_list);
+       CDS_INIT_LIST_HEAD(&trigger_list);
+
+       simulation_timer_start();
+
        if (!ctx) {
                ERR("[notification-thread] Invalid thread context provided");
                goto end;
@@ -200,7 +653,17 @@ void *thread_notification(void *data)
        health_register(health_sessiond, HEALTH_SESSIOND_TYPE_NOTIFICATION);
        health_code_update();
 
-       notification_channel_socket = notification_channel_socket_create();
+       client_reception_buffer = zmalloc(CLIENT_RECEPTION_BUFFER_SIZE);
+       if (!client_reception_buffer) {
+               ERR("[notification-thread] Failed to allocate client reception buffer");
+               goto end;
+       }
+
+       ret = notification_channel_socket_create();
+       if (ret < 0) {
+               goto end;
+       }
+       notification_channel_socket = ret;
 
        /*
         * Create pollset with size 2, quit pipe and notification channel
@@ -226,6 +689,13 @@ void *thread_notification(void *data)
                goto error;
        }
 
+       ret = lttng_poll_add(&events, simulation_timer_event_fd,
+                       LPOLLIN | LPOLLERR);
+       if (ret < 0) {
+               ERR("[notification-thread] Failed to add timer event fd to pollset");
+               goto error;
+       }
+
        DBG("[notification-thread] Listening on notification channel socket");
        ret = lttcomm_listen_unix_sock(notification_channel_socket);
        if (ret < 0) {
@@ -257,11 +727,95 @@ void *thread_notification(void *data)
                        int fd = LTTNG_POLL_GETFD(&events, i);
                        uint32_t revents = LTTNG_POLL_GETEV(&events, i);
 
+                       DBG("[notification-thread] Handling fd (%i) activity (%u)", fd, revents);
+
                        /* Thread quit pipe has been closed. Killing thread. */
                        if (sessiond_check_thread_quit_pipe(fd, revents)) {
                                DBG("[notification-thread] Quit pipe signaled, exiting.");
                                goto exit;
                        }
+
+                       if (fd == notification_channel_socket) {
+                               if (revents & LPOLLIN) {
+                                       int new_socket;
+
+                                       ret = handle_new_connection(
+                                                       notification_channel_socket);
+                                       if (ret < 0) {
+                                               continue;
+                                       }
+                                       new_socket = ret;
+
+                                       ret = lttng_poll_add(&events, new_socket,
+                                                       LPOLLIN | LPOLLERR |
+                                                       LPOLLHUP | LPOLLRDHUP);
+                                       if (ret < 0) {
+                                               ERR("[notification-thread] Failed to add notification channel client socket to pollset");
+                                               goto error;
+                                       }
+                                       DBG("[notification-thread] Added new notification channel client socket to poll set");
+                               } else if (revents &
+                                               (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
+                                       ERR("[notification-thread] Notification socket poll error");
+                                       goto error;
+                               } else {
+                                       ERR("[notification-thread] Unexpected poll events %u for notification socket %i", revents, fd);
+                                       goto error;
+                               }
+                       } else if (fd == ctx->cmd_queue.event_fd) {
+                               /*
+                                * Handling of internaly-generated events to
+                                * evaluate against the set of active
+                                * conditions.
+                                */
+                               uint64_t counter;
+
+                               DBG("[notification-thread] Event received on command queue event fd");
+                               ret = read(fd, &counter, sizeof(counter));
+                               if (ret < 0) {
+                                       ERR("read on command queue event fd");
+                               }
+
+                               pthread_mutex_lock(&ctx->cmd_queue.lock);
+                               activate_triggers(&ctx->cmd_queue.list);
+                               pthread_mutex_unlock(&ctx->cmd_queue.lock);
+                       } else if (fd == simulation_timer_event_fd) {
+                               /*
+                                * Place-holder timer to simulate activity in
+                                * the system.
+                                */
+                               uint64_t counter;
+
+                               DBG("[notification-thread] Simulation timer fired");
+                               ret = read(fd, &counter, sizeof(counter));
+                               if (ret < 0) {
+                                       ERR("read on simulation timer event fd");
+                               }
+
+                               pthread_mutex_lock(&simulation_lock);
+                               evaluate_client_conditions();
+                               pthread_mutex_unlock(&simulation_lock);
+                       } else {
+                               if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
+                                       /*
+                                        * It doesn't matter if a command was
+                                        * pending on the client socket at this
+                                        * point since it now has now way to
+                                        * receive the notifications to which
+                                        * it was subscribing or unsubscribing.
+                                        */
+                                       DBG("[notification-thread] Closing client connection (fd = %i)", fd);
+                                       clean_up_notification_channel_client(fd);
+                               } else if (revents & LPOLLIN) {
+                                       ret = handle_notification_channel_client(fd);
+                                       if (ret) {
+                                               DBG("[notification-thread] Closing client connection following error");
+                                               clean_up_notification_channel_client(fd);
+                                       }
+                               } else {
+                                       DBG("[notification-thread] Unexpected poll events %u for notification socket %i", revents, fd);
+                               }
+                       }
                }
        }
 exit:
@@ -273,6 +827,9 @@ error_poll_create:
        health_unregister(health_sessiond);
        rcu_thread_offline();
        rcu_unregister_thread();
+       free(client_reception_buffer);
+       clean_up_triggers();
 end:
+       simulation_timer_stop();
        return NULL;
 }
index 7f797e28e870e74f4c0a0d7c2a25ee853bbfc50c..f5dcd3b54fa11d35d940f2bef608fcef9508a6b2 100644 (file)
@@ -20,6 +20,7 @@
 
 #include <urcu/wfcqueue.h>
 #include <lttng/trigger/trigger.h>
+#include <pthread.h>
 
 enum notification_command_type {
        NOTIFICATION_COMMAND_TYPE_NEW_TRIGGER,
@@ -47,8 +48,8 @@ struct notification_thread_data {
         */
        struct notification_cmd_queue {
                int event_fd;
-               struct cds_wfcq_head head;
-               struct cds_wfcq_tail tail;
+               struct cds_list_head list;
+               pthread_mutex_t lock;
        } cmd_queue;
 };
 
@@ -59,7 +60,7 @@ struct notification_command *notification_new_trigger_command_destroy(
 
 void *thread_notification(void *data);
 
-int notification_thread_init_data(struct notification_thread_data **data);
-void notification_thread_destroy_data(struct notification_thread_data *data);
+struct notification_thread_data *notification_init_data(void);
+void notification_destroy_data(struct notification_thread_data *data);
 
 #endif /* NOTIFICATION_THREAD_H */
index 697967d8c3b92471a3b6d64577e82cc3b6e91481..66449c3139113db69676f5702ef150a9a0c35b0f 100644 (file)
@@ -140,7 +140,7 @@ struct lttng_condition *lttng_condition_buffer_usage_create(
                goto end;
        }
 
-       condition->parent.type = type;
+       lttng_condition_init(&condition->parent, type);
        condition->parent.validate = lttng_condition_buffer_usage_validate;
        condition->parent.serialize = lttng_condition_buffer_usage_serialize;
        condition->parent.destroy = lttng_condition_buffer_usage_destroy;
index 5c3c11c237570d3039feb1a01ca3464cb1ef66f1..4a24202fed6f851e309344471e3664d599045f42 100644 (file)
@@ -131,3 +131,11 @@ ssize_t lttng_condition_create_from_buffer(const char *buf,
 end:
        return ret;
 }
+
+LTTNG_HIDDEN
+void lttng_condition_init(struct lttng_condition *condition,
+               enum lttng_condition_type type)
+{
+       condition->type = type;
+       CDS_INIT_LIST_HEAD(&condition->list_node);
+}
index 018fd0901d948282780aa526dcdd422a55e4a8e2..f7b2e1e37ee6a6d7ec942738a15579cb9e3a11a9 100644 (file)
@@ -187,6 +187,7 @@ static const char *error_string_array[] = {
        [ ERROR_INDEX(LTTNG_ERR_REGEN_STATEDUMP_NOMEM) ] = "Failed to regenerate the state dump, not enough memory",
        [ ERROR_INDEX(LTTNG_ERR_NOT_SNAPSHOT_SESSION) ] = "Snapshot command can't be applied to a non-snapshot session",
        [ ERROR_INDEX(LTTNG_ERR_INVALID_TRIGGER) ] = "Invalid trigger",
+       [ ERROR_INDEX(LTTNG_ERR_COMMAND_CANCELLED) ] = "Command cancelled",
 
        /* Last element */
        [ ERROR_INDEX(LTTNG_ERR_NR) ] = "Unknown error code"
index 12a7329f4492396db302e7db36868d1f86c89ab9..ede1ac567dad043627d6ea71b070eab8c12969ec 100644 (file)
@@ -54,6 +54,7 @@ struct lttng_trigger *lttng_trigger_create(
 
        trigger->condition = condition;
        trigger->action = action;
+       CDS_INIT_LIST_HEAD(&trigger->list_node);
 end:
        return trigger;
 }
index 6b6a6eed127525f17a2a94454ea18e8fbf3dacc1..b5156caf94aa9320a2712a7525a5138bff1b4b74 100644 (file)
@@ -5,7 +5,8 @@ SUBDIRS = filter
 lib_LTLIBRARIES = liblttng-ctl.la
 
 liblttng_ctl_la_SOURCES = lttng-ctl.c snapshot.c lttng-ctl-helper.h \
-               lttng-ctl-health.c save.c load.c deprecated-symbols.c
+               lttng-ctl-health.c save.c load.c deprecated-symbols.c \
+               channel.c
 
 liblttng_ctl_la_LDFLAGS = \
                $(LT_NO_UNDEFINED)
This page took 0.041207 seconds and 5 git commands to generate.