+static
+int handle_new_connection(int socket)
+{
+ int ret;
+ struct client *client;
+
+ DBG("[notification-thread] Handling new notification channel client connection");
+
+ client = zmalloc(sizeof(*client));
+ if (!client) {
+ goto error;
+ }
+
+ ret = lttcomm_accept_unix_sock(socket);
+ if (ret < 0) {
+ ERR("[notification-thread] Failed to accept new notification channel client connection");
+ goto error;
+ }
+
+ client->socket = ret;
+ CDS_INIT_LIST_HEAD(&client->condition_list);
+
+ /* FIXME handle creds. */
+ ret = lttcomm_setsockopt_creds_unix_sock(socket);
+ if (ret < 0) {
+ ERR("[notification-thread] Failed to set socket options on new notification channel client socket");
+ goto error;
+ }
+
+ cds_list_add(&client->list_node, &client_list);
+ return client->socket;
+error:
+ free(client);
+ return -1;
+}
+
+static
+int send_command_reply(int socket,
+ enum lttng_notification_channel_status status)
+{
+ ssize_t ret;
+ struct lttng_notification_channel_command_reply reply = {
+ .status = (int8_t) status,
+ };
+
+ DBG("[notification-thread] Send command reply (%i)", (int) status);
+
+ ret = lttcomm_send_unix_sock(socket, &reply, sizeof(reply));
+ if (ret < 0) {
+ ERR("[notification-thread] Failed to send command reply");
+ goto error;
+ }
+ return 0;
+error:
+ return -1;
+}
+
+static
+struct client *get_client_from_fd(int fd)
+{
+ struct client *client;
+
+ cds_list_for_each_entry(client, &client_list, list_node) {
+ if (client->socket == fd) {
+ return client;
+ }
+ }
+ return NULL;
+}
+
+static
+int handle_notification_channel_client(int socket)
+{
+ ssize_t ret;
+ size_t received = 0;
+ struct client *client = get_client_from_fd(socket);
+ struct lttng_condition *condition;
+ struct lttng_notification_channel_command command;
+ enum lttng_notification_channel_status status =
+ LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
+ struct lttng_trigger *trigger;
+
+ assert(client);
+
+ /* Receive command header. */
+ do
+ {
+ ret = lttcomm_recv_unix_sock(socket, ((char *) &command) + received,
+ sizeof(command) - received);
+ if (ret <= 0) {
+ ERR("[notification-thread] Failed to receive channel command from client (received %zu bytes)", received);
+ goto error_no_reply;
+ }
+ received += ret;
+ } while (received < sizeof(command));
+
+ received = 0;
+ if (command.size >= CLIENT_RECEPTION_BUFFER_SIZE) {
+ ERR("[notification-thread] Notification channel client attempted to send condition larger (%u bytes) than client reception buffer (%u bytes)",
+ command.size,
+ (unsigned int) CLIENT_RECEPTION_BUFFER_SIZE);
+ goto error_no_reply;
+ }
+
+ do
+ {
+ ret = lttcomm_recv_unix_sock(socket,
+ client_reception_buffer + received,
+ command.size - received);
+ if (ret <= 0) {
+ ERR("[notification-thread] Failed to receive condition from client");
+ goto error_no_reply;
+ }
+ received += ret;
+ } while (received < sizeof(command));
+
+ ret = lttng_condition_create_from_buffer(client_reception_buffer,
+ &condition);
+ if (ret < 0 || ret < command.size) {
+ ERR("[notification-thread] Malformed condition received from client");
+ goto error_no_reply;
+ }
+
+ DBG("[notification-thread] Successfully received condition from notification channel client");
+
+ /*
+ * A client may only listen for a condition that is currently associated
+ * with a trigger known to the system.
+ */
+ DBG("[notification-thread] Comparing registered condition to known trigger conditions");
+ cds_list_for_each_entry(trigger, &trigger_list, list_node) {
+ struct lttng_condition *trigger_condition =
+ lttng_trigger_get_condition(trigger);
+
+ if (!trigger_condition) {
+ ERR("[notification-thread] lttng_trigger_get_condition returned NULL");
+ status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
+ goto end;
+ }
+
+ if (lttng_condition_is_equal(trigger_condition, condition)) {
+ /* Matching condition found. */
+ DBG("[notification-thread] Found a matching condition, accepting client subscription request");
+ cds_list_add(&condition->list_node,
+ &client->condition_list);
+ goto end;
+ }
+ }
+
+ /* No match found, refuse the subscription. */
+ DBG("[notification-thread] No matching condition found, refusing client subscription request");
+ status = LTTNG_NOTIFICATION_CHANNEL_STATUS_UNKNOWN;
+end:
+ if (send_command_reply(socket, status)) {
+ goto error_no_reply;
+ }
+ return 0;
+error_no_reply:
+ return -1;
+}
+
+static
+void client_destroy(struct client *client)
+{
+ struct lttng_condition *condition, *tmp;
+
+ cds_list_for_each_entry_safe(condition, tmp, &client->condition_list,
+ list_node) {
+ cds_list_del(&condition->list_node);
+ lttng_condition_destroy(condition);
+ }
+
+ (void) lttcomm_close_unix_sock(client->socket);
+ free(client);
+}
+
+static
+void clean_up_notification_channel_client(int socket)
+{
+ struct client *client;
+
+ DBG("[notification-thread] Searching for client data for clean-up");
+ cds_list_for_each_entry(client, &client_list, list_node) {
+ if (client->socket == socket) {
+ DBG("[notification-thread] Client data found for clean-up");
+ cds_list_del(&client->list_node);
+ client_destroy(client);
+ return;
+ }
+ }
+ ERR("[notification-thread] Failed to clean-up client data");
+}
+
+static
+void activate_triggers(struct cds_list_head *new_triggers_list)
+{
+ struct lttng_trigger *trigger, *tmp;
+
+ DBG("[notification-thread] Moving triggers from new list to activated trigger set");
+ cds_list_for_each_entry_safe(trigger, tmp, new_triggers_list, list_node) {
+ cds_list_del(&trigger->list_node);
+ cds_list_add(&trigger->list_node, &trigger_list);
+ }
+}
+
+static
+void clean_up_triggers(void)
+{
+ struct lttng_trigger *trigger, *tmp;
+
+ DBG("[notification-thread] Cleaning up triggers");
+ cds_list_for_each_entry_safe(trigger, tmp, &trigger_list, list_node) {
+ DBG("[notification-thread] Destroying trigger");
+ cds_list_del(&trigger->list_node);
+ lttng_trigger_destroy(trigger);
+ }
+}
+
+static
+struct lttng_evaluation *evaluate_buffer_usage_condition(
+ struct lttng_condition *_condition)
+{
+ uint64_t threshold;
+ struct lttng_evaluation *evaluation = NULL;
+ struct lttng_condition_buffer_usage *condition = container_of(
+ _condition, struct lttng_condition_buffer_usage,
+ parent);
+
+ if (condition->threshold_bytes.set) {
+ threshold = condition->threshold_bytes.value;
+ } else {
+ /* Threshold was expressed as a ratio. */
+ threshold = (uint64_t) (condition->threshold_ratio.value *
+ (double) simulation_buffer_capacity);
+ }
+
+ if (condition->parent.type ==
+ LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW) {
+ if (simulation_buffer_use_bytes <= threshold) {
+ evaluation = lttng_evaluation_buffer_usage_create(
+ condition->parent.type,
+ simulation_buffer_use_bytes,
+ simulation_buffer_capacity);
+ }
+ } else {
+ if (simulation_buffer_use_bytes >= threshold) {
+ evaluation = lttng_evaluation_buffer_usage_create(
+ condition->parent.type,
+ simulation_buffer_use_bytes,
+ simulation_buffer_capacity);
+ }
+ }
+ return evaluation;
+}
+
+static
+void notify_client(struct client *client, struct lttng_condition *condition,
+ struct lttng_evaluation *evaluation)
+{
+ ssize_t notification_size, ret;
+ char *notification_buffer;
+ struct lttng_notification *notification;
+
+ notification = lttng_notification_create(condition, evaluation);
+ if (!notification) {
+ ERR("[notification-thread] Failed to create client notification");
+ return;
+ }
+
+ notification_size = lttng_notification_serialize(notification, NULL);
+ if (notification_size < 0) {
+ ERR("[notification-thread] Failed to get size of serialized notification");
+ return;
+ }
+
+ notification_buffer = zmalloc(notification_size);
+ if (!notification_buffer) {
+ ERR("[notification-thread] Failed to allocate notification serialization buffer");
+ }
+
+ ret = lttng_notification_serialize(notification, notification_buffer);
+ if (ret != notification_size) {
+ ERR("[notification-thread] Failed to serialize notification");
+ return;
+ }
+
+ ret = lttcomm_send_unix_sock(client->socket, notification_buffer,
+ notification_size);
+ if (ret < 0) {
+ ERR("[notification-thread] Failed to send notification to client");
+ return;
+ }
+}
+
+static
+void evaluate_client_conditions(void)
+{
+ struct client *client;
+
+ DBG("[notification-thread] Evaluating client conditions");
+ cds_list_for_each_entry(client, &client_list, list_node) {
+ struct lttng_condition *condition;
+ cds_list_for_each_entry(condition, &client->condition_list,
+ list_node) {
+ struct lttng_evaluation *evaluation = NULL;
+ switch (lttng_condition_get_type(condition)) {
+ case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
+ case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
+ evaluation = evaluate_buffer_usage_condition(
+ condition);
+ break;
+ default:
+ ERR("[notification-thread] Unknown condition type encountered in evaluation");
+ abort();
+ }
+
+ if (evaluation) {
+ DBG("[notification-thread] Condition evaluated to true");
+ notify_client(client, condition, evaluation);
+ lttng_evaluation_destroy(evaluation);
+ }
+ }
+ }
+ DBG("[notification-thread] Client conditions evaluated");
+}
+