Deliverables 3 and 4 notification-items-3-4
authorJérémie Galarneau <jeremie.galarneau@efficios.com>
Fri, 24 Feb 2017 06:13:17 +0000 (01:13 -0500)
committerJérémie Galarneau <jeremie.galarneau@efficios.com>
Thu, 23 Mar 2017 05:42:36 +0000 (01:42 -0400)
Signed-off-by: Jérémie Galarneau <jeremie.galarneau@efficios.com>
72 files changed:
configure.ac
include/Makefile.am
include/lttng/action/action-internal.h [new file with mode: 0644]
include/lttng/action/action.h [new file with mode: 0644]
include/lttng/action/notify-internal.h [new file with mode: 0644]
include/lttng/action/notify.h [new file with mode: 0644]
include/lttng/condition/buffer-usage-internal.h [new file with mode: 0644]
include/lttng/condition/buffer-usage.h [new file with mode: 0644]
include/lttng/condition/condition-internal.h [new file with mode: 0644]
include/lttng/condition/condition.h [new file with mode: 0644]
include/lttng/condition/evaluation-internal.h [new file with mode: 0644]
include/lttng/condition/evaluation.h [new file with mode: 0644]
include/lttng/endpoint-internal.h [new file with mode: 0644]
include/lttng/endpoint.h [new file with mode: 0644]
include/lttng/lttng-error.h
include/lttng/notification/channel-internal.h [new file with mode: 0644]
include/lttng/notification/channel.h [new file with mode: 0644]
include/lttng/notification/notification-internal.h [new file with mode: 0644]
include/lttng/notification/notification.h [new file with mode: 0644]
include/lttng/trigger/trigger-internal.h [new file with mode: 0644]
include/lttng/trigger/trigger.h [new file with mode: 0644]
src/bin/lttng-consumerd/lttng-consumerd.c
src/bin/lttng-sessiond/Makefile.am
src/bin/lttng-sessiond/agent-thread.c
src/bin/lttng-sessiond/cmd.c
src/bin/lttng-sessiond/cmd.h
src/bin/lttng-sessiond/consumer.c
src/bin/lttng-sessiond/consumer.h
src/bin/lttng-sessiond/health-sessiond.h
src/bin/lttng-sessiond/lttng-sessiond.h
src/bin/lttng-sessiond/lttng-ust-ctl.h
src/bin/lttng-sessiond/main.c
src/bin/lttng-sessiond/notification-thread-commands.c [new file with mode: 0644]
src/bin/lttng-sessiond/notification-thread-commands.h [new file with mode: 0644]
src/bin/lttng-sessiond/notification-thread-events.c [new file with mode: 0644]
src/bin/lttng-sessiond/notification-thread-events.h [new file with mode: 0644]
src/bin/lttng-sessiond/notification-thread.c [new file with mode: 0644]
src/bin/lttng-sessiond/notification-thread.h [new file with mode: 0644]
src/bin/lttng-sessiond/ust-app.c
src/bin/lttng-sessiond/ust-consumer.c
src/bin/lttng-sessiond/ust-registry.c
src/bin/lttng-sessiond/ust-registry.h
src/common/Makefile.am
src/common/action.c [new file with mode: 0644]
src/common/buffer-usage.c [new file with mode: 0644]
src/common/condition.c [new file with mode: 0644]
src/common/consumer/consumer-timer.c
src/common/consumer/consumer-timer.h
src/common/consumer/consumer.c
src/common/consumer/consumer.h
src/common/defaults.h
src/common/dynamic-buffer.c [new file with mode: 0644]
src/common/dynamic-buffer.h [new file with mode: 0644]
src/common/endpoint.c [new file with mode: 0644]
src/common/error.c
src/common/evaluation.c [new file with mode: 0644]
src/common/macros.h
src/common/notification.c [new file with mode: 0644]
src/common/notify.c [new file with mode: 0644]
src/common/pipe.c
src/common/pipe.h
src/common/sessiond-comm/sessiond-comm.h
src/common/trigger.c [new file with mode: 0644]
src/common/unix.c
src/common/ust-consumer/ust-consumer.c
src/common/utils.c
src/common/utils.h
src/lib/lttng-ctl/Makefile.am
src/lib/lttng-ctl/channel.c [new file with mode: 0644]
src/lib/lttng-ctl/lttng-ctl.c
tests/unit/Makefile.am
tests/unit/test_ust_data.c

index 0a7034e24eaaffd9f1c679b4eac896da18193045..9bc39412382261212197f78e9a9ce4b0bcdcb924 100644 (file)
@@ -948,8 +948,20 @@ CFLAGS="-Wall $CFLAGS -g -fno-strict-aliasing"
 DEFAULT_INCLUDES="-I\$(top_srcdir) -I\$(top_builddir) -I\$(top_builddir)/src -I\$(top_builddir)/include -include config.h"
 
 lttngincludedir="${includedir}/lttng"
-
 AC_SUBST(lttngincludedir)
+
+lttngactionincludedir="${includedir}/lttng/action"
+AC_SUBST(lttngactionincludedir)
+
+lttngconditionincludedir="${includedir}/lttng/condition"
+AC_SUBST(lttngconditionincludedir)
+
+lttngnotificationincludedir="${includedir}/lttng/notification"
+AC_SUBST(lttngnotificationincludedir)
+
+lttngtriggerincludedir="${includedir}/lttng/trigger"
+AC_SUBST(lttngtriggerincludedir)
+
 AC_SUBST(DEFAULT_INCLUDES)
 
 lttnglibexecdir="${libdir}/lttng/libexec"
index 0536dcdf00e328a1f83433df310e7a37c888c622..bc105b0649c482d0959aa35325c8cdc8909f3733 100644 (file)
@@ -77,10 +77,36 @@ lttnginclude_HEADERS = \
        lttng/snapshot.h \
        lttng/save.h \
        lttng/load.h \
+       lttng/endpoint.h \
        version.h.tmpl
 
+lttngactioninclude_HEADERS= \
+       lttng/action/action.h \
+       lttng/action/notify.h
+
+lttngconditioninclude_HEADERS= \
+       lttng/condition/condition.h \
+       lttng/condition/buffer-usage.h \
+       lttng/condition/evaluation.h
+
+lttngnotificationinclude_HEADERS= \
+       lttng/notification/channel.h \
+       lttng/notification/notification.h
+
+lttngtriggerinclude_HEADERS= \
+       lttng/trigger/trigger.h
+
 noinst_HEADERS = \
        lttng/snapshot-internal.h \
        lttng/health-internal.h \
        lttng/save-internal.h \
-       lttng/load-internal.h
+       lttng/load-internal.h \
+       lttng/action/action-internal.h \
+       lttng/action/notify-internal.h \
+       lttng/condition/condition-internal.h \
+       lttng/condition/buffer-usage-internal.h \
+       lttng/condition/evaluation-internal.h \
+       lttng/notification/notification-internal.h \
+       lttng/trigger/trigger-internal.h \
+       lttng/endpoint-internal.h \
+       lttng/notification/channel-internal.h
diff --git a/include/lttng/action/action-internal.h b/include/lttng/action/action-internal.h
new file mode 100644 (file)
index 0000000..9210f9f
--- /dev/null
@@ -0,0 +1,55 @@
+/*
+ * Copyright (C) 2017 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
+ *
+ * This library is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License, version 2.1 only,
+ * as published by the Free Software Foundation.
+ *
+ * This library is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
+ * for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this library; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#ifndef LTTNG_ACTION_INTERNAL_H
+#define LTTNG_ACTION_INTERNAL_H
+
+#include <lttng/action/action.h>
+#include <common/macros.h>
+#include <stdbool.h>
+
+typedef bool (*action_validate_cb)(struct lttng_action *action);
+typedef void (*action_destroy_cb)(struct lttng_action *action);
+typedef ssize_t (*action_serialize_cb)(struct lttng_action *action, char *buf);
+
+struct lttng_action {
+       enum lttng_action_type type;
+       action_validate_cb validate;
+       action_serialize_cb serialize;
+       action_destroy_cb destroy;
+};
+
+struct lttng_action_comm {
+       /* enum lttng_action_type */
+       int8_t action_type;
+} LTTNG_PACKED;
+
+LTTNG_HIDDEN
+bool lttng_action_validate(struct lttng_action *action);
+
+LTTNG_HIDDEN
+ssize_t lttng_action_serialize(struct lttng_action *action, char *buf);
+
+/*
+ * FIXME Add explicit buffer bound checking using a "len" parameter to
+ * ensure malformed buffers are caught and rejected.
+ */
+LTTNG_HIDDEN
+ssize_t lttng_action_create_from_buffer(const char *buf,
+               struct lttng_action **action);
+
+#endif /* LTTNG_ACTION_INTERNAL_H */
diff --git a/include/lttng/action/action.h b/include/lttng/action/action.h
new file mode 100644 (file)
index 0000000..311f5a0
--- /dev/null
@@ -0,0 +1,41 @@
+/*
+ * Copyright (C) 2017 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
+ *
+ * This library is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License, version 2.1 only,
+ * as published by the Free Software Foundation.
+ *
+ * This library is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
+ * for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this library; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#ifndef LTTNG_ACTION_H
+#define LTTNG_ACTION_H
+
+struct lttng_action;
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+enum lttng_action_type {
+       LTTNG_ACTION_TYPE_UNKNOWN = -1,
+       LTTNG_ACTION_TYPE_NOTIFY = 0,
+};
+
+extern enum lttng_action_type lttng_action_get_type(
+               struct lttng_action *action);
+
+extern void lttng_action_destroy(struct lttng_action *action);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* LTTNG_ACTION_H */
diff --git a/include/lttng/action/notify-internal.h b/include/lttng/action/notify-internal.h
new file mode 100644 (file)
index 0000000..509bd36
--- /dev/null
@@ -0,0 +1,28 @@
+/*
+ * Copyright (C) 2017 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
+ *
+ * This library is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License, version 2.1 only,
+ * as published by the Free Software Foundation.
+ *
+ * This library is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
+ * for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this library; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#ifndef LTTNG_ACTION_NOTIFY_INTERNAL_H
+#define LTTNG_ACTION_NOTIFY_INTERNAL_H
+
+#include <lttng/action/notify.h>
+#include <lttng/action/action-internal.h>
+
+struct lttng_action_notify {
+       struct lttng_action parent;
+};
+
+#endif /* LTTNG_ACTION_NOTIFY_INTERNAL_H */
diff --git a/include/lttng/action/notify.h b/include/lttng/action/notify.h
new file mode 100644 (file)
index 0000000..6c0d0ca
--- /dev/null
@@ -0,0 +1,33 @@
+/*
+ * Copyright (C) 2017 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
+ *
+ * This library is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License, version 2.1 only,
+ * as published by the Free Software Foundation.
+ *
+ * This library is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
+ * for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this library; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#ifndef LTTNG_ACTION_NOTIFY_H
+#define LTTNG_ACTION_NOTIFY_H
+
+struct lttng_action;
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+extern struct lttng_action *lttng_action_notify_create(void);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* LTTNG_ACTION_NOTIFY_H */
diff --git a/include/lttng/condition/buffer-usage-internal.h b/include/lttng/condition/buffer-usage-internal.h
new file mode 100644 (file)
index 0000000..1052c48
--- /dev/null
@@ -0,0 +1,102 @@
+/*
+ * Copyright (C) 2017 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
+ *
+ * This library is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License, version 2.1 only,
+ * as published by the Free Software Foundation.
+ *
+ * This library is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
+ * for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this library; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#ifndef LTTNG_CONDITION_BUFFER_USAGE_INTERNAL_H
+#define LTTNG_CONDITION_BUFFER_USAGE_INTERNAL_H
+
+#include <lttng/condition/buffer-usage.h>
+#include <lttng/condition/condition-internal.h>
+#include <lttng/condition/evaluation-internal.h>
+#include <lttng/domain.h>
+
+struct lttng_condition_buffer_usage {
+       struct lttng_condition parent;
+       struct {
+               bool set;
+               uint64_t value;
+       } threshold_bytes;
+       struct {
+               bool set;
+               double value;
+       } threshold_ratio;
+       char *session_name;
+       char *channel_name;
+       struct {
+               bool set;
+               enum lttng_domain_type type;
+       } domain;
+};
+
+struct lttng_condition_buffer_usage_comm {
+       uint8_t threshold_set_in_bytes;
+       /*
+        * Expressed in bytes if "threshold_set_in_bytes" is not 0.
+        * Otherwise, it is expressed a ratio in the interval [0.0, 1.0]
+        * that is mapped to the range on a 32-bit unsigned integer.
+        * The ratio is obtained by (threshold / UINT32_MAX).
+        */
+       uint32_t threshold;
+       /* Both lengths include the trailing \0. */
+       uint32_t session_name_len;
+       uint32_t channel_name_len;
+       /* enum lttng_domain_type */
+       int8_t domain_type;
+       /* session and channel names. */
+       char names[];
+} LTTNG_PACKED;
+
+struct lttng_evaluation_buffer_usage {
+       struct lttng_evaluation parent;
+       uint64_t buffer_use;
+       uint64_t buffer_capacity;
+};
+
+struct lttng_evaluation_buffer_usage_comm {
+       uint64_t buffer_use;
+       uint64_t buffer_capacity;
+} LTTNG_PACKED;
+
+LTTNG_HIDDEN
+struct lttng_evaluation *lttng_evaluation_buffer_usage_create(
+               enum lttng_condition_type type, uint64_t use,
+               uint64_t capacity);
+
+/*
+ * Applies to all below:
+ *
+ * FIXME Add explicit buffer bound checking using a "len" parameter to
+ * ensure malformed buffers are caught and rejected.
+ */
+LTTNG_HIDDEN
+ssize_t lttng_condition_buffer_usage_low_create_from_buffer(const char *buf,
+               struct lttng_condition **condition);
+
+LTTNG_HIDDEN
+ssize_t lttng_condition_buffer_usage_high_create_from_buffer(const char *buf,
+               struct lttng_condition **condition);
+
+LTTNG_HIDDEN
+ssize_t lttng_evaluation_buffer_usage_low_create_from_buffer(const char *buf,
+               struct lttng_evaluation **evaluation);
+
+LTTNG_HIDDEN
+ssize_t lttng_evaluation_buffer_usage_high_create_from_buffer(const char *buf,
+               struct lttng_evaluation **evaluation);
+
+
+
+#endif /* LTTNG_CONDITION_BUFFER_USAGE_INTERNAL_H */
diff --git a/include/lttng/condition/buffer-usage.h b/include/lttng/condition/buffer-usage.h
new file mode 100644 (file)
index 0000000..3f5e4d8
--- /dev/null
@@ -0,0 +1,102 @@
+/*
+ * Copyright (C) 2017 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
+ *
+ * This library is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License, version 2.1 only,
+ * as published by the Free Software Foundation.
+ *
+ * This library is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
+ * for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this library; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#ifndef LTTNG_CONDITION_BUFFER_USAGE_H
+#define LTTNG_CONDITION_BUFFER_USAGE_H
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+struct lttng_condition;
+struct lttng_evaluation;
+
+extern struct lttng_condition *
+lttng_condition_buffer_usage_low_create(void);
+
+extern struct lttng_condition *
+lttng_condition_buffer_usage_high_create(void);
+
+/* threshold_ratio expressed as [0.0, 1.0]. */
+extern enum lttng_condition_status
+lttng_condition_buffer_usage_get_threshold_ratio(
+               struct lttng_condition *condition,
+               double *threshold_ratio);
+
+/* threshold_ratio expressed as [0.0, 1.0]. */
+extern enum lttng_condition_status
+lttng_condition_buffer_usage_set_threshold_ratio(
+               struct lttng_condition *condition,
+               double threshold_ratio);
+
+extern enum lttng_condition_status
+lttng_condition_buffer_usage_get_threshold(
+               struct lttng_condition *condition,
+               uint64_t *threshold_bytes);
+
+extern enum lttng_condition_status
+lttng_condition_buffer_usage_set_threshold(
+               struct lttng_condition *condition,
+               uint64_t threshold_bytes);
+
+extern enum lttng_condition_status
+lttng_condition_buffer_usage_get_session_name(
+               struct lttng_condition *condition,
+               const char **session_name);
+
+extern enum lttng_condition_status
+lttng_condition_buffer_usage_set_session_name(
+               struct lttng_condition *condition,
+               const char *session_name);
+
+extern enum lttng_condition_status
+lttng_condition_buffer_usage_get_channel_name(
+               struct lttng_condition *condition,
+               const char **channel_name);
+
+extern enum lttng_condition_status
+lttng_condition_buffer_usage_set_channel_name(
+               struct lttng_condition *condition,
+               const char *channel_name);
+
+extern enum lttng_condition_status
+lttng_condition_buffer_usage_get_domain_type(
+               struct lttng_condition *condition,
+               enum lttng_domain_type *type);
+
+extern enum lttng_condition_status
+lttng_condition_buffer_usage_set_domain_type(
+               struct lttng_condition *condition,
+               enum lttng_domain_type type);
+
+
+/* LTTng Condition Evaluation */
+extern enum lttng_evaluation_status
+lttng_evaluation_buffer_usage_get_usage_ratio(
+               struct lttng_evaluation *evaluation,
+               double *usage_ratio);
+
+extern enum lttng_evaluation_status
+lttng_evaluation_buffer_usage_get_usage(
+               struct lttng_evaluation *evaluation,
+               uint64_t *usage_bytes);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* LTTNG_CONDITION_BUFFER_USAGE_H */
diff --git a/include/lttng/condition/condition-internal.h b/include/lttng/condition/condition-internal.h
new file mode 100644 (file)
index 0000000..66dbf20
--- /dev/null
@@ -0,0 +1,70 @@
+/*
+ * Copyright (C) 2017 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
+ *
+ * This library is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License, version 2.1 only,
+ * as published by the Free Software Foundation.
+ *
+ * This library is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
+ * for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this library; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#ifndef LTTNG_CONDITION_INTERNAL_H
+#define LTTNG_CONDITION_INTERNAL_H
+
+#include <lttng/condition/condition.h>
+#include <common/macros.h>
+#include <stdbool.h>
+#include <urcu/list.h>
+#include <stdint.h>
+
+typedef void (*condition_destroy_cb)(struct lttng_condition *condition);
+typedef bool (*condition_validate_cb)(struct lttng_condition *condition);
+typedef ssize_t (*condition_serialize_cb)(struct lttng_condition *condition,
+               char *buf);
+typedef bool (*condition_equal_cb)(struct lttng_condition *a,
+               struct lttng_condition *b);
+
+struct lttng_condition {
+       enum lttng_condition_type type;
+       condition_validate_cb validate;
+       condition_serialize_cb serialize;
+       condition_equal_cb equal;
+       condition_destroy_cb destroy;
+};
+
+struct lttng_condition_comm {
+       /* enum lttng_condition_type */
+       int8_t condition_type;
+       char payload[];
+};
+
+LTTNG_HIDDEN
+void lttng_condition_init(struct lttng_condition *condition,
+               enum lttng_condition_type type);
+
+LTTNG_HIDDEN
+bool lttng_condition_validate(struct lttng_condition *condition);
+
+/*
+ * FIXME Add explicit buffer bound checking using a "len" parameter to
+ * ensure malformed buffers are caught and rejected.
+ */
+LTTNG_HIDDEN
+ssize_t lttng_condition_create_from_buffer(const char *buf,
+               struct lttng_condition **condition);
+
+LTTNG_HIDDEN
+ssize_t lttng_condition_serialize(struct lttng_condition *condition, char *buf);
+
+LTTNG_HIDDEN
+bool lttng_condition_is_equal(struct lttng_condition *a,
+               struct lttng_condition *b);
+
+#endif /* LTTNG_CONDITION_INTERNAL_H */
diff --git a/include/lttng/condition/condition.h b/include/lttng/condition/condition.h
new file mode 100644 (file)
index 0000000..177b4ab
--- /dev/null
@@ -0,0 +1,52 @@
+/*
+ * Copyright (C) 2017 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
+ *
+ * This library is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License, version 2.1 only,
+ * as published by the Free Software Foundation.
+ *
+ * This library is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
+ * for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this library; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#ifndef LTTNG_CONDITION_H
+#define LTTNG_CONDITION_H
+
+#include <lttng/lttng.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+struct lttng_condition;
+
+enum lttng_condition_type {
+       LTTNG_CONDITION_TYPE_UNKNOWN = -1,
+       LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW = 102,
+       LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH = 101,
+};
+
+enum lttng_condition_status {
+       LTTNG_CONDITION_STATUS_OK = 0,
+       LTTNG_CONDITION_STATUS_ERROR = -1,
+       LTTNG_CONDITION_STATUS_UNKNOWN = -2,
+       LTTNG_CONDITION_STATUS_INVALID = -3,
+       LTTNG_CONDITION_STATUS_UNSET = -4,
+};
+
+extern enum lttng_condition_type lttng_condition_get_type(
+               struct lttng_condition *condition);
+
+extern void lttng_condition_destroy(struct lttng_condition *condition);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* LTTNG_CONDITION_H */
diff --git a/include/lttng/condition/evaluation-internal.h b/include/lttng/condition/evaluation-internal.h
new file mode 100644 (file)
index 0000000..5c874b3
--- /dev/null
@@ -0,0 +1,53 @@
+/*
+ * Copyright (C) 2017 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
+ *
+ * This library is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License, version 2.1 only,
+ * as published by the Free Software Foundation.
+ *
+ * This library is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
+ * for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this library; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#ifndef LTTNG_EVALUATION_INTERNAL_H
+#define LTTNG_EVALUATION_INTERNAL_H
+
+#include <lttng/condition/evaluation.h>
+#include <common/macros.h>
+#include <stdbool.h>
+
+typedef void (*evaluation_destroy_cb)(struct lttng_evaluation *evaluation);
+typedef ssize_t (*evaluation_serialize_cb)(struct lttng_evaluation *evaluation,
+               char *buf);
+
+struct lttng_evaluation_comm {
+       /* enum lttng_condition_type type */
+       int8_t type;
+       char payload[];
+} LTTNG_PACKED;
+
+struct lttng_evaluation {
+       enum lttng_condition_type type;
+       evaluation_serialize_cb serialize;
+       evaluation_destroy_cb destroy;
+};
+
+/*
+ * FIXME Add explicit buffer bound checking using a "len" parameter to
+ * ensure malformed buffers are caught and rejected.
+ */
+LTTNG_HIDDEN
+ssize_t lttng_evaluation_create_from_buffer(const char *buf,
+               struct lttng_evaluation **evaluation);
+
+LTTNG_HIDDEN
+ssize_t lttng_evaluation_serialize(struct lttng_evaluation *evaluation,
+               char *buf);
+
+#endif /* LTTNG_EVALUATION_INTERNAL_H */
diff --git a/include/lttng/condition/evaluation.h b/include/lttng/condition/evaluation.h
new file mode 100644 (file)
index 0000000..841284c
--- /dev/null
@@ -0,0 +1,46 @@
+/*
+ * Copyright (C) 2017 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
+ *
+ * This library is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License, version 2.1 only,
+ * as published by the Free Software Foundation.
+ *
+ * This library is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
+ * for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this library; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#ifndef LTTNG_EVALUATION_H
+#define LTTNG_EVALUATION_H
+
+#include <lttng/condition/condition.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+struct lttng_evaluation;
+
+enum lttng_evaluation_status {
+       LTTNG_EVALUATION_STATUS_OK = 0,
+       LTTNG_EVALUATION_STATUS_ERROR = -1,
+       LTTNG_EVALUATION_STATUS_INVALID = -2,
+       LTTNG_EVALUATION_STATUS_UNKNOWN = -2,
+       LTTNG_EVALUATION_STATUS_UNSET = -3,
+};
+
+extern enum lttng_condition_type lttng_evaluation_get_type(
+               struct lttng_evaluation *evaluation);
+
+extern void lttng_evaluation_destroy(struct lttng_evaluation *evaluation);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* LTTNG_EVALUATION_H */
diff --git a/include/lttng/endpoint-internal.h b/include/lttng/endpoint-internal.h
new file mode 100644 (file)
index 0000000..de6024b
--- /dev/null
@@ -0,0 +1,32 @@
+/*
+ * Copyright (C) 2017 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
+ *
+ * This library is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License, version 2.1 only,
+ * as published by the Free Software Foundation.
+ *
+ * This library is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
+ * for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this library; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#ifndef LTTNG_ENDPOINT_INTERNAL_H
+#define LTTNG_ENDPOINT_INTERNAL_H
+
+#include <lttng/endpoint.h>
+#include <common/macros.h>
+
+enum lttng_endpoint_type {
+       LTTNG_ENDPOINT_TYPE_DEFAULT_SESSIOND_NOTIFICATION = 0,
+};
+
+struct lttng_endpoint {
+       enum lttng_endpoint_type type;
+};
+
+#endif /* LTTNG_ENDPOINT_INTERNAL_H */
diff --git a/include/lttng/endpoint.h b/include/lttng/endpoint.h
new file mode 100644 (file)
index 0000000..b93ed69
--- /dev/null
@@ -0,0 +1,32 @@
+/*
+ * Copyright (C) 2017 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
+ *
+ * This library is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License, version 2.1 only,
+ * as published by the Free Software Foundation.
+ *
+ * This library is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
+ * for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this library; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#ifndef LTTNG_ENDPOINT_H
+#define LTTNG_ENDPOINT_H
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/* Default LTTng session daemon endpoint singleton. */
+extern struct lttng_endpoint *lttng_session_daemon_notification_endpoint;
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* LTTNG_ENDPOINT_H */
index db6fe73c27741c2b269063519adda358bd91fea9..1b5ea699af63ef066e3b6ed7582f7d03a5eda13a 100644 (file)
@@ -145,6 +145,10 @@ enum lttng_error_code {
        LTTNG_ERR_REGEN_STATEDUMP_FAIL   = 122, /* Failed to regenerate the state dump */
        LTTNG_ERR_REGEN_STATEDUMP_NOMEM  = 123, /* Failed to regenerate the state dump, not enough memory */
        LTTNG_ERR_NOT_SNAPSHOT_SESSION   = 124, /* Session is not in snapshot mode. */
+       LTTNG_ERR_INVALID_TRIGGER        = 125, /* Invalid trigger provided. */
+       LTTNG_ERR_TRIGGER_EXISTS         = 126, /* Trigger already registered. */
+       LTTNG_ERR_TRIGGER_NOT_FOUND      = 127, /* Trigger not found. */
+       LTTNG_ERR_COMMAND_CANCELLED      = 128, /* Command cancelled. */
 
        /* MUST be last element */
        LTTNG_ERR_NR,                           /* Last element */
diff --git a/include/lttng/notification/channel-internal.h b/include/lttng/notification/channel-internal.h
new file mode 100644 (file)
index 0000000..e684a96
--- /dev/null
@@ -0,0 +1,51 @@
+/*
+ * Copyright (C) 2017 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
+ *
+ * This library is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License, version 2.1 only,
+ * as published by the Free Software Foundation.
+ *
+ * This library is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
+ * for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this library; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#ifndef LTTNG_NOTIFICATION_CHANNEL_INTERNAL_H
+#define LTTNG_NOTIFICATION_CHANNEL_INTERNAL_H
+
+#include <lttng/notification/channel.h>
+#include <common/macros.h>
+#include <stdint.h>
+#include <stdbool.h>
+
+enum lttng_notification_channel_message_type {
+       LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE = 0,
+       LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE = 1,
+       LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_COMMAND_REPLY = 2,
+       LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION = 3,
+};
+
+struct lttng_notification_channel_message {
+       /* enum lttng_notification_channel_message_type */
+       int8_t type;
+       /* size of the payload following this field */
+       uint32_t size;
+       char payload[];
+} LTTNG_PACKED;
+
+struct lttng_notification_channel_command_reply {
+       /* enum lttng_notification_channel_status */
+       int8_t status;
+} LTTNG_PACKED;
+
+struct lttng_notification_channel {
+       /* FIXME Add mutex to protect the socket from concurrent uses. */
+       int socket;
+};
+
+#endif /* LTTNG_NOTIFICATION_CHANNEL_INTERNAL_H */
diff --git a/include/lttng/notification/channel.h b/include/lttng/notification/channel.h
new file mode 100644 (file)
index 0000000..36b3509
--- /dev/null
@@ -0,0 +1,67 @@
+/*
+ * Copyright (C) 2017 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
+ *
+ * This library is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License, version 2.1 only,
+ * as published by the Free Software Foundation.
+ *
+ * This library is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
+ * for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this library; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#ifndef LTTNG_NOTIFICATION_CHANNEL_H
+#define LTTNG_NOTIFICATION_CHANNEL_H
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+struct lttng_endpoint;
+struct lttng_condition;
+struct lttng_notification;
+struct lttng_notification_channel;
+
+/* LTTng Notification channel */
+enum lttng_notification_channel_status {
+       LTTNG_NOTIFICATION_CHANNEL_STATUS_NOTIFICATIONS_DROPPED = 1,
+       LTTNG_NOTIFICATION_CHANNEL_STATUS_OK = 0,
+       LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR = -1,
+       LTTNG_NOTIFICATION_CHANNEL_STATUS_CLOSED = -2,
+       LTTNG_NOTIFICATION_CHANNEL_STATUS_ALREADY_SUBSCRIBED = -3,
+       /* Condition unknown. */
+       LTTNG_NOTIFICATION_CHANNEL_STATUS_UNKNOWN_CONDITION = -4,
+       LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID = -5,
+};
+
+extern struct lttng_notification_channel *lttng_notification_channel_create(
+               struct lttng_endpoint *endpoint);
+
+extern enum lttng_notification_channel_status
+lttng_notification_channel_get_next_notification(
+               struct lttng_notification_channel *channel,
+               struct lttng_notification **notification);
+
+extern enum lttng_notification_channel_status
+lttng_notification_channel_subscribe(
+               struct lttng_notification_channel *channel,
+               struct lttng_condition *condition);
+
+extern enum lttng_notification_channel_status
+lttng_notification_channel_unsubscribe(
+               struct lttng_notification_channel *channel,
+               struct lttng_condition *condition);
+
+extern void lttng_notification_channel_destroy(
+               struct lttng_notification_channel *channel);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* LTTNG_NOTIFICATION_CHANNEL_H */
diff --git a/include/lttng/notification/notification-internal.h b/include/lttng/notification/notification-internal.h
new file mode 100644 (file)
index 0000000..d59e2e6
--- /dev/null
@@ -0,0 +1,56 @@
+/*
+ * Copyright (C) 2017 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
+ *
+ * This library is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License, version 2.1 only,
+ * as published by the Free Software Foundation.
+ *
+ * This library is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
+ * for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this library; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#ifndef LTTNG_NOTIFICATION_INTERNAL_H
+#define LTTNG_NOTIFICATION_INTERNAL_H
+
+#include <lttng/notification/notification.h>
+#include <common/macros.h>
+#include <stdint.h>
+#include <stdbool.h>
+
+struct lttng_notification {
+       struct lttng_condition *condition;
+       struct lttng_evaluation *evaluation;
+       bool owns_elements;
+};
+
+struct lttng_notification_comm {
+       /* length excludes its own length. */
+       uint32_t length;
+       /* A condition and evaluation object follow. */
+       char payload[];
+} LTTNG_PACKED;
+
+LTTNG_HIDDEN
+struct lttng_notification *lttng_notification_create(
+               struct lttng_condition *condition,
+               struct lttng_evaluation *evaluation);
+
+LTTNG_HIDDEN
+ssize_t lttng_notification_serialize(struct lttng_notification *notification,
+               char *buf);
+
+/*
+ * FIXME Add explicit buffer bound checking using a "len" parameter to
+ * ensure malformed buffers are caught and rejected.
+ */
+LTTNG_HIDDEN
+ssize_t lttng_notification_create_from_buffer(const char *buf,
+               struct lttng_notification **notification);
+
+#endif /* LTTNG_NOTIFICATION_INTERNAL_H */
diff --git a/include/lttng/notification/notification.h b/include/lttng/notification/notification.h
new file mode 100644 (file)
index 0000000..100d0de
--- /dev/null
@@ -0,0 +1,46 @@
+/*
+ * Copyright (C) 2017 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
+ *
+ * This library is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License, version 2.1 only,
+ * as published by the Free Software Foundation.
+ *
+ * This library is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
+ * for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this library; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#ifndef LTTNG_NOTIFICATION_H
+#define LTTNG_NOTIFICATION_H
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+struct lttng_condition;
+struct lttng_evaluation;
+struct lttng_notification;
+
+/*
+ * The notification retains ownership of both the condition and evaluation.
+ * Destroying the notification will also destroy the notification and evaluation
+ * objects.
+ */
+extern struct lttng_condition *lttng_notification_get_condition(
+               struct lttng_notification *notification);
+
+extern struct lttng_evaluation *lttng_notification_get_evaluation(
+               struct lttng_notification *notification);
+
+extern void lttng_notification_destroy(struct lttng_notification *notification);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* LTTNG_NOTIFICATION_H */
diff --git a/include/lttng/trigger/trigger-internal.h b/include/lttng/trigger/trigger-internal.h
new file mode 100644 (file)
index 0000000..f84a334
--- /dev/null
@@ -0,0 +1,52 @@
+/*
+ * Copyright (C) 2017 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
+ *
+ * This library is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License, version 2.1 only,
+ * as published by the Free Software Foundation.
+ *
+ * This library is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
+ * for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this library; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#ifndef LTTNG_TRIGGER_INTERNAL_H
+#define LTTNG_TRIGGER_INTERNAL_H
+
+#include <lttng/trigger/trigger.h>
+#include <common/macros.h>
+#include <stdint.h>
+#include <stdbool.h>
+
+struct lttng_trigger {
+       struct lttng_condition *condition;
+       struct lttng_action *action;
+};
+
+struct lttng_trigger_comm {
+       /* length excludes its own length. */
+       uint32_t length;
+       /* A condition and action object follow. */
+       char payload[];
+} LTTNG_PACKED;
+
+/*
+ * FIXME Add explicit buffer bound checking using a "len" parameter to
+ * ensure malformed buffers are caught and rejected.
+ */
+LTTNG_HIDDEN
+ssize_t lttng_trigger_create_from_buffer(const char *buf,
+               struct lttng_trigger **trigger);
+
+LTTNG_HIDDEN
+ssize_t lttng_trigger_serialize(struct lttng_trigger *trigger, char *buf);
+
+LTTNG_HIDDEN
+bool lttng_trigger_validate(struct lttng_trigger *trigger);
+
+#endif /* LTTNG_TRIGGER_INTERNAL_H */
diff --git a/include/lttng/trigger/trigger.h b/include/lttng/trigger/trigger.h
new file mode 100644 (file)
index 0000000..3d7dd45
--- /dev/null
@@ -0,0 +1,54 @@
+/*
+ * Copyright (C) 2017 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
+ *
+ * This library is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License, version 2.1 only,
+ * as published by the Free Software Foundation.
+ *
+ * This library is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
+ * for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this library; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#ifndef LTTNG_TRIGGER_H
+#define LTTNG_TRIGGER_H
+
+struct lttng_action;
+struct lttng_condition;
+struct lttng_trigger;
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+enum lttng_register_trigger_status {
+       LTTNG_REGISTER_TRIGGER_STATUS_OK = 0,
+       LTTNG_REGISTER_TRIGGER_STATUS_INVALID = -1,
+};
+
+/* Trigger assumes ownership of both condition and action after call. */
+extern struct lttng_trigger *lttng_trigger_create(
+               struct lttng_condition *condition, struct lttng_action *action);
+
+extern struct lttng_condition *lttng_trigger_get_condition(
+               struct lttng_trigger *trigger);
+
+extern struct lttng_action *lttng_trigger_get_action(
+               struct lttng_trigger *trigger);
+
+extern void lttng_trigger_destroy(struct lttng_trigger *trigger);
+
+extern int lttng_register_trigger(struct lttng_trigger *trigger);
+
+extern int lttng_unregister_trigger(struct lttng_trigger *trigger);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* LTTNG_TRIGGER_H */
index 2f8eed1085dfed209ba5f00979621e2f603d44be..9fb4747536ad7fb3551217f1f3d6aaa0198caae6 100644 (file)
@@ -538,7 +538,7 @@ int main(int argc, char **argv)
                goto exit_data_thread;
        }
 
-       /* Create the thread to manage the receive of fd */
+       /* Create the thread to manage the reception of fds */
        ret = pthread_create(&sessiond_thread, default_pthread_attr(),
                        consumer_thread_sessiond_poll,
                        (void *) ctx);
@@ -583,6 +583,14 @@ exit_metadata_timer_thread:
                PERROR("pthread_join sessiond_thread");
                retval = -1;
        }
+
+       ret = consumer_timer_thread_get_channel_monitor_pipe();
+       if (ret >= 0) {
+               ret = close(ret);
+               if (ret) {
+                       PERROR("close channel monitor pipe");
+               }
+       }
 exit_sessiond_thread:
 
        ret = pthread_join(data_thread, &status);
index 4238661397275622c9cf79a6c8a9b1d6982a9894..0ac7506b440dc825151c32db1bbeca862c70613c 100644 (file)
@@ -30,7 +30,10 @@ lttng_sessiond_SOURCES = utils.c utils.h \
                        agent.c agent.h \
                        save.h save.c \
                        load-session-thread.h load-session-thread.c \
-                       syscall.h syscall.c
+                       syscall.h syscall.c \
+                       notification-thread.h notification-thread.c \
+                       notification-thread-commands.h notification-thread-commands.c \
+                       notification-thread-events.h notification-thread-events.c
 
 if HAVE_LIBLTTNG_UST_CTL
 lttng_sessiond_SOURCES += trace-ust.c ust-registry.c ust-app.c \
index 39d2ec4a05bd1297c31e2ac7036600772ea38ffa..2439b3e28ec0fe73c54c08be3d32b3c7c70af005 100644 (file)
@@ -252,7 +252,7 @@ void *agent_thread_manage_registration(void *data)
                goto error_tcp_socket;
        }
 
-       /* Add create valid TCP socket to poll set. */
+       /* Add TCP socket to poll set. */
        ret = lttng_poll_add(&events, reg_sock->fd,
                        LPOLLIN | LPOLLERR | LPOLLHUP | LPOLLRDHUP);
        if (ret < 0) {
index cc81906b6267ffc104ee357741d0b099af9fbc3c..5c9e69ce10bdcd1aa4967c4d7662c993271d5f89 100644 (file)
@@ -29,6 +29,7 @@
 #include <common/utils.h>
 #include <common/compat/string.h>
 #include <common/kernel-ctl/kernel-ctl.h>
+#include <lttng/trigger/trigger-internal.h>
 
 #include "channel.h"
 #include "consumer.h"
@@ -41,6 +42,8 @@
 #include "syscall.h"
 #include "agent.h"
 #include "buffer-registry.h"
+#include "notification-thread.h"
+#include "notification-thread-commands.h"
 
 #include "cmd.h"
 
@@ -3566,6 +3569,84 @@ end:
        return ret;
 }
 
+int cmd_register_trigger(struct command_ctx *cmd_ctx, int sock,
+               struct notification_thread_handle *notification_thread)
+{
+       int ret;
+       size_t trigger_len;
+       ssize_t sock_recv_len;
+       char *trigger_buffer = NULL;
+       struct lttng_trigger *trigger = NULL;
+
+       trigger_len = (size_t) cmd_ctx->lsm->u.trigger.length;
+       trigger_buffer = zmalloc(trigger_len);
+       if (!trigger_buffer) {
+               ret = LTTNG_ERR_NOMEM;
+               goto end;
+       }
+
+       sock_recv_len = lttcomm_recv_unix_sock(sock, trigger_buffer,
+                       trigger_len);
+       if (sock_recv_len < 0 || sock_recv_len != trigger_len) {
+               ERR("Failed to receive \"register trigger\" command payload");
+               /* TODO: should this be a new error enum ? */
+               ret = LTTNG_ERR_INVALID_TRIGGER;
+               goto end;
+       }
+
+       if (lttng_trigger_create_from_buffer(trigger_buffer, &trigger) !=
+                       trigger_len) {
+               ERR("Invalid trigger payload received in \"register trigger\" command");
+               ret = LTTNG_ERR_INVALID_TRIGGER;
+               goto end;
+       }
+
+       ret = notification_thread_command_register_trigger(notification_thread,
+                       trigger);
+end:
+       free(trigger_buffer);
+       return ret;
+}
+
+int cmd_unregister_trigger(struct command_ctx *cmd_ctx, int sock,
+               struct notification_thread_handle *notification_thread)
+{
+       int ret;
+       size_t trigger_len;
+       ssize_t sock_recv_len;
+       char *trigger_buffer = NULL;
+       struct lttng_trigger *trigger = NULL;
+
+       trigger_len = (size_t) cmd_ctx->lsm->u.trigger.length;
+       trigger_buffer = zmalloc(trigger_len);
+       if (!trigger_buffer) {
+               ret = LTTNG_ERR_NOMEM;
+               goto end;
+       }
+
+       sock_recv_len = lttcomm_recv_unix_sock(sock, trigger_buffer,
+                       trigger_len);
+       if (sock_recv_len < 0 || sock_recv_len != trigger_len) {
+               ERR("Failed to receive \"register trigger\" command payload");
+               /* TODO: should this be a new error enum ? */
+               ret = LTTNG_ERR_INVALID_TRIGGER;
+               goto end;
+       }
+
+       if (lttng_trigger_create_from_buffer(trigger_buffer, &trigger) !=
+                       trigger_len) {
+               ERR("Invalid trigger payload received in \"register trigger\" command");
+               ret = LTTNG_ERR_INVALID_TRIGGER;
+               goto end;
+       }
+
+       ret = notification_thread_command_unregister_trigger(notification_thread,
+                       trigger);
+end:
+       free(trigger_buffer);
+       return ret;
+}
+
 /*
  * Send relayd sockets from snapshot output to consumer. Ignore request if the
  * snapshot output is *not* set with a remote destination.
index ac88d51308dc6270337f81e48cc3288cbb155210..e7e3442761a137739af8b792f7cab01be6d9a485 100644 (file)
@@ -21,6 +21,8 @@
 #include "context.h"
 #include "session.h"
 
+struct notification_thread_handle;
+
 /*
  * Init the command subsystem. Must be called before using any of the functions
  * above. This is called in the main() of the session daemon.
@@ -111,4 +113,9 @@ int cmd_set_session_shm_path(struct ltt_session *session,
 int cmd_regenerate_metadata(struct ltt_session *session);
 int cmd_regenerate_statedump(struct ltt_session *session);
 
+int cmd_register_trigger(struct command_ctx *cmd_ctx, int sock,
+               struct notification_thread_handle *notification_thread_handle);
+int cmd_unregister_trigger(struct command_ctx *cmd_ctx, int sock,
+               struct notification_thread_handle *notification_thread_handle);
+
 #endif /* CMD_H */
index 6ee3975792bb061b46035d6d7b5923af1ab6fd0f..8c8116d8e70a4652b5b8b0edc4efd67b26a8fb74 100644 (file)
@@ -748,7 +748,6 @@ int consumer_send_fds(struct consumer_socket *sock, int *fds, size_t nb_fd)
        }
 
        ret = consumer_recv_status_reply(sock);
-
 error:
        return ret;
 }
@@ -806,6 +805,7 @@ void consumer_init_ask_channel_comm_msg(struct lttcomm_consumer_msg *msg,
                unsigned int switch_timer_interval,
                unsigned int read_timer_interval,
                unsigned int live_timer_interval,
+               unsigned int monitor_timer_interval,
                int output,
                int type,
                uint64_t session_id,
@@ -837,6 +837,7 @@ void consumer_init_ask_channel_comm_msg(struct lttcomm_consumer_msg *msg,
        msg->u.ask_channel.switch_timer_interval = switch_timer_interval;
        msg->u.ask_channel.read_timer_interval = read_timer_interval;
        msg->u.ask_channel.live_timer_interval = live_timer_interval;
+       msg->u.ask_channel.monitor_timer_interval = monitor_timer_interval;
        msg->u.ask_channel.output = output;
        msg->u.ask_channel.type = type;
        msg->u.ask_channel.session_id = session_id;
@@ -1048,6 +1049,35 @@ error:
        return ret;
 }
 
+int consumer_send_channel_monitor_pipe(struct consumer_socket *consumer_sock,
+               int pipe)
+{
+       int ret;
+       struct lttcomm_consumer_msg msg;
+
+       /* Code flow error. Safety net. */
+
+       memset(&msg, 0, sizeof(msg));
+       msg.cmd_type = LTTNG_CONSUMER_SET_CHANNEL_MONITOR_PIPE;
+
+       DBG3("Sending set_channel_monitor_pipe command to consumer");
+       ret = consumer_send_msg(consumer_sock, &msg);
+       if (ret < 0) {
+               goto error;
+       }
+
+       DBG3("Sending channel monitoring pipe %d to consumer on socket %d",
+                       pipe, *consumer_sock->fd_ptr);
+       ret = consumer_send_fds(consumer_sock, &pipe, 1);
+       if (ret < 0) {
+               goto error;
+       }
+
+       DBG2("Channel monitoring pipe successfully sent");
+error:
+       return ret;
+}
+
 /*
  * Set consumer subdirectory using the session name and a generated datetime if
  * needed. This is appended to the current subdirectory.
index 08b57eb73b6a7e932e57a8b1dae0ffea2a9bfc8f..0cc3d0e41d96e329bbf2c8b5a3b96f251d9184fa 100644 (file)
@@ -93,6 +93,11 @@ struct consumer_data {
        int err_sock;
        /* These two sockets uses the cmd_unix_sock_path. */
        int cmd_sock;
+       /*
+        * Write-end of the channel monitoring pipe to be passed to the
+        * consumer.
+        */
+       int channel_monitor_pipe;
        /*
         * The metadata socket object is handled differently and only created
         * locally in this object thus it's the only reference available in the
@@ -214,6 +219,8 @@ int consumer_send_relayd_socket(struct consumer_socket *consumer_sock,
                struct lttcomm_relayd_sock *rsock, struct consumer_output *consumer,
                enum lttng_stream_type type, uint64_t session_id,
                char *session_name, char *hostname, int session_live_timer);
+int consumer_send_channel_monitor_pipe(struct consumer_socket *consumer_sock,
+               int pipe);
 int consumer_send_destroy_relayd(struct consumer_socket *sock,
                struct consumer_output *consumer);
 int consumer_recv_status_reply(struct consumer_socket *sock);
@@ -232,6 +239,7 @@ void consumer_init_ask_channel_comm_msg(struct lttcomm_consumer_msg *msg,
                unsigned int switch_timer_interval,
                unsigned int read_timer_interval,
                unsigned int live_timer_interval,
+               unsigned int monitor_timer_interval,
                int output,
                int type,
                uint64_t session_id,
index 22ea1bb3e6de87635b5623c8fff8c5dc6163531a..5d94cc639e2e77476dba2502db5b58e222b5491a 100644 (file)
@@ -29,6 +29,7 @@ enum health_type_sessiond {
        HEALTH_SESSIOND_TYPE_HT_CLEANUP         = 5,
        HEALTH_SESSIOND_TYPE_APP_MANAGE_NOTIFY  = 6,
        HEALTH_SESSIOND_TYPE_APP_REG_DISPATCH   = 7,
+       HEALTH_SESSIOND_TYPE_NOTIFICATION       = 8,
 
        NR_HEALTH_SESSIOND_TYPES,
 };
index 65ab37d59ec4ec5026cb72dc5bea18a7d5004dd0..1988d1fccad6da714e5d7884205b7310c4e5a51d 100644 (file)
@@ -29,6 +29,7 @@
 #include "session.h"
 #include "ust-app.h"
 #include "version.h"
+#include "notification-thread.h"
 
 extern const char default_home_dir[],
        default_tracing_group[],
@@ -38,6 +39,8 @@ extern const char default_home_dir[],
 /* Set in main.c at boot time of the daemon */
 extern int kernel_tracer_fd;
 
+extern struct notification_thread_handle *notification_thread_handle;
+
 /*
  * This contains extra data needed for processing a command received by the
  * session daemon from the lttng client.
@@ -58,7 +61,7 @@ struct ust_command {
 };
 
 /*
- * Queue used to enqueue UST registration request (ust_commant) and protected
+ * Queue used to enqueue UST registration request (ust_command) and synchronized
  * by a futex with a scheme N wakers / 1 waiters. See futex.c/.h
  */
 struct ust_cmd_queue {
index a812ee857c236d8734a71db667c945b5ca89268e..cba0e272d6a37a47d9e14b7893ef8aab0f0f861e 100644 (file)
@@ -215,6 +215,7 @@ int ustctl_put_next_subbuf(struct ustctl_consumer_stream *stream);
 /* snapshot */
 
 int ustctl_snapshot(struct ustctl_consumer_stream *stream);
+int ustctl_snapshot_sample_positions(struct ustctl_consumer_stream *stream);
 int ustctl_snapshot_get_consumed(struct ustctl_consumer_stream *stream,
                unsigned long *pos);
 int ustctl_snapshot_get_produced(struct ustctl_consumer_stream *stream,
index 172f8c270d5b633466ace52d70fdcf3f3f061318..f137c09f89f53e1aaf0caec5a3fa3415e5b2efe0 100644 (file)
@@ -71,6 +71,7 @@
 #include "agent-thread.h"
 #include "save.h"
 #include "load-session-thread.h"
+#include "notification-thread.h"
 #include "syscall.h"
 #include "agent.h"
 #include "ht-cleanup.h"
@@ -104,6 +105,7 @@ static struct consumer_data kconsumer_data = {
        .cmd_unix_sock_path = DEFAULT_KCONSUMERD_CMD_SOCK_PATH,
        .err_sock = -1,
        .cmd_sock = -1,
+       .channel_monitor_pipe = -1,
        .pid_mutex = PTHREAD_MUTEX_INITIALIZER,
        .lock = PTHREAD_MUTEX_INITIALIZER,
        .cond = PTHREAD_COND_INITIALIZER,
@@ -115,6 +117,7 @@ static struct consumer_data ustconsumer64_data = {
        .cmd_unix_sock_path = DEFAULT_USTCONSUMERD64_CMD_SOCK_PATH,
        .err_sock = -1,
        .cmd_sock = -1,
+       .channel_monitor_pipe = -1,
        .pid_mutex = PTHREAD_MUTEX_INITIALIZER,
        .lock = PTHREAD_MUTEX_INITIALIZER,
        .cond = PTHREAD_COND_INITIALIZER,
@@ -126,6 +129,7 @@ static struct consumer_data ustconsumer32_data = {
        .cmd_unix_sock_path = DEFAULT_USTCONSUMERD32_CMD_SOCK_PATH,
        .err_sock = -1,
        .cmd_sock = -1,
+       .channel_monitor_pipe = -1,
        .pid_mutex = PTHREAD_MUTEX_INITIALIZER,
        .lock = PTHREAD_MUTEX_INITIALIZER,
        .cond = PTHREAD_COND_INITIALIZER,
@@ -211,6 +215,7 @@ static pthread_t health_thread;
 static pthread_t ht_cleanup_thread;
 static pthread_t agent_reg_thread;
 static pthread_t load_session_thread;
+static pthread_t notification_thread;
 
 /*
  * UST registration command queue. This queue is tied with a futex and uses a N
@@ -305,15 +310,19 @@ const char * const config_section_name = "sessiond";
 /* Load session thread information to operate. */
 struct load_session_thread_data *load_info;
 
+/* Notification thread handle. */
+struct notification_thread_handle *notification_thread_handle;
+
 /* Global hash tables */
 struct lttng_ht *agent_apps_ht_by_sock = NULL;
 
 /*
- * Whether sessiond is ready for commands/health check requests.
+ * Whether sessiond is ready for commands/notification channel/health check
+ * requests.
  * NR_LTTNG_SESSIOND_READY must match the number of calls to
  * sessiond_notify_ready().
  */
-#define NR_LTTNG_SESSIOND_READY                3
+#define NR_LTTNG_SESSIOND_READY                4
 int lttng_sessiond_ready = NR_LTTNG_SESSIOND_READY;
 
 int sessiond_check_thread_quit_pipe(int fd, uint32_t events)
@@ -521,6 +530,24 @@ static void close_consumer_sockets(void)
                        PERROR("UST consumerd64 cmd_sock close");
                }
        }
+       if (kconsumer_data.channel_monitor_pipe >= 0) {
+               ret = close(kconsumer_data.channel_monitor_pipe);
+               if (ret < 0) {
+                       PERROR("kernel consumer channel monitor pipe close");
+               }
+       }
+       if (ustconsumer32_data.channel_monitor_pipe >= 0) {
+               ret = close(ustconsumer32_data.channel_monitor_pipe);
+               if (ret < 0) {
+                       PERROR("UST consumerd32 channel monitor pipe close");
+               }
+       }
+       if (ustconsumer64_data.channel_monitor_pipe >= 0) {
+               ret = close(ustconsumer64_data.channel_monitor_pipe);
+               if (ret < 0) {
+                       PERROR("UST consumerd64 channel monitor pipe close");
+               }
+       }
 }
 
 /*
@@ -697,6 +724,10 @@ static void sessiond_cleanup(void)
                free(load_info);
        }
 
+       if (notification_thread_handle) {
+               notification_thread_handle_destroy(notification_thread_handle);
+       }
+
        /*
         * Cleanup lock file by deleting it and finaly closing it which will
         * release the file system lock.
@@ -1224,6 +1255,7 @@ static void *thread_manage_consumer(void *data)
        enum lttcomm_return_code code;
        struct lttng_poll_event events;
        struct consumer_data *consumer_data = data;
+       struct consumer_socket *cmd_socket_wrapper = NULL;
 
        DBG("[thread] Manage consumer started");
 
@@ -1333,39 +1365,43 @@ restart:
        }
 
        health_code_update();
-       if (code == LTTCOMM_CONSUMERD_COMMAND_SOCK_READY) {
-               /* Connect both socket, command and metadata. */
-               consumer_data->cmd_sock =
-                       lttcomm_connect_unix_sock(consumer_data->cmd_unix_sock_path);
-               consumer_data->metadata_fd =
-                       lttcomm_connect_unix_sock(consumer_data->cmd_unix_sock_path);
-               if (consumer_data->cmd_sock < 0
-                               || consumer_data->metadata_fd < 0) {
-                       PERROR("consumer connect cmd socket");
-                       /* On error, signal condition and quit. */
-                       signal_consumer_condition(consumer_data, -1);
-                       goto error;
-               }
-               consumer_data->metadata_sock.fd_ptr = &consumer_data->metadata_fd;
-               /* Create metadata socket lock. */
-               consumer_data->metadata_sock.lock = zmalloc(sizeof(pthread_mutex_t));
-               if (consumer_data->metadata_sock.lock == NULL) {
-                       PERROR("zmalloc pthread mutex");
-                       goto error;
-               }
-               pthread_mutex_init(consumer_data->metadata_sock.lock, NULL);
-
-               signal_consumer_condition(consumer_data, 1);
-               DBG("Consumer command socket ready (fd: %d", consumer_data->cmd_sock);
-               DBG("Consumer metadata socket ready (fd: %d)",
-                               consumer_data->metadata_fd);
-       } else {
+       if (code != LTTCOMM_CONSUMERD_COMMAND_SOCK_READY) {
                ERR("consumer error when waiting for SOCK_READY : %s",
                                lttcomm_get_readable_code(-code));
                goto error;
        }
 
-       /* Remove the consumerd error sock since we've established a connexion */
+       /* Connect both command and metadata sockets. */
+       consumer_data->cmd_sock =
+                       lttcomm_connect_unix_sock(
+                               consumer_data->cmd_unix_sock_path);
+       consumer_data->metadata_fd =
+                       lttcomm_connect_unix_sock(
+                               consumer_data->cmd_unix_sock_path);
+       if (consumer_data->cmd_sock < 0 || consumer_data->metadata_fd < 0) {
+               PERROR("consumer connect cmd socket");
+               /* On error, signal condition and quit. */
+               signal_consumer_condition(consumer_data, -1);
+               goto error;
+       }
+
+       consumer_data->metadata_sock.fd_ptr = &consumer_data->metadata_fd;
+
+       /* Create metadata socket lock. */
+       consumer_data->metadata_sock.lock = zmalloc(sizeof(pthread_mutex_t));
+       if (consumer_data->metadata_sock.lock == NULL) {
+               PERROR("zmalloc pthread mutex");
+               goto error;
+       }
+       pthread_mutex_init(consumer_data->metadata_sock.lock, NULL);
+
+       DBG("Consumer command socket ready (fd: %d", consumer_data->cmd_sock);
+       DBG("Consumer metadata socket ready (fd: %d)",
+                       consumer_data->metadata_fd);
+
+       /*
+        * Remove the consumerd error sock since we've established a connection.
+        */
        ret = lttng_poll_del(&events, consumer_data->err_sock);
        if (ret < 0) {
                goto error;
@@ -1386,6 +1422,27 @@ restart:
 
        health_code_update();
 
+       /*
+        * Transfer the write-end of the channel monitoring pipe to the
+        * by issuing a SET_CHANNEL_MONITOR_PIPE command.
+        */
+       cmd_socket_wrapper = consumer_allocate_socket(&consumer_data->cmd_sock);
+       if (!cmd_socket_wrapper) {
+               goto error;
+       }
+
+       ret = consumer_send_channel_monitor_pipe(cmd_socket_wrapper,
+                       consumer_data->channel_monitor_pipe);
+       if (ret) {
+               goto error;
+       }
+       /* Discard the socket wrapper as it is no longer needed. */
+       consumer_destroy_socket(cmd_socket_wrapper);
+       cmd_socket_wrapper = NULL;
+
+       /* The thread is completely initialized, signal that it is ready. */
+       signal_consumer_condition(consumer_data, 1);
+
        /* Infinite blocking call, waiting for transmission */
 restart_poll:
        while (1) {
@@ -1529,6 +1586,10 @@ error:
                free(consumer_data->metadata_sock.lock);
        }
        lttng_poll_clean(&events);
+
+       if (cmd_socket_wrapper) {
+               consumer_destroy_socket(cmd_socket_wrapper);
+       }
 error_poll:
        if (err) {
                health_error();
@@ -3004,6 +3065,8 @@ static int process_client_msg(struct command_ctx *cmd_ctx, int sock,
        case LTTNG_SET_SESSION_SHM_PATH:
        case LTTNG_REGENERATE_METADATA:
        case LTTNG_REGENERATE_STATEDUMP:
+       case LTTNG_REGISTER_TRIGGER:
+       case LTTNG_UNREGISTER_TRIGGER:
                need_domain = 0;
                break;
        default:
@@ -3066,6 +3129,8 @@ static int process_client_msg(struct command_ctx *cmd_ctx, int sock,
        case LTTNG_LIST_SYSCALLS:
        case LTTNG_LIST_TRACEPOINT_FIELDS:
        case LTTNG_SAVE_SESSION:
+       case LTTNG_REGISTER_TRIGGER:
+       case LTTNG_UNREGISTER_TRIGGER:
                need_tracing_session = 0;
                break;
        default:
@@ -4117,6 +4182,18 @@ error_add_context:
                ret = cmd_regenerate_statedump(cmd_ctx->session);
                break;
        }
+       case LTTNG_REGISTER_TRIGGER:
+       {
+               ret = cmd_register_trigger(cmd_ctx, sock,
+                               notification_thread_handle);
+               break;
+       }
+       case LTTNG_UNREGISTER_TRIGGER:
+       {
+               ret = cmd_unregister_trigger(cmd_ctx, sock,
+                               notification_thread_handle);
+               break;
+       }
        default:
                ret = LTTNG_ERR_UND;
                break;
@@ -5531,6 +5608,9 @@ int main(int argc, char **argv)
        int ret = 0, retval = 0;
        void *status;
        const char *home_path, *env_app_timeout;
+       struct lttng_pipe *ust32_channel_monitor_pipe = NULL,
+                       *ust64_channel_monitor_pipe = NULL,
+                       *kernel_channel_monitor_pipe = NULL;
 
        init_kernel_workarounds();
 
@@ -5689,6 +5769,19 @@ int main(int argc, char **argv)
                                kconsumer_data.err_unix_sock_path);
                DBG2("Kernel consumer cmd path: %s",
                                kconsumer_data.cmd_unix_sock_path);
+               kernel_channel_monitor_pipe = lttng_pipe_open(0);
+               if (!kernel_channel_monitor_pipe) {
+                       ERR("Failed to create kernel consumer channel monitor pipe");
+                       retval = -1;
+                       goto exit_init_data;
+               }
+               kconsumer_data.channel_monitor_pipe =
+                               lttng_pipe_release_writefd(
+                                       kernel_channel_monitor_pipe);
+               if (kconsumer_data.channel_monitor_pipe < 0) {
+                       retval = -1;
+                       goto exit_init_data;
+               }
        } else {
                home_path = utils_get_home_dir();
                if (home_path == NULL) {
@@ -5793,6 +5886,18 @@ int main(int argc, char **argv)
                        ustconsumer32_data.err_unix_sock_path);
        DBG2("UST consumer 32 bits cmd path: %s",
                        ustconsumer32_data.cmd_unix_sock_path);
+       ust32_channel_monitor_pipe = lttng_pipe_open(0);
+       if (!ust32_channel_monitor_pipe) {
+               ERR("Failed to create 32-bit user space consumer channel monitor pipe");
+               retval = -1;
+               goto exit_init_data;
+       }
+       ustconsumer32_data.channel_monitor_pipe = lttng_pipe_release_writefd(
+                       ust32_channel_monitor_pipe);
+       if (ustconsumer32_data.channel_monitor_pipe < 0) {
+               retval = -1;
+               goto exit_init_data;
+       }
 
        /* 64 bits consumerd path setup */
        ret = snprintf(ustconsumer64_data.err_unix_sock_path, PATH_MAX,
@@ -5814,6 +5919,18 @@ int main(int argc, char **argv)
                        ustconsumer64_data.err_unix_sock_path);
        DBG2("UST consumer 64 bits cmd path: %s",
                        ustconsumer64_data.cmd_unix_sock_path);
+       ust64_channel_monitor_pipe = lttng_pipe_open(0);
+       if (!ust64_channel_monitor_pipe) {
+               ERR("Failed to create 64-bit user space consumer channel monitor pipe");
+               retval = -1;
+               goto exit_init_data;
+       }
+       ustconsumer64_data.channel_monitor_pipe = lttng_pipe_release_writefd(
+                       ust64_channel_monitor_pipe);
+       if (ustconsumer64_data.channel_monitor_pipe < 0) {
+               retval = -1;
+               goto exit_init_data;
+       }
 
        /*
         * See if daemon already exist.
@@ -5973,7 +6090,7 @@ int main(int argc, char **argv)
        }
        load_info->path = opt_load_session_path;
 
-       /* Create health-check thread */
+       /* Create health-check thread. */
        ret = pthread_create(&health_thread, default_pthread_attr(),
                        thread_manage_health, (void *) NULL);
        if (ret) {
@@ -5983,6 +6100,29 @@ int main(int argc, char **argv)
                goto exit_health;
        }
 
+       /* notification_thread_data acquires the pipes' read side. */
+       notification_thread_handle = notification_thread_handle_create(
+                       ust32_channel_monitor_pipe,
+                       ust64_channel_monitor_pipe,
+                       kernel_channel_monitor_pipe);
+       if (!notification_thread_handle) {
+               retval = -1;
+               ERR("Failed to create notification thread shared data");
+               stop_threads();
+               goto exit_notification;
+       }
+
+       /* Create notification thread. */
+       ret = pthread_create(&notification_thread, default_pthread_attr(),
+                       thread_notification, notification_thread_handle);
+       if (ret) {
+               errno = ret;
+               PERROR("pthread_create notification");
+               retval = -1;
+               stop_threads();
+               goto exit_notification;
+       }
+
        /* Create thread to manage the client socket */
        ret = pthread_create(&client_thread, default_pthread_attr(),
                        thread_manage_clients, (void *) NULL);
@@ -5990,6 +6130,7 @@ int main(int argc, char **argv)
                errno = ret;
                PERROR("pthread_create clients");
                retval = -1;
+               stop_threads();
                goto exit_client;
        }
 
@@ -6000,6 +6141,7 @@ int main(int argc, char **argv)
                errno = ret;
                PERROR("pthread_create dispatch");
                retval = -1;
+               stop_threads();
                goto exit_dispatch;
        }
 
@@ -6010,6 +6152,7 @@ int main(int argc, char **argv)
                errno = ret;
                PERROR("pthread_create registration");
                retval = -1;
+               stop_threads();
                goto exit_reg_apps;
        }
 
@@ -6020,6 +6163,7 @@ int main(int argc, char **argv)
                errno = ret;
                PERROR("pthread_create apps");
                retval = -1;
+               stop_threads();
                goto exit_apps;
        }
 
@@ -6030,6 +6174,7 @@ int main(int argc, char **argv)
                errno = ret;
                PERROR("pthread_create notify");
                retval = -1;
+               stop_threads();
                goto exit_apps_notify;
        }
 
@@ -6040,6 +6185,7 @@ int main(int argc, char **argv)
                errno = ret;
                PERROR("pthread_create agent");
                retval = -1;
+               stop_threads();
                goto exit_agent_reg;
        }
 
@@ -6052,6 +6198,7 @@ int main(int argc, char **argv)
                        errno = ret;
                        PERROR("pthread_create kernel");
                        retval = -1;
+                       stop_threads();
                        goto exit_kernel;
                }
        }
@@ -6063,6 +6210,7 @@ int main(int argc, char **argv)
                errno = ret;
                PERROR("pthread_create load_session_thread");
                retval = -1;
+               stop_threads();
                goto exit_load_session;
        }
 
@@ -6139,16 +6287,24 @@ exit_dispatch:
                PERROR("pthread_join");
                retval = -1;
        }
+
 exit_client:
+       ret = pthread_join(notification_thread, &status);
+       if (ret) {
+               errno = ret;
+               PERROR("pthread_join notification thread");
+               retval = -1;
+       }
 
+exit_notification:
        ret = pthread_join(health_thread, &status);
        if (ret) {
                errno = ret;
                PERROR("pthread_join health thread");
                retval = -1;
        }
-exit_health:
 
+exit_health:
 exit_init_data:
        /*
         * Wait for all pending call_rcu work to complete before tearing
@@ -6176,6 +6332,9 @@ exit_init_data:
        if (ret) {
                retval = -1;
        }
+       lttng_pipe_destroy(ust32_channel_monitor_pipe);
+       lttng_pipe_destroy(ust64_channel_monitor_pipe);
+       lttng_pipe_destroy(kernel_channel_monitor_pipe);
 exit_ht_cleanup:
 
        health_app_destroy(health_sessiond);
@@ -6186,7 +6345,6 @@ exit_options:
        sessiond_cleanup_options();
 
 exit_set_signal_handler:
-
        if (!retval) {
                exit(EXIT_SUCCESS);
        } else {
diff --git a/src/bin/lttng-sessiond/notification-thread-commands.c b/src/bin/lttng-sessiond/notification-thread-commands.c
new file mode 100644 (file)
index 0000000..cfbb862
--- /dev/null
@@ -0,0 +1,168 @@
+/*
+ * Copyright (C) 2017 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License, version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
+ * more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * this program; if not, write to the Free Software Foundation, Inc., 51
+ * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+
+#include <lttng/trigger/trigger.h>
+#include <lttng/lttng-error.h>
+#include "notification-thread.h"
+#include "notification-thread-commands.h"
+#include <common/error.h>
+#include <common/futex.h>
+#include <unistd.h>
+#include <stdint.h>
+#include <inttypes.h>
+
+static
+void init_notification_thread_command(struct notification_thread_command *cmd)
+{
+       memset(cmd, 0, sizeof(*cmd));
+       CDS_INIT_LIST_HEAD(&cmd->cmd_list_node);
+}
+
+static
+int run_command_wait(struct notification_thread_handle *handle,
+               struct notification_thread_command *cmd)
+{
+       int ret;
+       uint64_t notification_counter = 1;
+
+       futex_nto1_prepare(&cmd->reply_futex);
+
+       pthread_mutex_lock(&handle->cmd_queue.lock);
+       /* Add to queue. */
+       cds_list_add_tail(&cmd->cmd_list_node,
+                       &handle->cmd_queue.list);
+       /* Wake-up thread. */
+       ret = write(handle->cmd_queue.event_fd,
+                       &notification_counter, sizeof(notification_counter));
+       if (ret < 0) {
+               PERROR("write to notification thread's queue event fd");
+               /*
+                * Remove the command from the list so the notification
+                * thread does not process it.
+                */
+               cds_list_del(&cmd->cmd_list_node);
+               goto error_unlock_queue;
+       }
+       pthread_mutex_unlock(&handle->cmd_queue.lock);
+
+       futex_nto1_wait(&cmd->reply_futex);
+       return 0;
+error_unlock_queue:
+       pthread_mutex_unlock(&handle->cmd_queue.lock);
+       return -1;
+}
+
+enum lttng_error_code notification_thread_command_register_trigger(
+               struct notification_thread_handle *handle,
+               struct lttng_trigger *trigger)
+{
+       int ret;
+       enum lttng_error_code ret_code;
+       struct notification_thread_command cmd;
+
+       init_notification_thread_command(&cmd);
+
+       cmd.type = NOTIFICATION_COMMAND_TYPE_REGISTER_TRIGGER;
+       cmd.parameters.trigger = trigger;
+
+       ret = run_command_wait(handle, &cmd);
+       if (ret) {
+               ret_code = LTTNG_ERR_UNK;
+               goto end;
+       }
+       ret_code = cmd.reply_code;
+end:
+       return ret_code;
+}
+
+enum lttng_error_code notification_thread_command_unregister_trigger(
+               struct notification_thread_handle *handle,
+               struct lttng_trigger *trigger)
+{
+       int ret;
+       enum lttng_error_code ret_code;
+       struct notification_thread_command cmd;
+
+       init_notification_thread_command(&cmd);
+
+       cmd.type = NOTIFICATION_COMMAND_TYPE_UNREGISTER_TRIGGER;
+       cmd.parameters.trigger = trigger;
+
+       ret = run_command_wait(handle, &cmd);
+       if (ret) {
+               ret_code = LTTNG_ERR_UNK;
+               goto end;
+       }
+       ret_code = cmd.reply_code;
+end:
+       return ret_code;
+}
+
+enum lttng_error_code notification_thread_command_add_channel(
+               struct notification_thread_handle *handle,
+               char *session_name, uid_t uid, gid_t gid,
+               char *channel_name, uint64_t key,
+               enum lttng_domain_type domain, uint64_t capacity)
+{
+       int ret;
+       enum lttng_error_code ret_code;
+       struct notification_thread_command cmd;
+
+       init_notification_thread_command(&cmd);
+
+       cmd.type = NOTIFICATION_COMMAND_TYPE_ADD_CHANNEL;
+       cmd.parameters.add_channel.session_name = session_name;
+       cmd.parameters.add_channel.uid = uid;
+       cmd.parameters.add_channel.gid = gid;
+       cmd.parameters.add_channel.channel_name = channel_name;
+       cmd.parameters.add_channel.key.key = key;
+       cmd.parameters.add_channel.key.domain = domain;
+       cmd.parameters.add_channel.capacity = capacity;
+
+       ret = run_command_wait(handle, &cmd);
+       if (ret) {
+               ret_code = LTTNG_ERR_UNK;
+               goto end;
+       }
+       ret_code = cmd.reply_code;
+end:
+       return ret_code;
+}
+
+enum lttng_error_code notification_thread_command_remove_channel(
+               struct notification_thread_handle *handle,
+               uint64_t key, enum lttng_domain_type domain)
+{
+       int ret;
+       enum lttng_error_code ret_code;
+       struct notification_thread_command cmd;
+
+       init_notification_thread_command(&cmd);
+
+       cmd.type = NOTIFICATION_COMMAND_TYPE_REMOVE_CHANNEL;
+       cmd.parameters.remove_channel.key = key;
+       cmd.parameters.remove_channel.domain = domain;
+
+       ret = run_command_wait(handle, &cmd);
+       if (ret) {
+               ret_code = LTTNG_ERR_UNK;
+               goto end;
+       }
+       ret_code = cmd.reply_code;
+end:
+       return ret_code;
+}
diff --git a/src/bin/lttng-sessiond/notification-thread-commands.h b/src/bin/lttng-sessiond/notification-thread-commands.h
new file mode 100644 (file)
index 0000000..463aa5e
--- /dev/null
@@ -0,0 +1,89 @@
+/*
+ * Copyright (C) 2017 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License, version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
+ * more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * this program; if not, write to the Free Software Foundation, Inc., 51
+ * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+
+#ifndef NOTIFICATION_THREAD_COMMANDS_H
+#define NOTIFICATION_THREAD_COMMANDS_H
+
+#include <lttng/domain.h>
+#include <lttng/lttng-error.h>
+#include <urcu/rculfhash.h>
+
+struct notification_thread_data;
+struct lttng_trigger;
+
+enum notification_thread_command_type {
+       NOTIFICATION_COMMAND_TYPE_REGISTER_TRIGGER,
+       NOTIFICATION_COMMAND_TYPE_UNREGISTER_TRIGGER,
+       NOTIFICATION_COMMAND_TYPE_ADD_CHANNEL,
+       NOTIFICATION_COMMAND_TYPE_REMOVE_CHANNEL,
+};
+
+struct channel_key {
+       uint64_t key;
+       enum lttng_domain_type domain;
+};
+
+struct channel_info {
+       struct channel_key key;
+       char *session_name;
+       uid_t uid;
+       gid_t gid;
+       char *channel_name;
+       uint64_t capacity;
+       struct cds_lfht_node channels_ht_node;
+};
+
+struct notification_thread_command {
+       struct cds_list_head cmd_list_node;
+
+       enum notification_thread_command_type type;
+       union {
+               /* Register/Unregister trigger. */
+               struct lttng_trigger *trigger;
+               /* Add channel. */
+               struct channel_info add_channel;
+               /* Remove channel. */
+               struct {
+                       uint64_t key;
+                       enum lttng_domain_type domain;
+               } remove_channel;
+       } parameters;
+
+       /* Futex on which to wait for command reply (optional). */
+       int32_t reply_futex;
+       enum lttng_error_code reply_code;
+};
+
+enum lttng_error_code notification_thread_command_register_trigger(
+               struct notification_thread_handle *handle,
+               struct lttng_trigger *trigger);
+
+enum lttng_error_code notification_thread_command_unregister_trigger(
+               struct notification_thread_handle *handle,
+               struct lttng_trigger *trigger);
+
+enum lttng_error_code notification_thread_command_add_channel(
+               struct notification_thread_handle *handle,
+               char *session_name, uid_t uid, gid_t gid,
+               char *channel_name, uint64_t key,
+               enum lttng_domain_type domain, uint64_t capacity);
+
+enum lttng_error_code notification_thread_command_remove_channel(
+               struct notification_thread_handle *handle,
+               uint64_t key, enum lttng_domain_type domain);
+
+#endif /* NOTIFICATION_THREAD_COMMANDS_H */
diff --git a/src/bin/lttng-sessiond/notification-thread-events.c b/src/bin/lttng-sessiond/notification-thread-events.c
new file mode 100644 (file)
index 0000000..222e2fa
--- /dev/null
@@ -0,0 +1,1663 @@
+/*
+ * Copyright (C) 2017 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License, version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
+ * more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * this program; if not, write to the Free Software Foundation, Inc., 51
+ * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+
+#define _LGPL_SOURCE
+#include <urcu.h>
+#include <urcu/rculfhash.h>
+
+#include "notification-thread.h"
+#include "notification-thread-events.h"
+#include "notification-thread-commands.h"
+#include <common/error.h>
+#include <common/futex.h>
+#include <common/unix.h>
+#include <common/dynamic-buffer.h>
+#include <common/hashtable/utils.h>
+#include <common/sessiond-comm/sessiond-comm.h>
+#include <lttng/condition/condition.h>
+#include <lttng/action/action.h>
+#include <lttng/notification/notification-internal.h>
+#include <lttng/condition/condition-internal.h>
+#include <lttng/condition/buffer-usage-internal.h>
+#include <lttng/notification/channel-internal.h>
+#include <time.h>
+#include <unistd.h>
+#include <assert.h>
+#include <inttypes.h>
+
+struct lttng_trigger_list_element {
+       struct lttng_trigger *trigger;
+       struct cds_list_head node;
+};
+
+struct lttng_channel_trigger_list {
+       struct channel_key channel_key;
+       struct cds_list_head list;
+       struct cds_lfht_node channel_triggers_ht_node;
+};
+
+struct lttng_trigger_ht_element {
+       struct lttng_trigger *trigger;
+       struct cds_lfht_node node;
+};
+
+struct lttng_condition_list_element {
+       struct lttng_condition *condition;
+       struct cds_list_head node;
+};
+
+struct notification_client_list_element {
+       struct notification_client *client;
+       struct cds_list_head node;
+};
+
+struct notification_client_list {
+       struct lttng_trigger *trigger;
+       struct cds_list_head list;
+       struct cds_lfht_node notification_trigger_ht_node;
+};
+
+struct notification_client {
+       int socket;
+       uid_t uid;
+       gid_t gid;
+       /*
+        * Conditions to which the client's notification channel is subscribed.
+        * List of struct lttng_condition_list_node. The condition member is
+        * owned by the client.
+        */
+       struct cds_list_head condition_list;
+       struct cds_lfht_node client_socket_ht_node;
+};
+
+struct channel_state_sample {
+       struct channel_key key;
+       struct cds_lfht_node channel_state_ht_node;
+       uint64_t highest_usage;
+       uint64_t lowest_usage;
+};
+
+static
+int match_client(struct cds_lfht_node *node, const void *key)
+{
+       int socket = (int) key;
+       struct notification_client *client;
+
+       client = caa_container_of(node, struct notification_client,
+                       client_socket_ht_node);
+
+       return !!(client->socket == socket);
+}
+
+static
+int match_channel_trigger_list(struct cds_lfht_node *node, const void *key)
+{
+       struct channel_key *channel_key = (struct channel_key *) key;
+       struct lttng_channel_trigger_list *trigger_list;
+
+       trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
+                       channel_triggers_ht_node);
+
+       return !!((channel_key->key == trigger_list->channel_key.key) &&
+                       (channel_key->domain == trigger_list->channel_key.domain));
+}
+
+static
+int match_channel_state_sample(struct cds_lfht_node *node, const void *key)
+{
+       struct channel_key *channel_key = (struct channel_key *) key;
+       struct channel_state_sample *sample;
+
+       sample = caa_container_of(node, struct channel_state_sample,
+                       channel_state_ht_node);
+
+       return !!((channel_key->key == sample->key.key) &&
+                       (channel_key->domain == sample->key.domain));
+}
+
+static
+int match_channel_info(struct cds_lfht_node *node, const void *key)
+{
+       struct channel_key *channel_key = (struct channel_key *) key;
+       struct channel_info *channel_info;
+
+       channel_info = caa_container_of(node, struct channel_info,
+                       channels_ht_node);
+
+       return !!((channel_key->key == channel_info->key.key) &&
+                       (channel_key->domain == channel_info->key.domain));
+}
+
+static
+int match_condition(struct cds_lfht_node *node, const void *key)
+{
+       struct lttng_condition *condition_key = (struct lttng_condition *) key;
+       struct lttng_trigger_ht_element *trigger;
+       struct lttng_condition *condition;
+
+       trigger = caa_container_of(node, struct lttng_trigger_ht_element,
+                       node);
+       condition = lttng_trigger_get_condition(trigger->trigger);
+       assert(condition);
+
+       return !!lttng_condition_is_equal(condition_key, condition);
+}
+
+static
+int match_client_list(struct cds_lfht_node *node, const void *key)
+{
+       struct lttng_trigger *trigger_key = (struct lttng_trigger *) key;
+       struct notification_client_list *client_list;
+       struct lttng_condition *condition;
+       struct lttng_condition *condition_key = lttng_trigger_get_condition(
+                       trigger_key);
+
+       assert(condition_key);
+
+       client_list = caa_container_of(node, struct notification_client_list,
+                       notification_trigger_ht_node);
+       condition = lttng_trigger_get_condition(client_list->trigger);
+
+       return !!lttng_condition_is_equal(condition_key, condition);
+}
+
+static
+int match_client_list_condition(struct cds_lfht_node *node, const void *key)
+{
+       struct lttng_condition *condition_key = (struct lttng_condition *) key;
+       struct notification_client_list *client_list;
+       struct lttng_condition *condition;
+
+       assert(condition_key);
+
+       client_list = caa_container_of(node, struct notification_client_list,
+                       notification_trigger_ht_node);
+       condition = lttng_trigger_get_condition(client_list->trigger);
+
+       return !!lttng_condition_is_equal(condition_key, condition);
+}
+
+static
+unsigned long lttng_condition_buffer_usage_hash(
+       struct lttng_condition *_condition)
+{
+       unsigned long hash = 0;
+       struct lttng_condition_buffer_usage *condition;
+
+       condition = container_of(_condition,
+                       struct lttng_condition_buffer_usage, parent);
+
+       if (condition->session_name) {
+               hash ^= hash_key_str(condition->session_name, lttng_ht_seed);
+       }
+       if (condition->channel_name) {
+               hash ^= hash_key_str(condition->session_name, lttng_ht_seed);
+       }
+       if (condition->domain.set) {
+               hash ^= hash_key_ulong(
+                               (void *) condition->domain.type,
+                               lttng_ht_seed);
+       }
+       if (condition->threshold_ratio.set) {
+               uint64_t val;
+
+               val = condition->threshold_ratio.value * (double) UINT32_MAX;
+               hash ^= hash_key_u64(&val, lttng_ht_seed);
+       } else if (condition->threshold_ratio.set) {
+               uint64_t val;
+
+               val = condition->threshold_bytes.value;
+               hash ^= hash_key_u64(&val, lttng_ht_seed);
+       }
+       return hash;
+}
+
+/*
+ * The lttng_condition hashing code is kept in this file (rather than
+ * condition.c) since it makes use of GPLv2 code (hashtable utils), which we
+ * don't want to link in liblttng-ctl.
+ */
+static
+unsigned long lttng_condition_hash(struct lttng_condition *condition)
+{
+       switch (condition->type) {
+       case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
+       case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
+               return lttng_condition_buffer_usage_hash(condition);
+       default:
+               ERR("[notification-thread] Unexpected condition type caught");
+               abort();
+       }
+}
+
+static
+void channel_info_destroy(struct channel_info *channel_info)
+{
+       if (!channel_info) {
+               return;
+       }
+
+       if (channel_info->session_name) {
+               free(channel_info->session_name);
+       }
+       if (channel_info->channel_name) {
+               free(channel_info->channel_name);
+       }
+       free(channel_info);
+}
+
+static
+struct channel_info *channel_info_copy(struct channel_info *channel_info)
+{
+       struct channel_info *copy = zmalloc(sizeof(*channel_info));
+
+       assert(channel_info);
+       assert(channel_info->session_name);
+       assert(channel_info->channel_name);
+
+       if (!copy) {
+               goto end;
+       }
+
+       memcpy(copy, channel_info, sizeof(*channel_info));
+       copy->session_name = NULL;
+       copy->channel_name = NULL;
+
+       copy->session_name = strdup(channel_info->session_name);
+       if (!copy->session_name) {
+               goto error;
+       }
+       copy->channel_name = strdup(channel_info->channel_name);
+       if (!copy->channel_name) {
+               goto error;
+       }
+       cds_lfht_node_init(&channel_info->channels_ht_node);
+end:
+       return copy;
+error:
+       channel_info_destroy(copy);
+       return NULL;
+}
+
+static
+int notification_thread_client_subscribe(struct notification_client *client,
+               struct lttng_condition *condition,
+               struct notification_thread_state *state,
+               enum lttng_notification_channel_status *_status)
+{
+       int ret = 0;
+       struct cds_lfht_iter iter;
+       struct cds_lfht_node *node;
+       struct notification_client_list *client_list;
+       struct lttng_condition_list_element *condition_list_element = NULL;
+       struct notification_client_list_element *client_list_element = NULL;
+       enum lttng_notification_channel_status status =
+                       LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
+
+       /*
+        * Ensure that the client has not already subscribed to this condition
+        * before.
+        */
+       cds_list_for_each_entry(condition_list_element, &client->condition_list, node) {
+               if (lttng_condition_is_equal(condition_list_element->condition,
+                               condition)) {
+                       status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ALREADY_SUBSCRIBED;
+                       goto end;
+               }
+       }
+
+       condition_list_element = zmalloc(sizeof(*condition_list_element));
+       if (!condition_list_element) {
+               ret = -1;
+               goto error;
+       }
+       client_list_element = zmalloc(sizeof(*client_list_element));
+       if (!client_list_element) {
+               ret = -1;
+               goto error;
+       }
+
+       rcu_read_lock();
+
+       /*
+        * Add the newly-subscribed condition to the client's subscription list.
+        */
+       CDS_INIT_LIST_HEAD(&condition_list_element->node);
+       condition_list_element->condition = condition;
+       cds_list_add(&condition_list_element->node, &client->condition_list);
+
+       /*
+        * Add the client to the list of clients interested in a given trigger
+        * if a "notification" trigger with a corresponding condition was
+        * added prior.
+        */
+       cds_lfht_lookup(state->notification_trigger_clients_ht,
+                       lttng_condition_hash(condition),
+                       match_client_list_condition,
+                       condition,
+                       &iter);
+       node = cds_lfht_iter_get_node(&iter);
+       if (!node) {
+               goto end_unlock;
+       }
+
+       client_list = caa_container_of(node, struct notification_client_list,
+                       notification_trigger_ht_node);
+       client_list_element->client = client;
+       CDS_INIT_LIST_HEAD(&client_list_element->node);
+       cds_list_add(&client_list->list, &client_list_element->node);
+end_unlock:
+       rcu_read_unlock();
+end:
+       if (_status) {
+               *_status = status;
+       }
+       return ret;
+error:
+       free(condition_list_element);
+       free(client_list_element);
+       return ret;
+}
+
+static
+int notification_thread_client_unsubscribe(
+               struct notification_client *client,
+               struct lttng_condition *condition,
+               struct notification_thread_state *state,
+               enum lttng_notification_channel_status *_status)
+{
+       struct cds_lfht_iter iter;
+       struct cds_lfht_node *node;
+       struct notification_client_list *client_list;
+       struct lttng_condition_list_element *condition_list_element,
+                       *condition_tmp;
+       struct notification_client_list_element *client_list_element,
+                       *client_tmp;
+       bool condition_found = false;
+       enum lttng_notification_channel_status status =
+                       LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
+
+       /* Remove the condition from the client's condition list. */
+       cds_list_for_each_entry_safe(condition_list_element, condition_tmp,
+                       &client->condition_list, node) {
+               if (!lttng_condition_is_equal(condition_list_element->condition,
+                               condition)) {
+                       continue;
+               }
+
+               cds_list_del(&condition_list_element->node);
+               /*
+                * The caller may be iterating on the client's conditions to
+                * tear down a client's connection. In this case, the condition
+                * will be destroyed at the end.
+                */
+               if (condition != condition_list_element->condition) {
+                       lttng_condition_destroy(
+                                       condition_list_element->condition);
+               }
+               free(condition_list_element);
+               condition_found = true;
+               break;
+       }
+
+       if (!condition_found) {
+               status = LTTNG_NOTIFICATION_CHANNEL_STATUS_UNKNOWN_CONDITION;
+               goto end;
+       }
+
+       /*
+        * Remove the client from the list of clients interested the trigger
+        * matching the condition.
+        */
+       rcu_read_lock();
+       cds_lfht_lookup(state->notification_trigger_clients_ht,
+                       lttng_condition_hash(condition),
+                       match_client_list_condition,
+                       condition,
+                       &iter);
+       node = cds_lfht_iter_get_node(&iter);
+       if (!node) {
+               goto end_unlock;
+       }
+
+       client_list = caa_container_of(node, struct notification_client_list,
+                       notification_trigger_ht_node);
+       cds_list_for_each_entry_safe(client_list_element, client_tmp,
+                       &client_list->list, node) {
+               if (client_list_element->client->socket != client->socket) {
+                       continue;
+               }
+               cds_list_del(&client_list_element->node);
+               free(client_list_element);
+               break;
+       }
+end_unlock:
+       rcu_read_unlock();
+end:
+       lttng_condition_destroy(condition);
+       if (_status) {
+               *_status = status;
+       }
+       return 0;
+}
+
+static
+void notification_client_destroy(struct notification_client *client,
+               struct notification_thread_state *state)
+{
+       struct lttng_condition_list_element *condition_list_element, *tmp;
+
+       if (!client) {
+               return;
+       }
+
+       /* Release all conditions to which the client was subscribed. */
+       cds_list_for_each_entry_safe(condition_list_element, tmp,
+                       &client->condition_list, node) {
+               (void) notification_thread_client_unsubscribe(client,
+                               condition_list_element->condition, state, NULL);
+       }
+
+       if (client->socket >= 0) {
+               (void) lttcomm_close_unix_sock(client->socket);
+       }
+       free(client);
+}
+
+/*
+ * Call with rcu_read_lock held (and hold for the lifetime of the returned
+ * client pointer).
+ */
+static
+struct notification_client *get_client_from_socket(int socket,
+               struct notification_thread_state *state)
+{
+       struct cds_lfht_iter iter;
+       struct cds_lfht_node *node;
+       struct notification_client *client = NULL;
+
+       cds_lfht_lookup(state->client_socket_ht,
+                       hash_key_ulong((void *) (unsigned long) socket, lttng_ht_seed),
+                       match_client,
+                       (void *) (unsigned long) socket,
+                       &iter);
+       node = cds_lfht_iter_get_node(&iter);
+       if (!node) {
+               goto end;
+       }
+
+       client = caa_container_of(node, struct notification_client,
+                       client_socket_ht_node);
+end:
+       return client;
+}
+
+static
+bool trigger_applies_to_channel(struct lttng_trigger *trigger,
+               struct channel_info *info)
+{
+       enum lttng_condition_status status;
+       struct lttng_condition *condition;
+       const char *trigger_session_name = NULL;
+       const char *trigger_channel_name = NULL;
+       enum lttng_domain_type trigger_domain;
+
+       condition = lttng_trigger_get_condition(trigger);
+       if (!condition) {
+               goto fail;
+       }
+
+       switch (lttng_condition_get_type(condition)) {
+       case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
+       case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
+               break;
+       default:
+               goto fail;
+       }
+
+       status = lttng_condition_buffer_usage_get_domain_type(condition,
+                       &trigger_domain);
+       assert(status == LTTNG_CONDITION_STATUS_OK);
+       if (info->key.domain != trigger_domain) {
+               goto fail;
+       }
+
+       status = lttng_condition_buffer_usage_get_session_name(
+                       condition, &trigger_session_name);
+       assert((status == LTTNG_CONDITION_STATUS_OK) && trigger_session_name);
+
+       status = lttng_condition_buffer_usage_get_channel_name(
+                       condition, &trigger_channel_name);
+       assert((status == LTTNG_CONDITION_STATUS_OK) && trigger_channel_name);
+
+       if (strcmp(info->session_name, trigger_session_name)) {
+               goto fail;
+       }
+       if (strcmp(info->channel_name, trigger_channel_name)) {
+               goto fail;
+       }
+
+       return true;
+fail:
+       return false;
+}
+
+static
+bool trigger_applies_to_client(struct lttng_trigger *trigger,
+               struct notification_client *client)
+{
+       bool applies = false;
+       struct lttng_condition_list_element *condition_list_element;
+
+       cds_list_for_each_entry(condition_list_element, &client->condition_list,
+                       node) {
+               applies = lttng_condition_is_equal(
+                               condition_list_element->condition,
+                               lttng_trigger_get_condition(trigger));
+               if (applies) {
+                       break;
+               }
+       }
+       return applies;
+}
+
+static
+unsigned long hash_channel_key(struct channel_key *key)
+{
+       return hash_key_u64(&key->key, lttng_ht_seed) ^ hash_key_ulong(
+               (void *) (unsigned long) key->domain, lttng_ht_seed);
+}
+
+static
+int handle_notification_thread_command_add_channel(
+       struct notification_thread_state *state,
+       struct channel_info *channel_info,
+       enum lttng_error_code *cmd_result)
+{
+       struct cds_list_head trigger_list;
+       struct channel_info *new_channel_info;
+       struct channel_key *channel_key;
+       struct lttng_channel_trigger_list *channel_trigger_list = NULL;
+       struct lttng_trigger_ht_element *trigger_ht_element = NULL;
+       int trigger_count = 0;
+       struct cds_lfht_iter iter;
+
+       DBG("[notification-thread] Adding channel %s from session %s, channel key = %" PRIu64 " in %s domain",
+                       channel_info->channel_name, channel_info->session_name,
+                       channel_info->key.key, channel_info->key.domain == LTTNG_DOMAIN_KERNEL ? "kernel" : "user space");
+
+       CDS_INIT_LIST_HEAD(&trigger_list);
+
+       new_channel_info = channel_info_copy(channel_info);
+       if (!new_channel_info) {
+               goto error;
+       }
+
+       channel_key = &new_channel_info->key;
+
+       /* Build a list of all triggers applying to the new channel. */
+       cds_lfht_for_each_entry(state->triggers_ht, &iter, trigger_ht_element,
+                       node) {
+               struct lttng_trigger_list_element *new_element;
+
+               if (!trigger_applies_to_channel(trigger_ht_element->trigger,
+                               channel_info)) {
+                       continue;
+               }
+
+               new_element = zmalloc(sizeof(*new_element));
+               if (!new_element) {
+                       goto error;
+               }
+               CDS_INIT_LIST_HEAD(&new_element->node);
+               new_element->trigger = trigger_ht_element->trigger;
+               cds_list_add(&new_element->node, &trigger_list);
+               trigger_count++;
+       }
+
+       DBG("[notification-thread] Found %i triggers that apply to newly added channel",
+                       trigger_count);
+       channel_trigger_list = zmalloc(sizeof(*channel_trigger_list));
+       if (!channel_trigger_list) {
+               goto error;
+       }
+       channel_trigger_list->channel_key = *channel_key;
+       CDS_INIT_LIST_HEAD(&channel_trigger_list->list);
+       cds_lfht_node_init(&channel_trigger_list->channel_triggers_ht_node);
+       cds_list_splice(&trigger_list, &channel_trigger_list->list);
+
+       rcu_read_lock();
+       /* Add channel to the channel_ht which owns the channel_infos. */
+       cds_lfht_add(state->channels_ht,
+                       hash_channel_key(channel_key),
+                       &new_channel_info->channels_ht_node);
+       /*
+        * Add the list of triggers associated with this channel to the
+        * channel_triggers_ht.
+        */
+       cds_lfht_add(state->channel_triggers_ht,
+                       hash_channel_key(channel_key),
+                       &channel_trigger_list->channel_triggers_ht_node);
+       rcu_read_unlock();
+       *cmd_result = LTTNG_OK;
+       return 0;
+error:
+       /* Empty trigger list */
+       channel_info_destroy(new_channel_info);
+       return 1;
+}
+
+static
+int handle_notification_thread_command_remove_channel(
+       struct notification_thread_state *state,
+       uint64_t channel_key, enum lttng_domain_type domain,
+       enum lttng_error_code *cmd_result)
+{
+       struct cds_lfht_node *node;
+       struct cds_lfht_iter iter;
+       struct lttng_channel_trigger_list *trigger_list;
+       struct lttng_trigger_list_element *trigger_list_element, *tmp;
+       struct channel_key key = { .key = channel_key, .domain = domain };
+       struct channel_info *channel_info;
+
+       DBG("[notification-thread] Removing channel key = %" PRIu64 " in %s domain",
+                       channel_key, domain == LTTNG_DOMAIN_KERNEL ? "kernel" : "user space");
+
+       rcu_read_lock();
+
+       cds_lfht_lookup(state->channel_triggers_ht,
+                       hash_channel_key(&key),
+                       match_channel_trigger_list,
+                       &key,
+                       &iter);
+       node = cds_lfht_iter_get_node(&iter);
+       /*
+        * There is a severe internal error if we are being asked to remove a
+        * channel that doesn't exist.
+        */
+       if (!node) {
+               ERR("[notification-thread] Channel being removed is unknown to the notification thread");
+               goto end;
+       }
+
+       /* Free the list of triggers associated with this channel. */
+       trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
+                       channel_triggers_ht_node);
+       cds_list_for_each_entry_safe(trigger_list_element, tmp,
+                       &trigger_list->list, node) {
+               cds_list_del(&trigger_list_element->node);
+               free(trigger_list_element);
+       }
+       cds_lfht_del(state->channel_triggers_ht, node);
+       free(trigger_list);
+
+       /* Free sampled channel state. */
+       cds_lfht_lookup(state->channel_state_ht,
+                       hash_channel_key(&key),
+                       match_channel_state_sample,
+                       &key,
+                       &iter);
+       node = cds_lfht_iter_get_node(&iter);
+       /*
+        * This is expected to be NULL if the channel is destroyed before we
+        * received a sample.
+        */
+       if (node) {
+               struct channel_state_sample *sample = caa_container_of(node,
+                               struct channel_state_sample,
+                               channel_state_ht_node);
+
+               cds_lfht_del(state->channel_state_ht, node);
+               free(sample);
+       }
+
+       /* Remove the channel from the channels_ht and free it. */
+       cds_lfht_lookup(state->channels_ht,
+                       hash_channel_key(&key),
+                       match_channel_info,
+                       &key,
+                       &iter);
+       node = cds_lfht_iter_get_node(&iter);
+       assert(node);
+       channel_info = caa_container_of(node, struct channel_info,
+                       channels_ht_node);
+       cds_lfht_del(state->channels_ht, node);
+       channel_info_destroy(channel_info);
+end:
+       rcu_read_unlock();
+       *cmd_result = LTTNG_OK;
+       return 0;
+}
+
+/*
+ * FIXME A client's credentials are not checked when registering a trigger, nor
+ *       are they stored alongside with the trigger.
+ *
+ * The effects of this are benign:
+ *     - The client will succeed in registering the trigger, as it is valid,
+ *     - The trigger will, internally, be bound to the channel,
+ *     - The notifications will not be sent since the client's credentials
+ *       are checked against the channel at that moment.
+ */
+static
+int handle_notification_thread_command_register_trigger(
+       struct notification_thread_state *state,
+       struct lttng_trigger *trigger,
+       enum lttng_error_code *cmd_result)
+{
+       int ret = 0;
+       struct lttng_condition *condition;
+       struct notification_client *client;
+       struct notification_client_list *client_list = NULL;
+       struct lttng_trigger_ht_element *trigger_ht_element = NULL;
+       struct notification_client_list_element *client_list_element, *tmp;
+       struct cds_lfht_node *node;
+       struct cds_lfht_iter iter;
+       struct channel_info *channel;
+       bool free_trigger = true;
+
+       rcu_read_lock();
+
+       condition = lttng_trigger_get_condition(trigger);
+       trigger_ht_element = zmalloc(sizeof(*trigger_ht_element));
+       if (!trigger_ht_element) {
+               ret = -1;
+               goto error;
+       }
+
+       /* Add trigger to the trigger_ht. */
+       cds_lfht_node_init(&trigger_ht_element->node);
+       trigger_ht_element->trigger = trigger;
+
+       node = cds_lfht_add_unique(state->triggers_ht,
+                       lttng_condition_hash(condition),
+                       match_condition,
+                       condition,
+                       &trigger_ht_element->node);
+       if (node != &trigger_ht_element->node) {
+               /* Not a fatal error, simply report it to the client. */
+               *cmd_result = LTTNG_ERR_TRIGGER_EXISTS;
+               goto error_free_ht_element;
+       }
+
+       /*
+        * Ownership of the trigger and of its wrapper was transfered to
+        * the triggers_ht.
+        */
+       trigger_ht_element = NULL;
+       free_trigger = false;
+
+       /*
+        * The rest only applies to triggers that have a "notify" action.
+        * It is not skipped as this is the only action type currently
+        * supported.
+        */
+       client_list = zmalloc(sizeof(*client_list));
+       if (!client_list) {
+               ret = -1;
+               goto error_free_ht_element;
+       }
+       cds_lfht_node_init(&client_list->notification_trigger_ht_node);
+       CDS_INIT_LIST_HEAD(&client_list->list);
+       client_list->trigger = trigger;
+
+       /* Build a list of clients to which this new trigger applies. */
+       cds_lfht_for_each_entry(state->client_socket_ht, &iter, client,
+                       client_socket_ht_node) {
+               if (!trigger_applies_to_client(trigger, client)) {
+                       continue;
+               }
+
+               client_list_element = zmalloc(sizeof(*client_list_element));
+               if (!client_list_element) {
+                       ret = -1;
+                       goto error_free_client_list;
+               }
+               CDS_INIT_LIST_HEAD(&client_list_element->node);
+               client_list_element->client = client;
+               cds_list_add(&client_list_element->node, &client_list->list);
+       }
+
+       cds_lfht_add(state->notification_trigger_clients_ht,
+                       lttng_condition_hash(condition),
+                       &client_list->notification_trigger_ht_node);
+       /*
+        * Client list ownership transferred to the
+        * notification_trigger_clients_ht.
+        */
+       client_list = NULL;
+
+       /*
+        * Add the trigger to list of triggers bound to the channels currently
+        * known.
+        */
+       cds_lfht_for_each_entry(state->channels_ht, &iter, channel,
+                       channels_ht_node) {
+               struct lttng_trigger_list_element *trigger_list_element;
+               struct lttng_channel_trigger_list *trigger_list;
+
+               if (!trigger_applies_to_channel(trigger, channel)) {
+                       continue;
+               }
+
+               cds_lfht_lookup(state->channel_triggers_ht,
+                               hash_channel_key(&channel->key),
+                               match_channel_trigger_list,
+                               &channel->key,
+                               &iter);
+               node = cds_lfht_iter_get_node(&iter);
+               assert(node);
+               /* Free the list of triggers associated with this channel. */
+               trigger_list = caa_container_of(node,
+                               struct lttng_channel_trigger_list,
+                               channel_triggers_ht_node);
+               
+               trigger_list_element = zmalloc(sizeof(*trigger_list_element));
+               if (!trigger_list_element) {
+                       ret = -1;
+                       goto error_free_client_list;
+               }
+               CDS_INIT_LIST_HEAD(&trigger_list_element->node);
+               trigger_list_element->trigger = trigger;
+               cds_list_add(&trigger_list_element->node, &trigger_list->list);
+               /* A trigger can only apply to one channel. */
+               break;
+       }
+
+       *cmd_result = LTTNG_OK;
+error_free_client_list:
+       if (client_list) {
+               cds_list_for_each_entry_safe(client_list_element, tmp,
+                               &client_list->list, node) {
+                       free(client_list_element);
+               }
+               free(client_list);
+       }
+error_free_ht_element:
+       free(trigger_ht_element);
+error:
+       if (free_trigger) {
+               lttng_trigger_destroy(trigger);
+       }
+       rcu_read_unlock();
+       return ret;
+}
+
+int handle_notification_thread_command_unregister_trigger(
+               struct notification_thread_state *state,
+               struct lttng_trigger *trigger,
+               enum lttng_error_code *_cmd_reply)
+{
+       struct cds_lfht_iter iter;
+       struct cds_lfht_node *node, *triggers_ht_node;
+       struct lttng_channel_trigger_list *trigger_list;
+       struct notification_client_list *client_list;
+       struct notification_client_list_element *client_list_element, *tmp;
+       struct lttng_trigger_ht_element *trigger_ht_element = NULL;
+       struct lttng_condition *condition = lttng_trigger_get_condition(
+                       trigger);
+       enum lttng_error_code cmd_reply;
+
+       rcu_read_lock();
+
+       cds_lfht_lookup(state->triggers_ht,
+                       lttng_condition_hash(condition),
+                       match_condition,
+                       condition,
+                       &iter);
+       triggers_ht_node = cds_lfht_iter_get_node(&iter);
+       if (!triggers_ht_node) {
+               cmd_reply = LTTNG_ERR_TRIGGER_NOT_FOUND;
+               goto end;
+       } else {
+               cmd_reply = LTTNG_OK;
+       }
+
+       /* Remove trigger from channel_triggers_ht. */
+       cds_lfht_for_each_entry(state->channel_triggers_ht, &iter, trigger_list,
+                       channel_triggers_ht_node) {
+               struct lttng_trigger_list_element *trigger_element, *tmp;
+
+               cds_list_for_each_entry_safe(trigger_element, tmp,
+                               &trigger_list->list, node) {
+                       struct lttng_condition *current_condition =
+                                       lttng_trigger_get_condition(
+                                               trigger_element->trigger);
+
+                       assert(current_condition);
+                       if (!lttng_condition_is_equal(condition,
+                                       current_condition)) {
+                               continue;
+                       }
+
+                       DBG("[notification-thread] Removed trigger from channel_triggers_ht");
+                       cds_list_del(&trigger_element->node);
+               }
+       }
+
+       /*
+        * Remove and release the client list from
+        * notification_trigger_clients_ht.
+        */
+       cds_lfht_lookup(state->notification_trigger_clients_ht,
+                       lttng_condition_hash(condition),
+                       match_client_list,
+                       trigger,
+                       &iter);
+       node = cds_lfht_iter_get_node(&iter);
+       assert(node);
+       client_list = caa_container_of(node, struct notification_client_list,
+                       notification_trigger_ht_node);
+       cds_list_for_each_entry_safe(client_list_element, tmp,
+                       &client_list->list, node) {
+               free(client_list_element);
+       }
+       cds_lfht_del(state->notification_trigger_clients_ht, node);
+       free(client_list);
+
+       /* Remove trigger from triggers_ht. */
+       trigger_ht_element = caa_container_of(triggers_ht_node,
+                       struct lttng_trigger_ht_element, node);
+       cds_lfht_del(state->triggers_ht, triggers_ht_node);
+       lttng_trigger_destroy(trigger_ht_element->trigger);
+       free(trigger_ht_element);
+end:
+       rcu_read_unlock();
+       if (_cmd_reply) {
+               *_cmd_reply = cmd_reply;
+       }
+       return 0;
+}
+
+int handle_notification_thread_command(
+               struct notification_thread_handle *handle,
+               struct notification_thread_state *state)
+{
+       int ret;
+       uint64_t counter;
+       struct notification_thread_command *cmd;
+
+       /* Read event_fd to put it back into a quiescent state. */
+       ret = read(handle->cmd_queue.event_fd, &counter, sizeof(counter));
+       if (ret == -1) {
+               goto error;
+       }
+
+       pthread_mutex_lock(&handle->cmd_queue.lock);
+       cmd = cds_list_first_entry(&handle->cmd_queue.list,
+                       struct notification_thread_command, cmd_list_node);
+       switch (cmd->type) {
+       case NOTIFICATION_COMMAND_TYPE_REGISTER_TRIGGER:
+               DBG("[notification-thread] Received register trigger command");
+               ret = handle_notification_thread_command_register_trigger(
+                               state, cmd->parameters.trigger,
+                               &cmd->reply_code);
+               break;
+       case NOTIFICATION_COMMAND_TYPE_UNREGISTER_TRIGGER:
+               DBG("[notification-thread] Received unregister trigger command");
+               ret = handle_notification_thread_command_unregister_trigger(
+                               state, cmd->parameters.trigger,
+                               &cmd->reply_code);
+               break;
+       case NOTIFICATION_COMMAND_TYPE_ADD_CHANNEL:
+               DBG("[notification-thread] Received add channel command");
+               ret = handle_notification_thread_command_add_channel(
+                               state, &cmd->parameters.add_channel,
+                               &cmd->reply_code);
+               break;
+       case NOTIFICATION_COMMAND_TYPE_REMOVE_CHANNEL:
+               DBG("[notification-thread] Received remove channel command");
+               ret = handle_notification_thread_command_remove_channel(
+                               state, cmd->parameters.remove_channel.key,
+                               cmd->parameters.remove_channel.domain,
+                               &cmd->reply_code);
+               break;
+       default:
+               ERR("[notification-thread] Unknown internal command received");
+               goto error_unlock;
+       }
+
+       if (ret) {
+               goto error_unlock;
+       }
+
+       cds_list_del(&cmd->cmd_list_node);
+       futex_nto1_wake(&cmd->reply_futex);
+       pthread_mutex_unlock(&handle->cmd_queue.lock);
+       return 0;
+error_unlock:
+       /* Wake-up and return a fatal error to the calling thread. */
+       futex_nto1_wake(&cmd->reply_futex);
+       pthread_mutex_unlock(&handle->cmd_queue.lock);
+       cmd->reply_code = LTTNG_ERR_FATAL;
+error:
+       /* Indicate a fatal error to the caller. */
+       return 1;
+}
+
+static
+unsigned long hash_client_socket(int socket)
+{
+       return hash_key_ulong((void *) (unsigned long) socket, lttng_ht_seed);
+}
+
+int handle_notification_thread_client_connect(
+               struct notification_thread_state *state)
+{
+       int ret;
+       struct notification_client *client;
+
+       DBG("[notification-thread] Handling new notification channel client connection");
+
+       client = zmalloc(sizeof(*client));
+       if (!client) {
+               /* Fatal error. */
+               ret = -1;
+               goto error;
+       }
+       CDS_INIT_LIST_HEAD(&client->condition_list);
+
+       ret = lttcomm_accept_unix_sock(state->notification_channel_socket);
+       if (ret < 0) {
+               ERR("[notification-thread] Failed to accept new notification channel client connection");
+               ret = 0;
+               goto error;
+       }
+
+       client->socket = ret;
+
+       /* FIXME set client socket as non-blocking. */
+       /*
+       ret = set_socket_non_blocking(client->socket);
+       if (ret) {
+               ERR("[notification-thread] Failed to set new notification channel client connection socket as non-blocking");
+               goto error;
+       }
+       */
+
+       /* FIXME handle creds. */
+       ret = lttcomm_setsockopt_creds_unix_sock(client->socket);
+       if (ret < 0) {
+               ERR("[notification-thread] Failed to set socket options on new notification channel client socket");
+               ret = 0;
+               goto error;
+       }
+
+       /* FIXME protocol version handshake. */
+
+       ret = lttng_poll_add(&state->events, client->socket,
+                       LPOLLIN | LPOLLERR |
+                       LPOLLHUP | LPOLLRDHUP);
+       if (ret < 0) {
+               ERR("[notification-thread] Failed to add notification channel client socket to poll set");
+               ret = 0;
+               goto error;
+       }
+       DBG("[notification-thread] Added new notification channel client socket (%i) to poll set",
+                       client->socket);
+
+       /* Add to ht. */
+       rcu_read_lock();
+       cds_lfht_add(state->client_socket_ht,
+                       hash_client_socket(client->socket),
+                       &client->client_socket_ht_node);
+       rcu_read_unlock();
+
+       return ret;
+error:
+       notification_client_destroy(client, state);
+       return ret;
+}
+
+int handle_notification_thread_client_disconnect(
+               int client_socket,
+               struct notification_thread_state *state)
+{
+       int ret = 0;
+       struct notification_client *client;
+
+       rcu_read_lock();
+       DBG("[notification-thread] Closing client connection (socket fd = %i)",
+                       client_socket);
+       client = get_client_from_socket(client_socket, state);
+       if (!client) {
+               /* Internal state corruption, fatal error. */
+               ERR("[notification-thread] Unable to find client (socket fd = %i)",
+                               client_socket);
+               ret = -1;
+               goto end;
+       }
+
+       ret = lttng_poll_del(&state->events, client_socket);
+       if (ret) {
+               ERR("[notification-thread] Failed to remove client socket from poll set");
+       }
+        cds_lfht_del(state->client_socket_ht,
+                       &client->client_socket_ht_node);
+       notification_client_destroy(client, state);
+end:
+       rcu_read_unlock();
+       return ret;
+}
+
+int handle_notification_thread_client_disconnect_all(
+               struct notification_thread_state *state)
+{
+       struct cds_lfht_iter iter;
+       struct notification_client *client;
+       bool error_encoutered = false;
+
+       rcu_read_lock();
+       DBG("[notification-thread] Closing all client connections");
+       cds_lfht_for_each_entry(state->client_socket_ht, &iter, client,
+               client_socket_ht_node) {
+               int ret;
+
+               ret = handle_notification_thread_client_disconnect(
+                               client->socket, state);
+               if (ret) {
+                       error_encoutered = true;
+               }
+       }
+       rcu_read_unlock();
+       return error_encoutered ? 1 : 0;
+}
+
+int handle_notification_thread_trigger_unregister_all(
+               struct notification_thread_state *state)
+{
+       bool error_occured = false;
+       struct cds_lfht_iter iter;
+       struct lttng_trigger_ht_element *trigger_ht_element;
+
+       cds_lfht_for_each_entry(state->triggers_ht, &iter, trigger_ht_element,
+                       node) {
+               int ret = handle_notification_thread_command_unregister_trigger(
+                               state, trigger_ht_element->trigger, NULL);
+               if (ret) {
+                       error_occured = true;
+               }
+       }
+       return error_occured ? -1 : 0;
+}
+
+static
+int send_client_reply(int socket,
+               enum lttng_notification_channel_status status)
+{
+       ssize_t ret;
+       struct lttng_notification_channel_command_reply reply = {
+               .status = (int8_t) status,
+       };
+       struct lttng_notification_channel_message msg = {
+               .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_COMMAND_REPLY,
+               .size = sizeof(reply),
+       };
+       char *buffer[sizeof(msg) + sizeof(reply)] = {};
+
+       memcpy(buffer, &msg, sizeof(msg));
+       memcpy(buffer + sizeof(msg), &reply, sizeof(reply));
+       DBG("[notification-thread] Send command reply (%i)", (int) status);
+
+       ret = lttcomm_send_unix_sock(socket, buffer,
+                       sizeof(msg) + sizeof(reply));
+       if (ret < 0) {
+               ERR("[notification-thread] Failed to send command reply");
+               goto error;
+       }
+       return 0;
+error:
+       return -1;
+}
+
+int handle_notification_thread_client(struct notification_thread_state *state,
+               int socket)
+{
+       int ret = 0;
+       size_t received = 0;
+       struct notification_client *client;
+       struct lttng_notification_channel_message msg;
+       struct lttng_dynamic_buffer buffer;
+       struct lttng_condition *condition = NULL;
+       enum lttng_notification_channel_status status =
+                       LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
+
+       lttng_dynamic_buffer_init(&buffer);
+
+       client = get_client_from_socket(socket, state);
+       if (!client) {
+               /* Internal error, abort. */
+               ret = 1;
+               goto error_no_reply;
+       }
+
+       /* Receive message header. */
+       do {
+               ssize_t recv_ret;
+
+               recv_ret = lttcomm_recv_unix_sock(socket,
+                               ((char *) &msg) + received,
+                               sizeof(msg) - received);
+               if (recv_ret <= 0) {
+                       ERR("[notification-thread] Failed to receive channel command from client (received %zu bytes)", received);
+                       /*
+                        * Protocol error, disconnect the client but don't
+                        * signal an error.
+                        */
+                       goto error_disconnect_client;
+               }
+               received += recv_ret;
+       } while (received < sizeof(msg));
+
+       ret = lttng_dynamic_buffer_set_size(&buffer, msg.size);
+       if (ret) {
+               goto error_disconnect_client;
+       }
+
+       /* Receive message body. */
+       received = 0;
+       do {
+               ssize_t recv_ret;
+
+               recv_ret = lttcomm_recv_unix_sock(socket,
+                               buffer.data + received,
+                               msg.size - received);
+               if (recv_ret <= 0) {
+                       ERR("[notification-thread] Failed to receive condition from client");
+                       goto error_disconnect_client;
+               }
+               received += recv_ret;
+       } while (received < msg.size);
+
+       ret = lttng_condition_create_from_buffer(buffer.data, &condition);
+       if (ret < 0 || ret < msg.size) {
+               ERR("[notification-thread] Malformed condition received from client");
+               goto error_disconnect_client;
+       }
+
+       DBG("[notification-thread] Successfully received condition from notification channel client");
+
+       switch ((enum lttng_notification_channel_message_type) msg.type) {
+       case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE:
+               ret = notification_thread_client_subscribe(client, condition,
+                               state, &status);
+               break;
+       case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE:
+               ret = notification_thread_client_unsubscribe(client, condition,
+                               state, &status);
+               break;
+       default:
+               ERR("[notification-thread] Unknown command type received from notification channel client");
+               goto error_disconnect_client;
+       }
+
+       if (send_client_reply(socket, status)) {
+               ERR("[notification-thread] Failed to send reply to notification channel client");
+               goto error_disconnect_client;
+       }
+
+       lttng_dynamic_buffer_reset(&buffer);
+       ret = 0;
+       return ret;
+
+error_disconnect_client:
+       ret = handle_notification_thread_client_disconnect(socket, state);
+error_no_reply:
+       lttng_dynamic_buffer_reset(&buffer);
+       lttng_condition_destroy(condition);
+       return ret;
+}
+
+static
+bool evaluate_buffer_usage_condition(struct lttng_condition *condition,
+               struct channel_state_sample *sample, uint64_t buffer_capacity)
+{
+       bool result = false;
+       uint64_t threshold;
+       enum lttng_condition_type condition_type;
+       struct lttng_condition_buffer_usage *use_condition = container_of(
+                       condition, struct lttng_condition_buffer_usage,
+                       parent);
+
+       if (!sample) {
+               goto end;
+       }
+
+       if (use_condition->threshold_bytes.set) {
+               threshold = use_condition->threshold_bytes.value;
+       } else {
+               /* Threshold was expressed as a ratio. */
+               threshold = (uint64_t) (use_condition->threshold_ratio.value *
+                               (double) buffer_capacity);
+       }
+
+       condition_type = lttng_condition_get_type(condition);
+       if (condition_type == LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW) {
+               /*
+                * The low condition should only be triggered once _all_ of the
+                * streams in a channel have gone below the "low" threshold.
+                */
+               if (sample->highest_usage <= threshold) {
+                       result = true;
+               }
+       } else {
+               /*
+                * For high buffer usage scenarios, we want to trigger whenever
+                * _any_ of the streams has reached the "high" threshold.
+                */
+               if (sample->highest_usage >= threshold) {
+                       result = true;
+               }
+       }
+end:
+       return result;
+}
+
+static
+int evaluate_condition(struct lttng_condition *condition,
+               struct lttng_evaluation **evaluation,
+               struct notification_thread_state *state,
+               struct channel_state_sample *previous_sample,
+               struct channel_state_sample *latest_sample,
+               uint64_t buffer_capacity)
+{
+       int ret = 0;
+       enum lttng_condition_type condition_type;
+       bool previous_sample_result;
+       bool latest_sample_result;
+
+       condition_type = lttng_condition_get_type(condition);
+       /* No other condition type supported for the moment. */
+       assert(condition_type == LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW ||
+                       condition_type == LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH);
+
+       previous_sample_result = evaluate_buffer_usage_condition(condition,
+                       previous_sample, buffer_capacity);
+       latest_sample_result = evaluate_buffer_usage_condition(condition,
+                       latest_sample, buffer_capacity);
+
+       if (!latest_sample_result ||
+                       (previous_sample_result == latest_sample_result)) {
+               /*
+                * Only trigger on a condition evaluation transition.
+                * NOTE: This edge-triggered logic may not be appropriate for
+                * future condition types.
+                */
+               goto end;
+       }
+
+       if (evaluation && latest_sample_result) {
+               *evaluation = lttng_evaluation_buffer_usage_create(
+                               condition_type,
+                               latest_sample->highest_usage,
+                               buffer_capacity);
+               if (!*evaluation) {
+                       ret = -1;
+                       goto end;
+               }
+       }
+end:
+       return ret;
+}
+
+static
+int send_evaluation_to_clients(struct lttng_trigger *trigger,
+               struct lttng_evaluation *evaluation,
+               struct notification_client_list* client_list)
+{
+       int ret = 0;
+       struct lttng_dynamic_buffer msg_buffer;
+       struct notification_client_list_element *client_list_element, *tmp;
+       struct lttng_notification *notification;
+       struct lttng_condition *condition;
+       ssize_t expected_notification_size, notification_size;
+       struct lttng_notification_channel_message msg;
+
+       lttng_dynamic_buffer_init(&msg_buffer);
+
+       condition = lttng_trigger_get_condition(trigger);
+       assert(condition);
+
+       notification = lttng_notification_create(condition, evaluation);
+       if (!notification) {
+               ret = -1;
+               goto end;
+       }
+
+       expected_notification_size = lttng_notification_serialize(notification,
+                       NULL);
+       if (expected_notification_size < 0) {
+               ERR("[notification-thread] Failed to get size of serialized notification");
+               ret = -1;
+               goto end;
+       }
+
+       msg.type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION;
+       msg.size = (uint32_t) expected_notification_size;
+       ret = lttng_dynamic_buffer_append(&msg_buffer, &msg, sizeof(msg));
+       if (ret) {
+               goto end;
+       }
+
+       ret = lttng_dynamic_buffer_set_size(&msg_buffer,
+                       msg_buffer.size + expected_notification_size);
+       if (ret) {
+               goto end;
+       }
+
+       notification_size = lttng_notification_serialize(notification,
+                       msg_buffer.data + sizeof(msg));
+       if (notification_size != expected_notification_size) {
+               ERR("[notification-thread] Failed to serialize notification");
+               ret = -1;
+               goto end;
+       }
+
+       cds_list_for_each_entry_safe(client_list_element, tmp,
+                       &client_list->list, node) {
+               ret = lttcomm_send_unix_sock(
+                               client_list_element->client->socket,
+                               msg_buffer.data, msg_buffer.size);
+               if (ret < 0) {
+                       ERR("[notification-thread] Failed to send notification to client");
+               }
+       }
+       ret = 0;
+end:
+       lttng_notification_destroy(notification);
+       lttng_dynamic_buffer_reset(&msg_buffer);
+       return ret;
+}
+
+int handle_notification_thread_channel_sample(
+               struct notification_thread_state *state, int pipe,
+               enum lttng_domain_type domain)
+{
+       int ret = 0;
+       struct lttcomm_consumer_channel_monitor_msg sample_msg;
+       struct channel_state_sample previous_sample, latest_sample;
+       struct channel_info *channel_info;
+       struct cds_lfht_node *node;
+       struct cds_lfht_iter iter;
+       struct lttng_channel_trigger_list *trigger_list;
+       struct lttng_trigger_list_element *trigger_list_element;
+       bool previous_sample_available = false;
+
+       /*
+        * The monitoring pipe only holds messages smaller than PIPE_BUF,
+        * ensuring that read/write of sampling messages are atomic.
+        */
+       do {
+               ret = read(pipe, &sample_msg, sizeof(sample_msg));
+       } while (ret == -1 && errno == EINTR);
+       if (ret != sizeof(sample_msg)) {
+               ERR("[notification-thread] Failed to read from monitoring pipe (fd = %i)",
+                               pipe);
+               ret = -1;
+               goto end;
+       }
+
+       ret = 0;
+       latest_sample.key.key = sample_msg.key;
+       latest_sample.key.domain = domain;
+       latest_sample.highest_usage = sample_msg.highest;
+       latest_sample.lowest_usage = sample_msg.lowest;
+
+       DBG("[notification-thread] Handling channel sample (highest usage = %" PRIu64 ", lowest usage = %" PRIu64")",
+                       latest_sample.highest_usage,
+                       latest_sample.lowest_usage);
+
+       rcu_read_lock();
+
+       /* Retrieve the channel's informations */
+       cds_lfht_lookup(state->channels_ht,
+                       hash_channel_key(&latest_sample.key),
+                       match_channel_info,
+                       &latest_sample.key,
+                       &iter);
+       node = cds_lfht_iter_get_node(&iter);
+       if (!node) {
+               /*
+                * Not an error since the consumer can push a sample to the pipe
+                * and the rest of the session daemon could notify us of the
+                * channel's destruction before we get a chance to process that
+                * sample.
+                */
+               DBG("[notification-thread] Received a sample for an unknown channel from consumerd, key = %" PRIu64 " in %s domain",
+                               latest_sample.key.key,
+                               domain == LTTNG_DOMAIN_KERNEL ? "kernel" :
+                                       "user space");
+               goto end_unlock;
+       }
+       channel_info = caa_container_of(node, struct channel_info,
+                       channels_ht_node);
+
+       /* Retrieve the channel's last sample, if it exists, and update it. */
+       cds_lfht_lookup(state->channel_state_ht,
+                       hash_channel_key(&latest_sample.key),
+                       match_channel_state_sample,
+                       &latest_sample.key,
+                       &iter);
+       node = cds_lfht_iter_get_node(&iter);
+       if (node) {
+               struct channel_state_sample *stored_sample;
+
+               /* Update the sample stored. */
+               stored_sample = caa_container_of(node,
+                               struct channel_state_sample,
+                               channel_state_ht_node);
+               memcpy(&previous_sample, stored_sample,
+                               sizeof(previous_sample));
+               stored_sample->highest_usage = latest_sample.highest_usage;
+               stored_sample->lowest_usage = latest_sample.lowest_usage;
+               previous_sample_available = true;
+       } else {
+               /*
+                * This is the channel's first sample, allocate space for and
+                * store the new sample.
+                */
+               struct channel_state_sample *stored_sample;
+
+               stored_sample = zmalloc(sizeof(*stored_sample));
+               if (!stored_sample) {
+                       ret = -1;
+                       goto end_unlock;
+               }
+
+               memcpy(stored_sample, &latest_sample, sizeof(*stored_sample));
+               cds_lfht_node_init(&stored_sample->channel_state_ht_node);
+               cds_lfht_add(state->channel_state_ht,
+                               hash_channel_key(&stored_sample->key),
+                               &stored_sample->channel_state_ht_node);
+       }
+
+       /* Find triggers associated with this channel. */
+       cds_lfht_lookup(state->channel_triggers_ht,
+                       hash_channel_key(&latest_sample.key),
+                       match_channel_trigger_list,
+                       &latest_sample.key,
+                       &iter);
+       node = cds_lfht_iter_get_node(&iter);
+       if (!node) {
+               goto end_unlock;
+       }
+
+       trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
+                       channel_triggers_ht_node);
+       cds_list_for_each_entry(trigger_list_element, &trigger_list->list,
+                       node) {
+               struct lttng_condition *condition;
+               struct lttng_action *action;
+               struct lttng_trigger *trigger;
+               struct notification_client_list *client_list;
+               struct lttng_evaluation *evaluation = NULL;
+
+               trigger = trigger_list_element->trigger;
+               condition = lttng_trigger_get_condition(trigger);
+               assert(condition);
+               action = lttng_trigger_get_action(trigger);
+
+               /* Notify actions are the only type currently supported. */
+               assert(lttng_action_get_type(action) ==
+                               LTTNG_ACTION_TYPE_NOTIFY);
+
+               /*
+                * Check if any client is subscribed to the result of this
+                * evaluation.
+                */
+               cds_lfht_lookup(state->notification_trigger_clients_ht,
+                               lttng_condition_hash(condition),
+                               match_client_list,
+                               trigger,
+                               &iter);
+               node = cds_lfht_iter_get_node(&iter);
+               assert(node);
+
+               client_list = caa_container_of(node,
+                               struct notification_client_list,
+                               notification_trigger_ht_node);
+               if (cds_list_empty(&client_list->list)) {
+                       /*
+                        * No clients interested in the evaluation's result,
+                        * skip it.
+                        */
+                       continue;
+               }
+
+               ret = evaluate_condition(condition, &evaluation, state,
+                               previous_sample_available ? &previous_sample : NULL,
+                               &latest_sample, channel_info->capacity);
+               if (ret) {
+                       goto end_unlock;
+               }
+
+               if (!evaluation) {
+                       continue;
+               }
+
+               /* Dispatch evaluation result to all clients. */
+               ret = send_evaluation_to_clients(trigger_list_element->trigger,
+                               evaluation, client_list);
+               if (ret) {
+                       goto end_unlock;
+               }
+       }
+end_unlock:
+       rcu_read_unlock();
+end:
+       return ret;
+}
diff --git a/src/bin/lttng-sessiond/notification-thread-events.h b/src/bin/lttng-sessiond/notification-thread-events.h
new file mode 100644 (file)
index 0000000..941154a
--- /dev/null
@@ -0,0 +1,52 @@
+/*
+ * Copyright (C) 2017 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License, version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
+ * more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * this program; if not, write to the Free Software Foundation, Inc., 51
+ * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+
+#ifndef NOTIFICATION_THREAD_EVENTS_H
+#define NOTIFICATION_THREAD_EVENTS_H
+
+#include <lttng/domain.h>
+#include "notification-thread.h"
+
+/**
+ * Event handling function shall only return an error if
+ * the thread should be stopped.
+ */
+int handle_notification_thread_command(
+               struct notification_thread_handle *handle,
+               struct notification_thread_state *state);
+
+int handle_notification_thread_client_connect(
+               struct notification_thread_state *state);
+
+int handle_notification_thread_client_disconnect(
+               int client_fd,
+               struct notification_thread_state *state);
+
+int handle_notification_thread_client_disconnect_all(
+               struct notification_thread_state *state);
+
+int handle_notification_thread_trigger_unregister_all(
+               struct notification_thread_state *state);
+
+int handle_notification_thread_client(struct notification_thread_state *state,
+               int socket);
+
+int handle_notification_thread_channel_sample(
+               struct notification_thread_state *state, int pipe,
+               enum lttng_domain_type domain);
+
+#endif /* NOTIFICATION_THREAD_EVENTS_H */
diff --git a/src/bin/lttng-sessiond/notification-thread.c b/src/bin/lttng-sessiond/notification-thread.c
new file mode 100644 (file)
index 0000000..fcf7260
--- /dev/null
@@ -0,0 +1,682 @@
+/*
+ * Copyright (C) 2017 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License, version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
+ * more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * this program; if not, write to the Free Software Foundation, Inc., 51
+ * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+
+#define _LGPL_SOURCE
+#include <lttng/trigger/trigger.h>
+#include <lttng/notification/channel-internal.h>
+#include <lttng/notification/notification-internal.h>
+#include <lttng/condition/condition-internal.h>
+#include <lttng/condition/buffer-usage-internal.h>
+#include <common/error.h>
+#include <common/config/session-config.h>
+#include <common/defaults.h>
+#include <common/utils.h>
+#include <common/futex.h>
+#include <common/align.h>
+#include <common/time.h>
+#include <sys/eventfd.h>
+#include <sys/stat.h>
+#include <time.h>
+#include <signal.h>
+
+#include "notification-thread.h"
+#include "notification-thread-events.h"
+#include "notification-thread-commands.h"
+#include "lttng-sessiond.h"
+#include "health-sessiond.h"
+
+#include <urcu.h>
+#include <urcu/list.h>
+#include <urcu/rculfhash.h>
+
+/**
+ * This thread maintains an internal state associating clients and triggers.
+ *
+ * In order to speed-up and simplify queries, hash tables providing the
+ * following associations are maintained:
+ *
+ *   - client_socket_ht: associate a client's socket (fd) to its "struct client"
+ *             This hash table owns the "struct client" which must thus be
+ *             disposed-of on removal from the hash table.
+ *
+ *   - channel_triggers_ht:
+ *             associates a channel key to a list of
+ *             struct lttng_trigger_list_nodes. The triggers in this list are
+ *             those that have conditions that apply to this channel.
+ *             This hash table owns the list, but not the triggers themselves.
+ *
+ *   - channel_state_ht:
+ *             associates a pair (channel key, channel domain) to its last
+ *             sampled state received from the consumer daemon
+ *             (struct channel_state).
+ *             This previous sample is kept to implement edge-triggered
+ *             conditions as we need to detect the state transitions.
+ *             This hash table owns the channel state.
+ *
+ *   - notification_trigger_clients_ht:
+ *             associates notification-emitting triggers to clients
+ *             (struct notification_client_ht_node) subscribed to those
+ *             conditions.
+ *             The condition's hash and match functions are used directly since
+ *             all triggers in this hash table have the "notify" action.
+ *             This hash table holds no ownership.
+ *
+ *   - channels_ht:
+ *             associates a channel_key to a struct channel_info. The hash table
+ *             holds the ownership of the struct channel_info.
+ *
+ *   - triggers_ht:
+ *             associated a condition to a struct lttng_trigger_ht_element.
+ *             The hash table holds the ownership of the
+ *             lttng_trigger_ht_elements along with the triggers themselves.
+ *
+ * The thread reacts to the following internal events:
+ *   1) creation of a tracing channel,
+ *   2) destruction of a tracing channel,
+ *   3) registration of a trigger,
+ *   4) unregistration of a trigger,
+ *   5) reception of a channel monitor sample from the consumer daemon.
+ *
+ * Events specific to notification-emitting triggers:
+ *   6) connection of a notification client,
+ *   7) disconnection of a notification client,
+ *   8) subscription of a client to a conditions' notifications,
+ *   9) unsubscription of a client from a conditions' notifications,
+ *
+ *
+ * 1) Creation of a tracing channel
+ *    - notification_trigger_clients_ht is traversed to identify
+ *      triggers which apply to this new channel,
+ *    - triggers identified are added to the channel_triggers_ht.
+ *    - add channel to channels_ht
+ *
+ * 2) Destruction of a tracing channel
+ *    - remove entry from channel_triggers_ht, releasing the list wrapper and
+ *      elements,
+ *    - remove entry from the channel_state_ht.
+ *    - remove channel from channels_ht
+ *
+ * 3) Registration of a trigger
+ *    - if the trigger's action is of type "notify",
+ *      - traverse the list of conditions of every client to build a list of
+ *        clients which have to be notified when this trigger's condition is met,
+ *        - add list of clients (even if it is empty) to the
+ *          notification_trigger_clients_ht,
+ *    - add trigger to channel_triggers_ht (if applicable),
+ *    - add trigger to triggers_ht
+ *
+ * 4) Unregistration of a trigger
+ *    - if the trigger's action is of type "notify",
+ *      - remove the trigger from the notification_trigger_clients_ht,
+ *    - remove trigger from channel_triggers_ht (if applicable),
+ *    - remove trigger from triggers_ht
+ *
+ * 5) Reception of a channel monitor sample from the consumer daemon
+ *    - evaluate the conditions associated with the triggers found in
+ *      the channel_triggers_ht,
+ *      - if a condition evaluates to "true" and the condition is of type
+ *        "notify", query the notification_trigger_clients_ht and send
+ *        a notification to the clients.
+ *
+ * 6) Connection of a client
+ *    - add client socket to the client_socket_ht.
+ *
+ * 7) Disconnection of a client
+ *    - remove client socket from the client_socket_ht,
+ *    - traverse all conditions to which the client is subscribed and remove
+ *      the client from the notification_trigger_clients_ht.
+ *
+ * 8) Subscription of a client to a condition's notifications
+ *    - Add the condition to the client's list of subscribed conditions,
+ *    - Look-up notification_trigger_clients_ht and add the client to
+ *      list of clients.
+ *
+ * 9) Unsubscription of a client to a condition's notifications
+ *    - Remove the condition from the client's list of subscribed conditions,
+ *    - Look-up notification_trigger_clients_ht and remove the client
+ *      from the list of clients.
+ */
+
+/*
+ * Destroy the thread data previously created by the init function.
+ */
+void notification_thread_handle_destroy(
+               struct notification_thread_handle *handle)
+{
+       int ret;
+       struct notification_thread_command *cmd, *tmp;
+
+       if (!handle) {
+               goto end;
+       }
+
+       if (handle->cmd_queue.event_fd < 0) {
+               goto end;
+       }
+       ret = close(handle->cmd_queue.event_fd);
+       if (ret < 0) {
+               PERROR("close notification command queue event_fd");
+       }
+
+       pthread_mutex_lock(&handle->cmd_queue.lock);
+       /* Purge queue of in-flight commands and mark them as cancelled. */
+       cds_list_for_each_entry_safe(cmd, tmp, &handle->cmd_queue.list,
+                       cmd_list_node) {
+               cds_list_del(&cmd->cmd_list_node);
+               cmd->reply_code = LTTNG_ERR_COMMAND_CANCELLED;
+               futex_nto1_wake(&cmd->reply_futex);
+       }
+       pthread_mutex_unlock(&handle->cmd_queue.lock);
+       pthread_mutex_destroy(&handle->cmd_queue.lock);
+
+       if (handle->channel_monitoring_pipes.ust32_consumer >= 0) {
+               ret = close(handle->channel_monitoring_pipes.ust32_consumer);
+               if (ret) {
+                       PERROR("close 32-bit consumer channel monitoring pipe");
+               }
+       }
+       if (handle->channel_monitoring_pipes.ust64_consumer >= 0) {
+               ret = close(handle->channel_monitoring_pipes.ust64_consumer);
+               if (ret) {
+                       PERROR("close 64-bit consumer channel monitoring pipe");
+               }
+       }
+       if (handle->channel_monitoring_pipes.kernel_consumer >= 0) {
+               ret = close(handle->channel_monitoring_pipes.kernel_consumer);
+               if (ret) {
+                       PERROR("close kernel consumer channel monitoring pipe");
+               }
+       }
+end:
+       free(handle);
+}
+
+struct notification_thread_handle *notification_thread_handle_create(
+               struct lttng_pipe *ust32_channel_monitor_pipe,
+               struct lttng_pipe *ust64_channel_monitor_pipe,
+               struct lttng_pipe *kernel_channel_monitor_pipe)
+{
+       int ret;
+       struct notification_thread_handle *handle;
+
+       handle = zmalloc(sizeof(*handle));
+       if (!handle) {
+               goto end;
+       }
+
+       /* FIXME Replace eventfd by a pipe to support older kernels. */
+       handle->cmd_queue.event_fd = eventfd(0, EFD_CLOEXEC);
+       if (handle->cmd_queue.event_fd < 0) {
+               PERROR("eventfd notification command queue");
+               goto error;
+       }
+       CDS_INIT_LIST_HEAD(&handle->cmd_queue.list);
+       ret = pthread_mutex_init(&handle->cmd_queue.lock, NULL);
+       if (ret) {
+               goto error;
+       }
+
+       if (ust32_channel_monitor_pipe) {
+               handle->channel_monitoring_pipes.ust32_consumer =
+                               lttng_pipe_release_readfd(
+                                       ust32_channel_monitor_pipe);
+               if (handle->channel_monitoring_pipes.ust32_consumer < 0) {
+                       goto error;
+               }
+       } else {
+               handle->channel_monitoring_pipes.ust32_consumer = -1;
+       }
+       if (ust64_channel_monitor_pipe) {
+               handle->channel_monitoring_pipes.ust64_consumer =
+                               lttng_pipe_release_readfd(
+                                       ust64_channel_monitor_pipe);
+               if (handle->channel_monitoring_pipes.ust64_consumer < 0) {
+                       goto error;
+               }
+       } else {
+               handle->channel_monitoring_pipes.ust64_consumer = -1;
+       }
+       if (kernel_channel_monitor_pipe) {
+               handle->channel_monitoring_pipes.kernel_consumer =
+                               lttng_pipe_release_readfd(
+                                       kernel_channel_monitor_pipe);
+               if (handle->channel_monitoring_pipes.kernel_consumer < 0) {
+                       goto error;
+               }
+       } else {
+               handle->channel_monitoring_pipes.kernel_consumer = -1;
+       }
+end:
+       return handle;
+error:
+       notification_thread_handle_destroy(handle);
+       return NULL;
+}
+
+static
+char *get_notification_channel_sock_path(void)
+{
+       int ret;
+       bool is_root = !getuid();
+       char *sock_path;
+
+       sock_path = zmalloc(LTTNG_PATH_MAX);
+       if (!sock_path) {
+               goto error;
+       }
+
+       if (is_root) {
+               ret = snprintf(sock_path, LTTNG_PATH_MAX,
+                               DEFAULT_GLOBAL_NOTIFICATION_CHANNEL_UNIX_SOCK);
+               if (ret < 0) {
+                       goto error;
+               }
+       } else {
+               char *home_path = utils_get_home_dir();
+
+               if (!home_path) {
+                       ERR("Can't get HOME directory for socket creation");
+                       goto error;
+               }
+
+               ret = snprintf(sock_path, LTTNG_PATH_MAX,
+                               DEFAULT_HOME_NOTIFICATION_CHANNEL_UNIX_SOCK,
+                               home_path);
+               if (ret < 0) {
+                       goto error;
+               }
+       }
+
+       return sock_path;
+error:
+       free(sock_path);
+       return NULL;
+}
+
+static
+void notification_channel_socket_destroy(int fd)
+{
+       int ret;
+       char *sock_path = get_notification_channel_sock_path();
+
+       DBG("[notification-thread] Destroying notification channel socket");
+
+       if (sock_path) {
+               ret = unlink(sock_path);
+               free(sock_path);
+               if (ret < 0) {
+                       PERROR("unlink notification channel socket");
+               }
+       }
+
+       ret = close(fd);
+       if (ret) {
+               PERROR("close notification channel socket");
+       }
+}
+
+static
+int notification_channel_socket_create(void)
+{
+       int fd = -1, ret;
+       char *sock_path = get_notification_channel_sock_path();
+
+       DBG("[notification-thread] Creating notification channel UNIX socket at %s",
+                       sock_path);
+
+       ret = lttcomm_create_unix_sock(sock_path);
+       if (ret < 0) {
+               ERR("[notification-thread] Failed to create notification socket");
+               goto error;
+       }
+       fd = ret;
+
+       ret = chmod(sock_path, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP);
+       if (ret < 0) {
+               ERR("Set file permissions failed: %s", sock_path);
+               PERROR("chmod notification channel socket");
+               goto error;
+       }
+
+       DBG("[notification-thread] Notification channel UNIX socket created (fd = %i)",
+                       fd);
+       free(sock_path);
+       return fd;
+error:
+       if (fd >= 0 && close(fd) < 0) {
+               PERROR("close notification channel socket");
+       }
+       free(sock_path);
+       return ret;
+}
+
+static
+int init_poll_set(struct lttng_poll_event *poll_set,
+               struct notification_thread_handle *handle,
+               int notification_channel_socket)
+{
+       int ret;
+
+       /*
+        * Create pollset with size 6:
+        *      - quit pipe,
+        *      - notification channel socket (listen for new connections),
+        *      - command queue event fd (internal sessiond commands),
+        *      - consumerd (32-bit user space) channel monitor pipe,
+        *      - consumerd (64-but user space) channel monitor pipe,
+        *      - consumerd (kernel) channel monitor pipe.
+        */
+       ret = sessiond_set_thread_pollset(poll_set, 6);
+       if (ret < 0) {
+               goto end;
+       }
+
+       ret = lttng_poll_add(poll_set, notification_channel_socket,
+                       LPOLLIN | LPOLLERR | LPOLLHUP | LPOLLRDHUP);
+       if (ret < 0) {
+               ERR("[notification-thread] Failed to add notification channel socket to pollset");
+               goto error;
+       }
+       ret = lttng_poll_add(poll_set, handle->cmd_queue.event_fd,
+                       LPOLLIN | LPOLLERR);
+       if (ret < 0) {
+               ERR("[notification-thread] Failed to add notification command queue event fd to pollset");
+               goto error;
+       }
+       ret = lttng_poll_add(poll_set,
+                       handle->channel_monitoring_pipes.ust32_consumer,
+                       LPOLLIN | LPOLLERR);
+       if (ret < 0) {
+               ERR("[notification-thread] Failed to add ust-32 channel monitoring pipe fd to pollset");
+               goto error;
+       }
+       ret = lttng_poll_add(poll_set,
+                       handle->channel_monitoring_pipes.ust64_consumer,
+                       LPOLLIN | LPOLLERR);
+       if (ret < 0) {
+               ERR("[notification-thread] Failed to add ust-64 channel monitoring pipe fd to pollset");
+               goto error;
+       }
+       if (handle->channel_monitoring_pipes.kernel_consumer < 0) {
+               goto end;
+       }
+       ret = lttng_poll_add(poll_set,
+                       handle->channel_monitoring_pipes.kernel_consumer,
+                       LPOLLIN | LPOLLERR);
+       if (ret < 0) {
+               ERR("[notification-thread] Failed to add kernel channel monitoring pipe fd to pollset");
+               goto error;
+       }
+end:
+       return ret;
+error:
+       lttng_poll_clean(poll_set);
+       return ret;
+}
+
+static
+void fini_thread_state(struct notification_thread_state *state)
+{
+       int ret;
+
+       if (state->client_socket_ht) {
+               ret = handle_notification_thread_client_disconnect_all(state);
+               assert(!ret);
+               ret = cds_lfht_destroy(state->client_socket_ht, NULL);
+               assert(!ret);
+       }
+       if (state->triggers_ht) {
+               ret = handle_notification_thread_trigger_unregister_all(state);
+               assert(!ret);
+               ret = cds_lfht_destroy(state->triggers_ht, NULL);
+               assert(!ret);
+       }
+       if (state->channel_triggers_ht) {
+               ret = cds_lfht_destroy(state->channel_triggers_ht, NULL);
+               assert(!ret);
+       }
+       if (state->channel_state_ht) {
+               ret = cds_lfht_destroy(state->channel_state_ht, NULL);
+               assert(!ret);
+       }
+       if (state->notification_trigger_clients_ht) {
+               ret = cds_lfht_destroy(state->notification_trigger_clients_ht,
+                               NULL);
+               assert(!ret);
+       }
+       if (state->channels_ht) {
+               ret = cds_lfht_destroy(state->channels_ht,
+                               NULL);
+               assert(!ret);
+       }
+
+       if (state->notification_channel_socket >= 0) {
+               notification_channel_socket_destroy(
+                               state->notification_channel_socket);
+       }
+       lttng_poll_clean(&state->events);
+}
+
+static
+int init_thread_state(struct notification_thread_handle *handle,
+               struct notification_thread_state *state)
+{
+       int ret;
+
+       memset(state, 0, sizeof(*state));
+       state->notification_channel_socket = -1;
+       lttng_poll_init(&state->events);
+
+       ret = notification_channel_socket_create();
+       if (ret < 0) {
+               goto end;
+       }
+       state->notification_channel_socket = ret;
+
+       ret = init_poll_set(&state->events, handle,
+                       state->notification_channel_socket);
+       if (ret) {
+               goto end;
+       }
+
+       DBG("[notification-thread] Listening on notification channel socket");
+       ret = lttcomm_listen_unix_sock(state->notification_channel_socket);
+       if (ret < 0) {
+               ERR("[notification-thread] Listen failed on notification channel socket");
+               goto error;
+       }
+
+       state->client_socket_ht = cds_lfht_new(DEFAULT_HT_SIZE, 1, 0,
+                       CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL);
+       if (!state->client_socket_ht) {
+               goto error;
+       }
+
+       state->channel_triggers_ht = cds_lfht_new(DEFAULT_HT_SIZE, 1, 0,
+                       CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL);
+       if (!state->channel_triggers_ht) {
+               goto error;
+       }
+
+       state->channel_state_ht = cds_lfht_new(DEFAULT_HT_SIZE, 1, 0,
+                       CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL);
+       if (!state->channel_state_ht) {
+               goto error;
+       }
+
+       state->notification_trigger_clients_ht = cds_lfht_new(DEFAULT_HT_SIZE,
+                       1, 0, CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL);
+       if (!state->notification_trigger_clients_ht) {
+               goto error;
+       }
+
+       state->channels_ht = cds_lfht_new(DEFAULT_HT_SIZE,
+                       1, 0, CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL);
+       if (!state->channels_ht) {
+               goto error;
+       }
+
+       state->triggers_ht = cds_lfht_new(DEFAULT_HT_SIZE,
+                       1, 0, CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL);
+       if (!state->triggers_ht) {
+               goto error;
+       }
+end:
+       return 0;
+error:
+       fini_thread_state(state);
+       return -1;
+}
+
+/*
+ * This thread services notification channel clients and received notifications
+ * from various lttng-sessiond components over a command queue.
+ */
+void *thread_notification(void *data)
+{
+       int ret;
+       struct notification_thread_handle *handle = data;
+       struct notification_thread_state state;
+
+       DBG("[notification-thread] Started notification thread");
+
+       if (!handle) {
+               ERR("[notification-thread] Invalid thread context provided");
+               goto end;
+       }
+
+       rcu_register_thread();
+       rcu_thread_online();
+
+       health_register(health_sessiond, HEALTH_SESSIOND_TYPE_NOTIFICATION);
+       health_code_update();
+
+       ret = init_thread_state(handle, &state);
+       if (ret) {
+               goto end;
+       }
+
+       /* Ready to handle client connections. */
+       sessiond_notify_ready();
+
+       while (true) {
+               int fd_count, i;
+
+               health_poll_entry();
+               DBG("[notification-thread] Entering poll wait");
+               ret = lttng_poll_wait(&state.events, -1);
+               DBG("[notification-thread] Poll wait returned (%i)", ret);
+               health_poll_exit();
+               if (ret < 0) {
+                       /*
+                        * Restart interrupted system call.
+                        */
+                       if (errno == EINTR) {
+                               continue;
+                       }
+                       ERR("[notification-thread] Error encountered during lttng_poll_wait (%i)", ret);
+                       goto error;
+               }
+
+               fd_count = ret;
+               for (i = 0; i < fd_count; i++) {
+                       int fd = LTTNG_POLL_GETFD(&state.events, i);
+                       uint32_t revents = LTTNG_POLL_GETEV(&state.events, i);
+
+                       DBG("[notification-thread] Handling fd (%i) activity (%u)", fd, revents);
+
+                       /* Thread quit pipe has been closed. Killing thread. */
+                       if (sessiond_check_thread_quit_pipe(fd, revents)) {
+                               DBG("[notification-thread] Quit pipe signaled, exiting.");
+                               goto exit;
+                       }
+
+                       if (fd == state.notification_channel_socket) {
+                               if (revents & LPOLLIN) {
+                                       ret = handle_notification_thread_client_connect(
+                                                       &state);
+                                       if (ret < 0) {
+                                               goto error;
+                                       }
+                               } else if (revents &
+                                               (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
+                                       ERR("[notification-thread] Notification socket poll error");
+                                       goto error;
+                               } else {
+                                       ERR("[notification-thread] Unexpected poll events %u for notification socket %i", revents, fd);
+                                       goto error;
+                               }
+                       } else if (fd == handle->cmd_queue.event_fd) {
+                               ret = handle_notification_thread_command(handle,
+                                               &state);
+                               if (ret) {
+                                       goto error;
+                               }
+                       } else if (fd == handle->channel_monitoring_pipes.ust32_consumer) {
+                               ret = handle_notification_thread_channel_sample(
+                                               &state, fd, LTTNG_DOMAIN_UST);
+                               if (ret) {
+                                       goto error;
+                               }
+                       } else if (fd == handle->channel_monitoring_pipes.ust64_consumer) {
+                               ret = handle_notification_thread_channel_sample(
+                                               &state, fd, LTTNG_DOMAIN_UST);
+                               if (ret) {
+                                       goto error;
+                               }
+                       } else if (fd == handle->channel_monitoring_pipes.kernel_consumer) {
+                               ret = handle_notification_thread_channel_sample(
+                                               &state, fd, LTTNG_DOMAIN_KERNEL);
+                               if (ret) {
+                                       goto error;
+                               }
+                       } else {
+                               /* Activity on a client's socket. */
+                               if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
+                                       /*
+                                        * It doesn't matter if a command was
+                                        * pending on the client socket at this
+                                        * point since it now has now way to
+                                        * receive the notifications to which
+                                        * it was subscribing or unsubscribing.
+                                        */
+                                       ret = handle_notification_thread_client_disconnect(
+                                                       fd, &state);
+                                       if (ret) {
+                                               goto error;
+                                       }
+                               } else if (revents & LPOLLIN) {
+                                       ret = handle_notification_thread_client(
+                                                       &state, fd);
+                                       if (ret) {
+                                               goto error;
+                                       }
+                               } else {
+                                       DBG("[notification-thread] Unexpected poll events %u for notification socket %i", revents, fd);
+                               }
+                       }
+               }
+       }
+exit:
+error:
+       fini_thread_state(&state);
+       health_unregister(health_sessiond);
+       rcu_thread_offline();
+       rcu_unregister_thread();
+end:
+       return NULL;
+}
diff --git a/src/bin/lttng-sessiond/notification-thread.h b/src/bin/lttng-sessiond/notification-thread.h
new file mode 100644 (file)
index 0000000..c401b41
--- /dev/null
@@ -0,0 +1,73 @@
+/*
+ * Copyright (C) 2017 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License, version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
+ * more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * this program; if not, write to the Free Software Foundation, Inc., 51
+ * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+
+#ifndef NOTIFICATION_THREAD_H
+#define NOTIFICATION_THREAD_H
+
+#include <urcu/list.h>
+#include <urcu.h>
+#include <urcu/rculfhash.h>
+#include <lttng/trigger/trigger.h>
+#include <common/pipe.h>
+#include <common/compat/poll.h>
+#include <common/hashtable/hashtable.h>
+#include <pthread.h>
+
+struct notification_thread_handle {
+       /*
+        * Queue of struct notification command.
+        * event_fd must be WRITE(2) to signal that a new command
+        * has been enqueued.
+        */
+       struct {
+               int event_fd;
+               struct cds_list_head list;
+               pthread_mutex_t lock;
+       } cmd_queue;
+       /*
+        * Read side of pipes used to receive channel status info collected
+        * by the various consumer daemons.
+        */
+       struct {
+               int ust32_consumer;
+               int ust64_consumer;
+               int kernel_consumer;
+       } channel_monitoring_pipes;
+};
+
+struct notification_thread_state {
+       int notification_channel_socket;
+       struct lttng_poll_event events;
+       struct cds_lfht *client_socket_ht;
+       struct cds_lfht *channel_triggers_ht;
+       struct cds_lfht *channel_state_ht;
+       struct cds_lfht *notification_trigger_clients_ht;
+       struct cds_lfht *channels_ht;
+       struct cds_lfht *triggers_ht;
+};
+
+/* notification_thread_data takes ownership of the channel monitor pipes. */
+struct notification_thread_handle *notification_thread_handle_create(
+               struct lttng_pipe *ust32_channel_monitor_pipe,
+               struct lttng_pipe *ust64_channel_monitor_pipe,
+               struct lttng_pipe *kernel_channel_monitor_pipe);
+void notification_thread_handle_destroy(
+               struct notification_thread_handle *handle);
+
+void *thread_notification(void *data);
+
+#endif /* NOTIFICATION_THREAD_H */
index 5a41c3800709ff3c2a628758ed89f01636be04f7..eb4a76a2ff77a3bc58eb4c26bb39564b244d38c5 100644 (file)
@@ -41,6 +41,8 @@
 #include "ust-ctl.h"
 #include "utils.h"
 #include "session.h"
+#include "lttng-sessiond.h"
+#include "notification-thread-commands.h"
 
 static
 int ust_app_flush_app_session(struct ust_app *app, struct ust_app_session *ua_sess);
@@ -482,7 +484,8 @@ void delete_ust_app_channel(int sock, struct ust_app_channel *ua_chan,
                /* Wipe and free registry from session registry. */
                registry = get_session_registry(ua_chan->session);
                if (registry) {
-                       ust_registry_channel_del_free(registry, ua_chan->key);
+                       ust_registry_channel_del_free(registry, ua_chan->key,
+                               true);
                }
                save_per_pid_lost_discarded_counters(ua_chan);
        }
@@ -2839,6 +2842,7 @@ static int create_channel_per_uid(struct ust_app *app,
        int ret;
        struct buffer_reg_uid *reg_uid;
        struct buffer_reg_channel *reg_chan;
+       bool created = false;
 
        assert(app);
        assert(usess);
@@ -2882,7 +2886,7 @@ static int create_channel_per_uid(struct ust_app *app,
                         * it's not visible anymore in the session registry.
                         */
                        ust_registry_channel_del_free(reg_uid->registry->reg.ust,
-                                       ua_chan->tracing_channel_id);
+                                       ua_chan->tracing_channel_id, false);
                        buffer_reg_channel_remove(reg_uid->registry, reg_chan);
                        buffer_reg_channel_destroy(reg_chan, LTTNG_DOMAIN_UST);
                        goto error;
@@ -2898,7 +2902,7 @@ static int create_channel_per_uid(struct ust_app *app,
                                ua_chan->name);
                        goto error;
                }
-
+               created = true;
        }
 
        /* Send buffers to the application. */
@@ -2910,6 +2914,39 @@ static int create_channel_per_uid(struct ust_app *app,
                goto error;
        }
 
+       if (created) {
+               enum lttng_error_code cmd_ret;
+               struct ltt_session *session;
+               uint64_t chan_reg_key;
+               struct ust_registry_channel *chan_reg;
+
+               chan_reg_key = ua_chan->tracing_channel_id;
+               pthread_mutex_lock(&reg_uid->registry->reg.ust->lock);
+               rcu_read_lock();
+               chan_reg = ust_registry_channel_find(reg_uid->registry->reg.ust, chan_reg_key);
+               assert(chan_reg);
+               chan_reg->consumer_key = ua_chan->key;
+               rcu_read_unlock();
+               chan_reg = NULL;
+               pthread_mutex_unlock(&reg_uid->registry->reg.ust->lock);
+
+               session = session_find_by_id(ua_sess->tracing_id);
+               assert(session);
+
+               cmd_ret = notification_thread_command_add_channel(
+                               notification_thread_handle, session->name,
+                               ua_sess->euid, ua_sess->egid,
+                               ua_chan->name,
+                               ua_chan->key,
+                               LTTNG_DOMAIN_UST,
+                               ua_chan->attr.subbuf_size * ua_chan->attr.num_subbuf);
+               if (cmd_ret != LTTNG_OK) {
+                       ret = - (int) cmd_ret;
+                       ERR("Failed to add channel to notification thread");
+                       goto error;
+               }
+       }
+
 error:
        return ret;
 }
@@ -2925,6 +2962,8 @@ static int create_channel_per_pid(struct ust_app *app,
 {
        int ret;
        struct ust_registry_session *registry;
+       enum lttng_error_code cmd_ret;
+       struct ltt_session *session;
 
        assert(app);
        assert(usess);
@@ -2963,6 +3002,32 @@ static int create_channel_per_pid(struct ust_app *app,
                goto error;
        }
 
+       session = session_find_by_id(ua_sess->tracing_id);
+       assert(session);
+
+       uint64_t chan_reg_key;
+       struct ust_registry_channel *chan_reg;
+
+       chan_reg_key = ua_chan->key;
+       pthread_mutex_lock(&registry->lock);
+       chan_reg = ust_registry_channel_find(registry, chan_reg_key);
+       assert(chan_reg);
+       chan_reg->consumer_key = ua_chan->key;
+       pthread_mutex_unlock(&registry->lock);
+
+       cmd_ret = notification_thread_command_add_channel(
+                       notification_thread_handle, session->name,
+                       ua_sess->euid, ua_sess->egid,
+                       ua_chan->name,
+                       ua_chan->key,
+                       LTTNG_DOMAIN_UST,
+                       ua_chan->attr.subbuf_size * ua_chan->attr.num_subbuf);
+       if (cmd_ret != LTTNG_OK) {
+               ret = - (int) cmd_ret;
+               ERR("Failed to add channel to notification thread");
+               goto error;
+       }
+
 error:
        rcu_read_unlock();
        return ret;
@@ -3075,7 +3140,6 @@ static int create_ust_app_channel(struct ust_app_session *ua_sess,
 
        /* Only add the channel if successful on the tracer side. */
        lttng_ht_add_unique_str(ua_sess->channels, &ua_chan->node);
-
 end:
        if (ua_chanp) {
                *ua_chanp = ua_chan;
index 3bb54f03983178f035fdcb345f8e1d7670557847..d60ddb9ed0fffe677f331122dca9577985c6d947 100644 (file)
@@ -32,6 +32,8 @@
 #include "ust-consumer.h"
 #include "buffer-registry.h"
 #include "session.h"
+#include "lttng-sessiond.h"
+#include "notification-thread-commands.h"
 
 /*
  * Return allocated full pathname of the session using the consumer trace path
@@ -174,6 +176,7 @@ static int ask_channel_creation(struct ust_app_session *ua_sess,
                        ua_chan->attr.switch_timer_interval,
                        ua_chan->attr.read_timer_interval,
                        ua_sess->live_timer_interval,
+                       1000000,  /* FIXME Use value provided by the client. */
                        output,
                        (int) ua_chan->attr.type,
                        ua_sess->tracing_id,
@@ -223,6 +226,8 @@ error:
 /*
  * Ask consumer to create a channel for a given session.
  *
+ * Session list and rcu read side locks must be held by the caller.
+ *
  * Returns 0 on success else a negative value.
  */
 int ust_consumer_ask_channel(struct ust_app_session *ua_sess,
@@ -230,6 +235,7 @@ int ust_consumer_ask_channel(struct ust_app_session *ua_sess,
                struct consumer_socket *socket, struct ust_registry_session *registry)
 {
        int ret;
+       struct ltt_session *session;
 
        assert(ua_sess);
        assert(ua_chan);
@@ -243,10 +249,14 @@ int ust_consumer_ask_channel(struct ust_app_session *ua_sess,
                goto error;
        }
 
+       session = session_find_by_id(ua_sess->tracing_id);
+       assert(session);
+
        pthread_mutex_lock(socket->lock);
        ret = ask_channel_creation(ua_sess, ua_chan, consumer, socket, registry);
        pthread_mutex_unlock(socket->lock);
        if (ret < 0) {
+               ERR("ask_channel_creation consumer command failed");
                goto error;
        }
 
index 3c3aa91f02ce1b0a4719bdc982af80e22bc3fd8c..88d66317001da76386794acaaef45ed62678b819 100644 (file)
@@ -26,6 +26,8 @@
 #include "ust-registry.h"
 #include "ust-app.h"
 #include "utils.h"
+#include "lttng-sessiond.h"
+#include "notification-thread-commands.h"
 
 /*
  * Hash table match function for event in the registry.
@@ -695,13 +697,23 @@ void destroy_channel_rcu(struct rcu_head *head)
  * free the registry pointer since it might not have been allocated before so
  * it's the caller responsability.
  */
-static void destroy_channel(struct ust_registry_channel *chan)
+static void destroy_channel(struct ust_registry_channel *chan, bool notif)
 {
        struct lttng_ht_iter iter;
        struct ust_registry_event *event;
+       enum lttng_error_code cmd_ret;
 
        assert(chan);
 
+       if (notif) {
+               cmd_ret = notification_thread_command_remove_channel(
+                               notification_thread_handle, chan->consumer_key,
+                               LTTNG_DOMAIN_UST);
+               if (cmd_ret != LTTNG_OK) {
+                       ERR("Failed to remove channel from notification thread");
+               }
+       }
+
        rcu_read_lock();
        /* Destroy all event associated with this registry. */
        cds_lfht_for_each_entry(chan->ht->ht, &iter.iter, event, node.node) {
@@ -759,7 +771,7 @@ int ust_registry_channel_add(struct ust_registry_session *session,
        return 0;
 
 error:
-       destroy_channel(chan);
+       destroy_channel(chan, false);
 error_alloc:
        return ret;
 }
@@ -798,7 +810,7 @@ end:
  * Remove channel using key from registry and free memory.
  */
 void ust_registry_channel_del_free(struct ust_registry_session *session,
-               uint64_t key)
+               uint64_t key, bool notif)
 {
        struct lttng_ht_iter iter;
        struct ust_registry_channel *chan;
@@ -817,7 +829,7 @@ void ust_registry_channel_del_free(struct ust_registry_session *session,
        ret = lttng_ht_del(session->channels, &iter);
        assert(!ret);
        rcu_read_unlock();
-       destroy_channel(chan);
+       destroy_channel(chan, notif);
 
 end:
        return;
@@ -972,7 +984,7 @@ void ust_registry_session_destroy(struct ust_registry_session *reg)
                        /* Delete the node from the ht and free it. */
                        ret = lttng_ht_del(reg->channels, &iter);
                        assert(!ret);
-                       destroy_channel(chan);
+                       destroy_channel(chan, true);
                }
                rcu_read_unlock();
                ht_cleanup_push(reg->channels);
index 2697cf2ecc7e85bad6579a0ca8c1757a7f8d37aa..47b21dfe05286a001fd286d67d95817fbec4f57d 100644 (file)
@@ -109,6 +109,7 @@ struct ust_registry_session {
 
 struct ust_registry_channel {
        uint64_t key;
+       uint64_t consumer_key;
        /* Id set when replying to a register channel. */
        uint32_t chan_id;
        enum ustctl_channel_header header_type;
@@ -248,7 +249,7 @@ struct ust_registry_channel *ust_registry_channel_find(
 int ust_registry_channel_add(struct ust_registry_session *session,
                uint64_t key);
 void ust_registry_channel_del_free(struct ust_registry_session *session,
-               uint64_t key);
+               uint64_t key, bool notif);
 
 int ust_registry_session_init(struct ust_registry_session **sessionp,
                struct ust_app *app,
index 8619b5e25f31b627db462a128286ceb0a7872bd9..f6df9a61a4988b78b4ee801f4d7ca02abf96cb0b 100644 (file)
@@ -73,7 +73,10 @@ libcommon_la_SOURCES = error.h error.c utils.c utils.h runas.c runas.h \
                        mi-lttng.h mi-lttng.c \
                        daemonize.c daemonize.h \
                        unix.c unix.h \
-                       filter.c filter.h context.c context.h
+                       filter.c filter.h context.c context.h \
+                       action.c notify.c condition.c buffer-usage.c \
+                       evaluation.c notification.c trigger.c endpoint.c \
+                       dynamic-buffer.h dynamic-buffer.c
 
 libcommon_la_LIBADD = \
                $(top_builddir)/src/common/config/libconfig.la
diff --git a/src/common/action.c b/src/common/action.c
new file mode 100644 (file)
index 0000000..d07f36c
--- /dev/null
@@ -0,0 +1,119 @@
+/*
+ * Copyright (C) 2017 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
+ *
+ * This library is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License, version 2.1 only,
+ * as published by the Free Software Foundation.
+ *
+ * This library is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
+ * for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this library; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#include <lttng/action/action-internal.h>
+#include <lttng/action/notify-internal.h>
+#include <common/error.h>
+#include <assert.h>
+
+enum lttng_action_type lttng_action_get_type(struct lttng_action *action)
+{
+       return action ? action->type : LTTNG_ACTION_TYPE_UNKNOWN;
+}
+
+void lttng_action_destroy(struct lttng_action *action)
+{
+       if (!action) {
+               return;
+       }
+
+       assert(action->destroy);
+       action->destroy(action);
+}
+
+LTTNG_HIDDEN
+bool lttng_action_validate(struct lttng_action *action)
+{
+       bool valid;
+
+       if (!action) {
+               valid = false;
+               goto end;
+       }
+
+       if (!action->validate) {
+               /* Sub-class guarantees that it can never be invalid. */
+               valid = true;
+               goto end;
+       }
+
+       valid = action->validate(action);
+end:
+       return valid;
+}
+
+LTTNG_HIDDEN
+ssize_t lttng_action_serialize(struct lttng_action *action, char *buf)
+{
+       ssize_t ret, action_size;
+       struct lttng_action_comm action_comm;
+
+       if (!action) {
+               ret = -1;
+               goto end;
+       }
+
+       action_comm.action_type = (int8_t) action->type;
+       ret = sizeof(struct lttng_action_comm);
+       if (buf) {
+               memcpy(buf, &action_comm, ret);
+               buf += ret;
+       }
+
+       action_size = action->serialize(action, buf);
+       if (action_size < 0) {
+               ret = action_size;
+               goto end;
+       }
+       ret += action_size;
+end:
+       return ret;
+}
+
+LTTNG_HIDDEN
+ssize_t lttng_action_create_from_buffer(const char *buf,
+               struct lttng_action **_action)
+{
+       ssize_t ret, action_size = sizeof(struct lttng_action_comm);
+       struct lttng_action *action;
+       struct lttng_action_comm *action_comm =
+                       (struct lttng_action_comm *) buf;
+
+       if (!buf || !_action) {
+               ret = -1;
+               goto end;
+       }
+
+       DBG("Deserializing action from buffer");
+       switch (action_comm->action_type) {
+       case LTTNG_ACTION_TYPE_NOTIFY:
+               action = lttng_action_notify_create();
+               break;
+       default:
+               ret = -1;
+               goto end;
+       }
+
+       if (!action) {
+               ret = -1;
+               goto end;
+       }
+       ret = action_size;
+       *_action = action;
+end:
+       return ret;
+}
diff --git a/src/common/buffer-usage.c b/src/common/buffer-usage.c
new file mode 100644 (file)
index 0000000..91db15b
--- /dev/null
@@ -0,0 +1,784 @@
+/*
+ * Copyright (C) 2017 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
+ *
+ * This library is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License, version 2.1 only,
+ * as published by the Free Software Foundation.
+ *
+ * This library is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
+ * for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this library; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#include <lttng/condition/condition-internal.h>
+#include <lttng/condition/buffer-usage-internal.h>
+#include <common/macros.h>
+#include <common/error.h>
+#include <assert.h>
+#include <math.h>
+#include <float.h>
+#include <time.h>
+
+static
+double fixed_to_double(uint32_t val)
+{
+       return (double) val / (double) UINT32_MAX;
+}
+
+static
+uint64_t double_to_fixed(double val)
+{
+       return (val * (double) UINT32_MAX);
+}
+
+static
+bool is_usage_condition(struct lttng_condition *condition)
+{
+       enum lttng_condition_type type = lttng_condition_get_type(condition);
+
+       return type == LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW ||
+                       type == LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH;
+}
+
+static
+bool is_usage_evaluation(struct lttng_evaluation *evaluation)
+{
+       enum lttng_condition_type type = lttng_evaluation_get_type(evaluation);
+
+       return type == LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW ||
+                       type == LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH;
+}
+
+static
+void lttng_condition_buffer_usage_destroy(struct lttng_condition *condition)
+{
+       struct lttng_condition_buffer_usage *usage;
+
+       usage = container_of(condition, struct lttng_condition_buffer_usage,
+                       parent);
+
+       free(usage->session_name);
+       free(usage->channel_name);
+       free(usage);
+}
+
+static
+bool lttng_condition_buffer_usage_validate(struct lttng_condition *condition)
+{
+       bool valid = false;
+       struct lttng_condition_buffer_usage *usage;
+
+       if (!condition) {
+               goto end;
+       }
+
+       usage = container_of(condition, struct lttng_condition_buffer_usage,
+                       parent);
+       if (!usage->session_name) {
+               ERR("Invalid buffer condition: a target session name must be set.");
+               goto end;
+       }
+       if (!usage->channel_name) {
+               ERR("Invalid buffer condition: a target channel name must be set.");
+               goto end;
+       }
+       if (!usage->threshold_ratio.set && !usage->threshold_bytes.set) {
+               ERR("Invalid buffer condition: a threshold must be set.");
+               goto end;
+       }
+
+       valid = true;
+end:
+       return valid;
+}
+
+static
+ssize_t lttng_condition_buffer_usage_serialize(struct lttng_condition *condition,
+               char *buf)
+{
+       struct lttng_condition_buffer_usage *usage;
+       ssize_t ret, size;
+       size_t session_name_len, channel_name_len;
+
+       if (!condition || !is_usage_condition(condition)) {
+               ret = -1;
+               goto end;
+       }
+
+       DBG("Serializing buffer usage condition");
+       usage = container_of(condition, struct lttng_condition_buffer_usage,
+                       parent);
+       size = sizeof(struct lttng_condition_buffer_usage_comm);
+       session_name_len = strlen(usage->session_name) + 1;
+       channel_name_len = strlen(usage->channel_name) + 1;
+       size += session_name_len + channel_name_len;
+       if (buf) {
+               struct lttng_condition_buffer_usage_comm usage_comm = {
+                       .threshold_set_in_bytes = usage->threshold_bytes.set ? 1 : 0,
+                       .session_name_len = session_name_len,
+                       .channel_name_len = channel_name_len,
+                       .domain_type = (int8_t) usage->domain.type,
+               };
+
+               if (usage->threshold_bytes.set) {
+                       usage_comm.threshold = usage->threshold_bytes.value;
+               } else {
+                       uint64_t val = double_to_fixed(
+                                       usage->threshold_ratio.value);
+
+                       if (val > UINT32_MAX) {
+                               /* overflow. */
+                               ret = -1;
+                               goto end;
+                       }
+                       usage_comm.threshold = val;
+               }
+
+               memcpy(buf, &usage_comm, sizeof(usage_comm));
+               buf += sizeof(usage_comm);
+               memcpy(buf, usage->session_name, session_name_len);
+               buf += session_name_len;
+               memcpy(buf, usage->channel_name, channel_name_len);
+               buf += channel_name_len;
+       }
+       ret = size;
+end:
+       return ret;
+}
+
+static
+bool lttng_condition_buffer_usage_is_equal(struct lttng_condition *_a,
+               struct lttng_condition *_b)
+{
+       bool is_equal = false;
+       struct lttng_condition_buffer_usage *a, *b;
+
+       a = container_of(_a, struct lttng_condition_buffer_usage, parent);
+       b = container_of(_b, struct lttng_condition_buffer_usage, parent);
+
+       if ((a->threshold_ratio.set && !b->threshold_ratio.set) ||
+                       (a->threshold_bytes.set && !b->threshold_bytes.set)) {
+               goto end;
+       }
+
+       if (a->threshold_ratio.set && b->threshold_ratio.set) {
+               double a_value, b_value, diff;
+
+               a_value = a->threshold_ratio.value;
+               b_value = b->threshold_ratio.value;
+               diff = fabs(a_value - b_value);
+
+               if (diff > DBL_EPSILON) {
+                       goto end;
+               }
+       } else if (a->threshold_bytes.set && b->threshold_bytes.set) {
+               uint64_t a_value, b_value;
+
+               a_value = a->threshold_bytes.value;
+               b_value = b->threshold_bytes.value;
+               if (a_value != b_value) {
+                       goto end;
+               }
+       }
+
+       if ((a->session_name && !b->session_name) ||
+                       (!a->session_name && b->session_name)) {
+               goto end;
+       }
+
+       if (a->channel_name && b->channel_name) {
+               if (strcmp(a->channel_name, b->channel_name)) {
+                       goto end;
+               }
+       }       if ((a->channel_name && !b->channel_name) ||
+                       (!a->channel_name && b->channel_name)) {
+               goto end;
+       }
+
+       if (a->channel_name && b->channel_name) {
+               if (strcmp(a->channel_name, b->channel_name)) {
+                       goto end;
+               }
+       }
+
+       if ((a->domain.set && !b->domain.set) ||
+                       (!a->domain.set && b->domain.set)) {
+               goto end;
+       }
+
+       if (a->domain.set && b->domain.set) {
+               if (a->domain.type != b->domain.type) {
+                       goto end;
+               }
+       }
+       is_equal = true;
+end:
+       return is_equal;
+}
+
+static
+struct lttng_condition *lttng_condition_buffer_usage_create(
+               enum lttng_condition_type type)
+{
+       struct lttng_condition_buffer_usage *condition;
+
+       condition = zmalloc(sizeof(struct lttng_condition_buffer_usage));
+       if (!condition) {
+               goto end;
+       }
+
+       lttng_condition_init(&condition->parent, type);
+       condition->parent.validate = lttng_condition_buffer_usage_validate;
+       condition->parent.serialize = lttng_condition_buffer_usage_serialize;
+       condition->parent.equal = lttng_condition_buffer_usage_is_equal;
+       condition->parent.destroy = lttng_condition_buffer_usage_destroy;
+end:
+       return &condition->parent;
+}
+
+struct lttng_condition *lttng_condition_buffer_usage_low_create(void)
+{
+       return lttng_condition_buffer_usage_create(
+                       LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW);
+}
+
+struct lttng_condition *lttng_condition_buffer_usage_high_create(void)
+{
+       return lttng_condition_buffer_usage_create(
+                       LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH);
+}
+
+static
+ssize_t init_condition_from_buffer(struct lttng_condition *condition,
+               const char *buf)
+{
+       ssize_t ret, condition_size;
+       enum lttng_condition_status status;
+       enum lttng_domain_type domain_type;
+       struct lttng_condition_buffer_usage_comm *condition_comm =
+                       (struct lttng_condition_buffer_usage_comm *) buf;
+       const char *session_name, *channel_name;
+
+       if (condition_comm->threshold_set_in_bytes) {
+               status = lttng_condition_buffer_usage_set_threshold(condition,
+                               condition_comm->threshold);
+       } else {
+               status = lttng_condition_buffer_usage_set_threshold_ratio(
+                               condition,
+                               fixed_to_double(condition_comm->threshold));
+       }
+       if (status != LTTNG_CONDITION_STATUS_OK) {
+               ERR("Failed to initialize buffer usage condition threshold");
+               ret = -1;
+               goto end;
+       }
+
+       if (condition_comm->domain_type <= LTTNG_DOMAIN_NONE ||
+                       condition_comm->domain_type > LTTNG_DOMAIN_PYTHON) {
+               /* Invalid domain value. */
+               ERR("Invalid domain type value (%i) found in condition buffer",
+                               (int) condition_comm->domain_type);
+               ret = -1;
+               goto end;
+       }
+
+       domain_type = (enum lttng_domain_type) condition_comm->domain_type;
+       status = lttng_condition_buffer_usage_set_domain_type(condition,
+                       domain_type);
+       if (status != LTTNG_CONDITION_STATUS_OK) {
+               ERR("Failed to set buffer usage condition domain");
+               ret = -1;
+               goto end;
+       }
+
+       session_name = buf + sizeof(struct lttng_condition_buffer_usage_comm);
+       channel_name = session_name + condition_comm->session_name_len;
+
+       status = lttng_condition_buffer_usage_set_session_name(condition,
+                       session_name);
+       if (status != LTTNG_CONDITION_STATUS_OK) {
+               ERR("Failed to set buffer usage session name");
+               ret = -1;
+               goto end;
+       }
+
+       status = lttng_condition_buffer_usage_set_channel_name(condition,
+                       channel_name);
+       if (status != LTTNG_CONDITION_STATUS_OK) {
+               ERR("Failed to set buffer usage channel name");
+               ret = -1;
+               goto end;
+       }
+
+       if (!lttng_condition_validate(condition)) {
+               ret = -1;
+               goto end;
+       }
+
+       condition_size = sizeof(*condition_comm);
+       condition_size += (ssize_t) condition_comm->session_name_len;
+       condition_size += (ssize_t) condition_comm->channel_name_len;
+       ret = condition_size;
+end:
+       return ret;
+}
+
+LTTNG_HIDDEN
+ssize_t lttng_condition_buffer_usage_low_create_from_buffer(const char *buf,
+               struct lttng_condition **_condition)
+{
+       ssize_t ret;
+       struct lttng_condition *condition =
+                       lttng_condition_buffer_usage_low_create();
+
+       if (!_condition || !condition) {
+               ret = -1;
+               goto error;
+       }
+
+       ret = init_condition_from_buffer(condition, buf);
+       if (ret < 0) {
+               goto error;
+       }
+
+       *_condition = condition;
+       return ret;
+error:
+       lttng_condition_destroy(condition);
+       return ret;
+}
+
+LTTNG_HIDDEN
+ssize_t lttng_condition_buffer_usage_high_create_from_buffer(const char *buf,
+               struct lttng_condition **_condition)
+{
+       ssize_t ret;
+       struct lttng_condition *condition =
+                       lttng_condition_buffer_usage_high_create();
+
+       if (!_condition || !condition) {
+               ret = -1;
+               goto error;
+       }
+
+       ret = init_condition_from_buffer(condition, buf);
+       if (ret < 0) {
+               goto error;
+       }
+
+       *_condition = condition;
+       return ret;
+error:
+       lttng_condition_destroy(condition);
+       return ret;
+}
+
+static
+struct lttng_evaluation *create_evaluation_from_buffer(
+               enum lttng_condition_type type, const char *buf)
+{
+       struct lttng_evaluation_buffer_usage_comm *comm =
+                       (struct lttng_evaluation_buffer_usage_comm *) buf;
+       struct lttng_evaluation *evaluation;
+
+       evaluation = lttng_evaluation_buffer_usage_create(type,
+                       comm->buffer_use, comm->buffer_capacity);
+       return evaluation;
+}
+
+LTTNG_HIDDEN
+ssize_t lttng_evaluation_buffer_usage_low_create_from_buffer(const char *buf,
+               struct lttng_evaluation **_evaluation)
+{
+       ssize_t ret;
+       struct lttng_evaluation *evaluation = NULL;
+
+       if (!_evaluation) {
+               ret = -1;
+               goto error;
+       }
+
+       evaluation = create_evaluation_from_buffer(
+                       LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW, buf);
+       if (!evaluation) {
+               ret = -1;
+               goto error;
+       }
+
+       *_evaluation = evaluation;
+       ret = sizeof(struct lttng_evaluation_buffer_usage_comm);
+       return ret;
+error:
+       lttng_evaluation_destroy(evaluation);
+       return ret;
+}
+
+LTTNG_HIDDEN
+ssize_t lttng_evaluation_buffer_usage_high_create_from_buffer(const char *buf,
+               struct lttng_evaluation **_evaluation)
+{
+       ssize_t ret;
+       struct lttng_evaluation *evaluation = NULL;
+
+       if (!_evaluation) {
+               ret = -1;
+               goto error;
+       }
+
+       evaluation = create_evaluation_from_buffer(
+                       LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH, buf);
+       if (!evaluation) {
+               ret = -1;
+               goto error;
+       }
+
+       *_evaluation = evaluation;
+       ret = sizeof(struct lttng_evaluation_buffer_usage_comm);
+       return ret;
+error:
+       lttng_evaluation_destroy(evaluation);
+       return ret;
+}
+
+enum lttng_condition_status
+lttng_condition_buffer_usage_get_threshold_ratio(
+               struct lttng_condition *condition, double *threshold_ratio)
+{
+       struct lttng_condition_buffer_usage *usage;
+       enum lttng_condition_status status = LTTNG_CONDITION_STATUS_OK;
+
+       if (!condition || !is_usage_condition(condition) ||
+                       !threshold_ratio) {
+               status = LTTNG_CONDITION_STATUS_INVALID;
+               goto end;
+       }
+
+       usage = container_of(condition, struct lttng_condition_buffer_usage,
+                       parent);
+       if (!usage->threshold_ratio.set) {
+               status = LTTNG_CONDITION_STATUS_UNSET;
+               goto end;
+       }
+       *threshold_ratio = usage->threshold_ratio.value;
+end:
+       return status;
+}
+
+/* threshold_ratio expressed as [0.0, 1.0]. */
+enum lttng_condition_status
+lttng_condition_buffer_usage_set_threshold_ratio(
+               struct lttng_condition *condition, double threshold_ratio)
+{
+       struct lttng_condition_buffer_usage *usage;
+       enum lttng_condition_status status = LTTNG_CONDITION_STATUS_OK;
+
+       if (!condition || !is_usage_condition(condition) ||
+                       threshold_ratio < 0.0 ||
+                       threshold_ratio > 1.0) {
+               status = LTTNG_CONDITION_STATUS_INVALID;
+               goto end;
+       }
+
+       usage = container_of(condition, struct lttng_condition_buffer_usage,
+                       parent);
+       usage->threshold_ratio.set = true;
+       usage->threshold_bytes.set = false;
+       usage->threshold_ratio.value = threshold_ratio;
+end:
+       return status;
+}
+
+enum lttng_condition_status
+lttng_condition_buffer_usage_get_threshold(
+               struct lttng_condition *condition, uint64_t *threshold_bytes)
+{
+       struct lttng_condition_buffer_usage *usage;
+       enum lttng_condition_status status = LTTNG_CONDITION_STATUS_OK;
+
+       if (!condition || !is_usage_condition(condition) || !threshold_bytes) {
+               status = LTTNG_CONDITION_STATUS_INVALID;
+               goto end;
+       }
+
+       usage = container_of(condition, struct lttng_condition_buffer_usage,
+                       parent);
+       if (!usage->threshold_bytes.set) {
+               status = LTTNG_CONDITION_STATUS_UNSET;
+               goto end;
+       }
+       *threshold_bytes = usage->threshold_bytes.value;
+end:
+       return status;
+}
+
+enum lttng_condition_status
+lttng_condition_buffer_usage_set_threshold(
+               struct lttng_condition *condition, uint64_t threshold_bytes)
+{
+       struct lttng_condition_buffer_usage *usage;
+       enum lttng_condition_status status = LTTNG_CONDITION_STATUS_OK;
+
+       if (!condition || !is_usage_condition(condition)) {
+               status = LTTNG_CONDITION_STATUS_INVALID;
+               goto end;
+       }
+
+       usage = container_of(condition, struct lttng_condition_buffer_usage,
+                       parent);
+       usage->threshold_ratio.set = false;
+       usage->threshold_bytes.set = true;
+       usage->threshold_bytes.value = threshold_bytes;
+end:
+       return status;
+}
+
+enum lttng_condition_status
+lttng_condition_buffer_usage_get_session_name(
+               struct lttng_condition *condition, const char **session_name)
+{
+       struct lttng_condition_buffer_usage *usage;
+       enum lttng_condition_status status = LTTNG_CONDITION_STATUS_OK;
+
+       if (!condition || !is_usage_condition(condition) || !session_name) {
+               status = LTTNG_CONDITION_STATUS_INVALID;
+               goto end;
+       }
+
+       usage = container_of(condition, struct lttng_condition_buffer_usage,
+                       parent);
+       if (!usage->session_name) {
+               status = LTTNG_CONDITION_STATUS_UNSET;
+               goto end;
+       }
+       *session_name = usage->session_name;
+end:
+       return status;
+}
+
+enum lttng_condition_status
+lttng_condition_buffer_usage_set_session_name(
+               struct lttng_condition *condition, const char *session_name)
+{
+       char *session_name_copy;
+       struct lttng_condition_buffer_usage *usage;
+       enum lttng_condition_status status = LTTNG_CONDITION_STATUS_OK;
+
+       if (!condition || !is_usage_condition(condition) || !session_name ||
+                       strlen(session_name) == 0) {
+               status = LTTNG_CONDITION_STATUS_INVALID;
+               goto end;
+       }
+
+       usage = container_of(condition, struct lttng_condition_buffer_usage,
+                       parent);
+       session_name_copy = strdup(session_name);
+       if (!session_name_copy) {
+               status = LTTNG_CONDITION_STATUS_ERROR;
+               goto end;
+       }
+
+       if (usage->session_name) {
+               free(usage->session_name);
+       }
+       usage->session_name = session_name_copy;
+end:
+       return status;
+}
+
+enum lttng_condition_status
+lttng_condition_buffer_usage_get_channel_name(
+               struct lttng_condition *condition, const char **channel_name)
+{
+       struct lttng_condition_buffer_usage *usage;
+       enum lttng_condition_status status = LTTNG_CONDITION_STATUS_OK;
+
+       if (!condition || !is_usage_condition(condition) || !channel_name) {
+               status = LTTNG_CONDITION_STATUS_INVALID;
+               goto end;
+       }
+
+       usage = container_of(condition, struct lttng_condition_buffer_usage,
+                       parent);
+       if (!usage->channel_name) {
+               status = LTTNG_CONDITION_STATUS_UNSET;
+               goto end;
+       }
+       *channel_name = usage->channel_name;
+end:
+       return status;
+}
+
+enum lttng_condition_status
+lttng_condition_buffer_usage_set_channel_name(
+               struct lttng_condition *condition, const char *channel_name)
+{
+       char *channel_name_copy;
+       struct lttng_condition_buffer_usage *usage;
+       enum lttng_condition_status status = LTTNG_CONDITION_STATUS_OK;
+
+       if (!condition || !is_usage_condition(condition) || !channel_name ||
+                       strlen(channel_name) == 0) {
+               status = LTTNG_CONDITION_STATUS_INVALID;
+               goto end;
+       }
+
+       usage = container_of(condition, struct lttng_condition_buffer_usage,
+                       parent);
+       channel_name_copy = strdup(channel_name);
+       if (!channel_name_copy) {
+               status = LTTNG_CONDITION_STATUS_ERROR;
+               goto end;
+       }
+
+       if (usage->channel_name) {
+               free(usage->channel_name);
+       }
+       usage->channel_name = channel_name_copy;
+end:
+       return status;
+}
+
+enum lttng_condition_status
+lttng_condition_buffer_usage_get_domain_type(
+               struct lttng_condition *condition, enum lttng_domain_type *type)
+{
+       struct lttng_condition_buffer_usage *usage;
+       enum lttng_condition_status status = LTTNG_CONDITION_STATUS_OK;
+
+       if (!condition || !is_usage_condition(condition) || !type) {
+               status = LTTNG_CONDITION_STATUS_INVALID;
+               goto end;
+       }
+
+       usage = container_of(condition, struct lttng_condition_buffer_usage,
+                       parent);
+       if (!usage->domain.set) {
+               status = LTTNG_CONDITION_STATUS_UNSET;
+               goto end;
+       }
+       *type = usage->domain.type;
+end:
+       return status;
+}
+
+enum lttng_condition_status
+lttng_condition_buffer_usage_set_domain_type(
+               struct lttng_condition *condition, enum lttng_domain_type type)
+{
+       struct lttng_condition_buffer_usage *usage;
+       enum lttng_condition_status status = LTTNG_CONDITION_STATUS_OK;
+
+       if (!condition || !is_usage_condition(condition) ||
+                       type == LTTNG_DOMAIN_NONE) {
+               status = LTTNG_CONDITION_STATUS_INVALID;
+               goto end;
+       }
+
+       usage = container_of(condition, struct lttng_condition_buffer_usage,
+                       parent);
+       usage->domain.set = true;
+       usage->domain.type = type;
+end:
+       return status;
+}
+
+static
+ssize_t lttng_evaluation_buffer_usage_serialize(
+               struct lttng_evaluation *evaluation, char *buf)
+{
+       ssize_t ret;
+       struct lttng_evaluation_buffer_usage *usage;
+
+       usage = container_of(evaluation, struct lttng_evaluation_buffer_usage,
+                       parent);
+       if (buf) {
+               struct lttng_evaluation_buffer_usage_comm comm = {
+                       .buffer_use = usage->buffer_use,
+                       .buffer_capacity = usage->buffer_capacity,
+               };
+
+               memcpy(buf, &comm, sizeof(comm));
+       }
+
+       ret = sizeof(struct lttng_evaluation_buffer_usage_comm);
+       return ret;
+}
+
+static
+void lttng_evaluation_buffer_usage_destroy(
+               struct lttng_evaluation *evaluation)
+{
+       struct lttng_evaluation_buffer_usage *usage;
+
+       usage = container_of(evaluation, struct lttng_evaluation_buffer_usage,
+                       parent);
+       free(usage);
+}
+
+LTTNG_HIDDEN
+struct lttng_evaluation *lttng_evaluation_buffer_usage_create(
+               enum lttng_condition_type type, uint64_t use, uint64_t capacity)
+{
+       struct lttng_evaluation_buffer_usage *usage;
+
+       usage = zmalloc(sizeof(struct lttng_evaluation_buffer_usage));
+       if (!usage) {
+               goto end;
+       }
+
+       usage->parent.type = type;
+       usage->buffer_use = use;
+       usage->buffer_capacity = capacity;
+       usage->parent.serialize = lttng_evaluation_buffer_usage_serialize;
+       usage->parent.destroy = lttng_evaluation_buffer_usage_destroy;
+end:
+       return &usage->parent;
+}
+
+/*
+ * Get the sampled buffer usage which caused the associated condition to
+ * evaluate to "true".
+ */
+enum lttng_evaluation_status
+lttng_evaluation_buffer_usage_get_usage_ratio(
+               struct lttng_evaluation *evaluation, double *usage_ratio)
+{
+       struct lttng_evaluation_buffer_usage *usage;
+       enum lttng_evaluation_status status = LTTNG_EVALUATION_STATUS_OK;
+
+       if (!evaluation || !is_usage_evaluation(evaluation) || !usage_ratio) {
+               status = LTTNG_EVALUATION_STATUS_INVALID;
+               goto end;
+       }
+
+       usage = container_of(evaluation, struct lttng_evaluation_buffer_usage,
+                       parent);
+       *usage_ratio = (double) usage->buffer_use /
+                       (double) usage->buffer_capacity;
+end:
+       return status;
+}
+
+enum lttng_evaluation_status
+lttng_evaluation_buffer_usage_get_usage(struct lttng_evaluation *evaluation,
+               uint64_t *usage_bytes)
+{
+       struct lttng_evaluation_buffer_usage *usage;
+       enum lttng_evaluation_status status = LTTNG_EVALUATION_STATUS_OK;
+
+       if (!evaluation || !is_usage_evaluation(evaluation) || !usage_bytes) {
+               status = LTTNG_EVALUATION_STATUS_INVALID;
+               goto end;
+       }
+
+       usage = container_of(evaluation, struct lttng_evaluation_buffer_usage,
+                       parent);
+       *usage_bytes = usage->buffer_use;
+end:
+       return status;
+}
diff --git a/src/common/condition.c b/src/common/condition.c
new file mode 100644 (file)
index 0000000..2f7f413
--- /dev/null
@@ -0,0 +1,159 @@
+/*
+ * Copyright (C) 2017 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
+ *
+ * This library is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License, version 2.1 only,
+ * as published by the Free Software Foundation.
+ *
+ * This library is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
+ * for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this library; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#include <lttng/condition/condition-internal.h>
+#include <lttng/condition/buffer-usage-internal.h>
+#include <common/macros.h>
+#include <common/error.h>
+#include <stdbool.h>
+#include <assert.h>
+
+enum lttng_condition_type lttng_condition_get_type(
+               struct lttng_condition *condition)
+{
+       return condition ? condition->type : LTTNG_CONDITION_TYPE_UNKNOWN;
+}
+
+void lttng_condition_destroy(struct lttng_condition *condition)
+{
+       if (!condition) {
+               return;
+       }
+
+       assert(condition->destroy);
+       condition->destroy(condition);
+}
+
+LTTNG_HIDDEN
+bool lttng_condition_validate(struct lttng_condition *condition)
+{
+       bool valid;
+
+       if (!condition) {
+               valid = false;
+               goto end;
+       }
+
+       if (!condition->validate) {
+               /* Sub-class guarantees that it can never be invalid. */
+               valid = true;
+               goto end;
+       }
+
+       valid = condition->validate(condition);
+end:
+       return valid;
+}
+
+LTTNG_HIDDEN
+ssize_t lttng_condition_serialize(struct lttng_condition *condition, char *buf)
+{
+       ssize_t ret, condition_size;
+       struct lttng_condition_comm condition_comm;
+
+       if (!condition) {
+               ret = -1;
+               goto end;
+       }
+
+       condition_comm.condition_type = (int8_t) condition->type;
+       ret = sizeof(struct lttng_condition_comm);
+       if (buf) {
+               memcpy(buf, &condition_comm, ret);
+               buf += ret;
+       }
+
+       condition_size = condition->serialize(condition, buf);
+       if (condition_size < 0) {
+               ret = condition_size;
+               goto end;
+       }
+       ret += condition_size;
+end:
+       return ret;
+}
+
+LTTNG_HIDDEN
+bool lttng_condition_is_equal(struct lttng_condition *a,
+               struct lttng_condition *b)
+{
+       bool is_equal = false;
+
+       if (!a || !b) {
+               goto end;
+       }
+
+       if (a->type != b->type) {
+               goto end;
+       }
+
+       is_equal = a->equal ? a->equal(a, b) : true;
+end:
+       return is_equal;
+}
+
+LTTNG_HIDDEN
+ssize_t lttng_condition_create_from_buffer(const char *buf,
+               struct lttng_condition **condition)
+{
+       ssize_t ret, condition_size = 0;
+       struct lttng_condition_comm *condition_comm =
+                       (struct lttng_condition_comm *) buf;
+
+       if (!buf || !condition) {
+               ret = -1;
+               goto end;
+       }
+
+       DBG("Deserializing condition from buffer");
+       condition_size += sizeof(*condition_comm);
+       buf += condition_size;
+
+       switch ((enum lttng_condition_type) condition_comm->condition_type) {
+       case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
+               ret = lttng_condition_buffer_usage_low_create_from_buffer(buf,
+                               condition);
+               if (ret < 0) {
+                       goto end;
+               }
+               condition_size += ret;
+               break;
+       case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
+               ret = lttng_condition_buffer_usage_high_create_from_buffer(buf,
+                               condition);
+               if (ret < 0) {
+                       goto end;
+               }
+               condition_size += ret;
+               break;
+       default:
+               ERR("Attempted to create condition of unknown type (%i)",
+                               (int) condition_comm->condition_type);
+               ret = -1;
+               goto end;
+       }
+       ret = condition_size;
+end:
+       return ret;
+}
+
+LTTNG_HIDDEN
+void lttng_condition_init(struct lttng_condition *condition,
+               enum lttng_condition_type type)
+{
+       condition->type = type;
+}
index b0a434284a87d888f631b3fc8f0d435fc4c1d888..3234e8514f9b7f0c9296bf55e70a73e749850f5d 100644 (file)
@@ -21,6 +21,7 @@
 #include <inttypes.h>
 #include <signal.h>
 
+#include <lttng/ust-ctl.h>
 #include <bin/lttng-consumerd/health-consumerd.h>
 #include <common/common.h>
 #include <common/compat/endian.h>
@@ -61,8 +62,14 @@ static void setmask(sigset_t *mask)
        if (ret) {
                PERROR("sigaddset live");
        }
+       ret = sigaddset(mask, LTTNG_CONSUMER_SIG_MONITOR);
+       if (ret) {
+               PERROR("sigaddset monitor");
+       }
 }
 
+static int channel_monitor_pipe = -1;
+
 /*
  * Execute action on a timer switch.
  *
@@ -71,7 +78,7 @@ static void setmask(sigset_t *mask)
  * deadlocks.
  */
 static void metadata_switch_timer(struct lttng_consumer_local_data *ctx,
-               int sig, siginfo_t *si, void *uc)
+               int sig, siginfo_t *si)
 {
        int ret;
        struct lttng_consumer_channel *channel;
@@ -304,7 +311,7 @@ end:
  * Execute action on a live timer
  */
 static void live_timer(struct lttng_consumer_local_data *ctx,
-               int sig, siginfo_t *si, void *uc)
+               int sig, siginfo_t *si)
 {
        int ret;
        struct lttng_consumer_channel *channel;
@@ -411,44 +418,95 @@ void consumer_timer_signal_thread_qs(unsigned int signr)
 }
 
 /*
- * Set the timer for periodical metadata flush.
+ * Start a timer channel timer which will fire at a given interval
+ * (timer_interval_us)and fire a given signal (signal).
+ *
+ * Returns a negative value on error, 0 if a timer was created, and
+ * a positive value if no timer was created (not an error).
  */
-void consumer_timer_switch_start(struct lttng_consumer_channel *channel,
-               unsigned int switch_timer_interval)
+static
+int consumer_channel_timer_start(timer_t *timer_id,
+               struct lttng_consumer_channel *channel,
+               unsigned int timer_interval_us, int signal)
 {
-       int ret;
+       int ret = 0, delete_ret;
        struct sigevent sev;
        struct itimerspec its;
 
        assert(channel);
        assert(channel->key);
 
-       if (switch_timer_interval == 0) {
-               return;
+       if (timer_interval_us == 0) {
+               /* No creation needed; not an error. */
+               ret = 1;
+               goto end;
        }
 
        sev.sigev_notify = SIGEV_SIGNAL;
-       sev.sigev_signo = LTTNG_CONSUMER_SIG_SWITCH;
+       sev.sigev_signo = signal;
        sev.sigev_value.sival_ptr = channel;
-       ret = timer_create(CLOCKID, &sev, &channel->switch_timer);
+       ret = timer_create(CLOCKID, &sev, timer_id);
        if (ret == -1) {
                PERROR("timer_create");
+               goto end;
        }
-       channel->switch_timer_enabled = 1;
 
-       its.it_value.tv_sec = switch_timer_interval / 1000000;
-       its.it_value.tv_nsec = (switch_timer_interval % 1000000) * 1000;
+       its.it_value.tv_sec = timer_interval_us / 1000000;
+       its.it_value.tv_nsec = (timer_interval_us % 1000000) * 1000;
        its.it_interval.tv_sec = its.it_value.tv_sec;
        its.it_interval.tv_nsec = its.it_value.tv_nsec;
 
-       ret = timer_settime(channel->switch_timer, 0, &its, NULL);
+       ret = timer_settime(*timer_id, 0, &its, NULL);
        if (ret == -1) {
                PERROR("timer_settime");
+               goto error_destroy_timer;
+       }
+end:
+       return ret;
+error_destroy_timer:
+       delete_ret = timer_delete(*timer_id);
+       if (delete_ret == -1) {
+               PERROR("timer_delete");
+       }
+       goto end;
+}
+
+static
+int consumer_channel_timer_stop(timer_t *timer_id, int signal)
+{
+       int ret = 0;
+
+       ret = timer_delete(*timer_id);
+       if (ret == -1) {
+               PERROR("timer_delete");
+               goto end;
        }
+
+       consumer_timer_signal_thread_qs(signal);
+       *timer_id = 0;
+end:
+       return ret;
+}
+
+/*
+ * Set the channel's switch timer.
+ */
+void consumer_timer_switch_start(struct lttng_consumer_channel *channel,
+               unsigned int switch_timer_interval_us)
+{
+       int ret;
+
+       assert(channel);
+       assert(channel->key);
+
+       ret = consumer_channel_timer_start(&channel->switch_timer, channel,
+                       switch_timer_interval_us, LTTNG_CONSUMER_SIG_SWITCH);
+
+       channel->switch_timer_enabled = !!(ret == 0);
 }
 
 /*
- * Stop and delete timer.
+ * Stop and delete the channel's switch timer.
  */
 void consumer_timer_switch_stop(struct lttng_consumer_channel *channel)
 {
@@ -456,72 +514,91 @@ void consumer_timer_switch_stop(struct lttng_consumer_channel *channel)
 
        assert(channel);
 
-       ret = timer_delete(channel->switch_timer);
+       ret = consumer_channel_timer_stop(&channel->switch_timer,
+                       LTTNG_CONSUMER_SIG_SWITCH);
        if (ret == -1) {
-               PERROR("timer_delete");
+               ERR("Failed to stop switch timer");
        }
 
-       consumer_timer_signal_thread_qs(LTTNG_CONSUMER_SIG_SWITCH);
-
-       channel->switch_timer = 0;
        channel->switch_timer_enabled = 0;
 }
 
 /*
- * Set the timer for the live mode.
+ * Set the channel's live timer.
  */
 void consumer_timer_live_start(struct lttng_consumer_channel *channel,
-               int live_timer_interval)
+               unsigned int live_timer_interval_us)
 {
        int ret;
-       struct sigevent sev;
-       struct itimerspec its;
 
        assert(channel);
        assert(channel->key);
 
-       if (live_timer_interval <= 0) {
-               return;
-       }
+       ret = consumer_channel_timer_start(&channel->live_timer, channel,
+                       live_timer_interval_us, LTTNG_CONSUMER_SIG_LIVE);
 
-       sev.sigev_notify = SIGEV_SIGNAL;
-       sev.sigev_signo = LTTNG_CONSUMER_SIG_LIVE;
-       sev.sigev_value.sival_ptr = channel;
-       ret = timer_create(CLOCKID, &sev, &channel->live_timer);
-       if (ret == -1) {
-               PERROR("timer_create");
-       }
-       channel->live_timer_enabled = 1;
+       channel->live_timer_enabled = !!(ret == 0);
+}
 
-       its.it_value.tv_sec = live_timer_interval / 1000000;
-       its.it_value.tv_nsec = (live_timer_interval % 1000000) * 1000;
-       its.it_interval.tv_sec = its.it_value.tv_sec;
-       its.it_interval.tv_nsec = its.it_value.tv_nsec;
+/*
+ * Stop and delete the channel's live timer.
+ */
+void consumer_timer_live_stop(struct lttng_consumer_channel *channel)
+{
+       int ret;
 
-       ret = timer_settime(channel->live_timer, 0, &its, NULL);
+       assert(channel);
+
+       ret = consumer_channel_timer_stop(&channel->live_timer,
+                       LTTNG_CONSUMER_SIG_LIVE);
        if (ret == -1) {
-               PERROR("timer_settime");
+               ERR("Failed to stop live timer");
        }
+
+       channel->live_timer_enabled = 0;
 }
 
 /*
- * Stop and delete timer.
+ * Set the channel's monitoring timer.
+ *
+ * Returns a negative value on error, 0 if a timer was created, and
+ * a positive value if no timer was created (not an error).
  */
-void consumer_timer_live_stop(struct lttng_consumer_channel *channel)
+int consumer_timer_monitor_start(struct lttng_consumer_channel *channel,
+               unsigned int monitor_timer_interval_us)
 {
        int ret;
 
        assert(channel);
+       assert(channel->key);
+       assert(!channel->monitor_timer_enabled);
 
-       ret = timer_delete(channel->live_timer);
+       ret = consumer_channel_timer_start(&channel->monitor_timer, channel,
+                       monitor_timer_interval_us, LTTNG_CONSUMER_SIG_MONITOR);
+       channel->monitor_timer_enabled = !!(ret == 0);
+       return ret;
+}
+
+/*
+ * Stop and delete the channel's monitoring timer.
+ */
+int consumer_timer_monitor_stop(struct lttng_consumer_channel *channel)
+{
+       int ret;
+
+       assert(channel);
+       assert(channel->monitor_timer_enabled);
+
+       ret = consumer_channel_timer_stop(&channel->monitor_timer,
+                       LTTNG_CONSUMER_SIG_MONITOR);
        if (ret == -1) {
-               PERROR("timer_delete");
+               ERR("Failed to stop live timer");
+               goto end;
        }
 
-       consumer_timer_signal_thread_qs(LTTNG_CONSUMER_SIG_LIVE);
-
-       channel->live_timer = 0;
-       channel->live_timer_enabled = 0;
+       channel->monitor_timer_enabled = 0;
+end:
+       return ret;
 }
 
 /*
@@ -544,9 +621,159 @@ int consumer_signal_init(void)
        return 0;
 }
 
+static
+int sample_ust_positions(struct lttng_consumer_channel *channel,
+               uint64_t *_highest_use, uint64_t *_lowest_use)
+{
+       int ret;
+       struct lttng_ht_iter iter;
+       struct lttng_consumer_stream *stream;
+       bool empty_channel = true;
+       uint64_t high = 0, low = UINT64_MAX;
+       struct lttng_ht *ht = consumer_data.stream_per_chan_id_ht;
+
+       rcu_read_lock();
+
+       cds_lfht_for_each_entry_duplicate(ht->ht,
+                       ht->hash_fct(&channel->key, lttng_ht_seed),
+                       ht->match_fct, &channel->key,
+                       &iter.iter, stream, node_channel_id.node) {
+               unsigned long produced, consumed, usage;
+
+               empty_channel = false;
+
+               pthread_mutex_lock(&stream->lock);
+               if (cds_lfht_is_node_deleted(&stream->node.node)) {
+                       goto next;
+               }
+
+               ret = ustctl_snapshot_sample_positions(stream->ustream);
+               if (ret) {
+                       ERR("Failed to take buffer position snapshot in monitor timer (ret = %d)", ret);
+                       pthread_mutex_unlock(&stream->lock);
+                       goto end;
+               }
+               ret = ustctl_snapshot_get_consumed(stream->ustream,
+                                                  &consumed);
+               if (ret) {
+                       ERR("Failed to get buffer consumed position in monitor timer");
+                       pthread_mutex_unlock(&stream->lock);
+                       goto end;
+               }
+               ret = ustctl_snapshot_get_produced(stream->ustream,
+                                                  &produced);
+               if (ret) {
+                       ERR("Failed to get buffer produced position in monitor timer");
+                       pthread_mutex_unlock(&stream->lock);
+                       goto end;
+               }
+
+               usage = produced - consumed;
+               high = (usage > high) ? usage : high;
+               low = (usage < low) ? usage : low;
+       next:
+               pthread_mutex_unlock(&stream->lock);
+       }
+
+       *_highest_use = high;
+       *_lowest_use = low;
+end:
+       rcu_read_unlock();
+       if (empty_channel) {
+               ret = -1;
+       }
+       return ret;
+}
+
+/*
+ * Execute action on a monitor timer.
+ */
+static
+void monitor_timer(struct lttng_consumer_local_data *ctx,
+               struct lttng_consumer_channel *channel)
+{
+       int ret;
+       int channel_monitor_pipe =
+                       consumer_timer_thread_get_channel_monitor_pipe();
+       struct lttcomm_consumer_channel_monitor_msg msg = {
+               .key = channel->key,
+       };
+
+       assert(channel);
+       pthread_mutex_lock(&consumer_data.lock);
+
+       if (channel_monitor_pipe < 0) {
+               goto end;
+       }
+
+       switch (consumer_data.type) {
+       case LTTNG_CONSUMER_KERNEL:
+               /* TODO */
+               ret = -1;
+               break;
+       case LTTNG_CONSUMER32_UST:
+       case LTTNG_CONSUMER64_UST:
+       {
+               ret = sample_ust_positions(channel, &msg.highest, &msg.lowest);
+               break;
+       }
+       default:
+               abort();
+       }
+
+       if (ret) {
+               goto end;
+       }
+
+       /*
+        * Writes performed here are assumed to be atomic which is only
+        * guaranteed for sizes < than PIPE_BUF.
+        */
+       assert(sizeof(msg) <= PIPE_BUF);
+
+       do {
+               ret = write(channel_monitor_pipe, &msg, sizeof(msg));
+       } while (ret == -1 && errno == EINTR);
+       if (ret == -1) {
+               if (errno == EAGAIN) {
+                       /* Not an error, the sample is merely dropped. */
+                       DBG("Channel monitor pipe is full; dropping sample for channel key = %"PRIu64,
+                                       channel->key);
+               } else {
+                       PERROR("write to the channel monitor pipe");
+               }
+       } else {
+               DBG("Sent channel monitoring sample for channel key %" PRIu64
+                               ", (highest = %" PRIu64 ", lowest = %"PRIu64")",
+                               channel->key, msg.highest, msg.lowest);
+       }
+end:
+       pthread_mutex_unlock(&consumer_data.lock);
+}
+
+int consumer_timer_thread_get_channel_monitor_pipe(void)
+{
+       return uatomic_read(&channel_monitor_pipe);
+}
+
+int consumer_timer_thread_set_channel_monitor_pipe(int fd)
+{
+       int ret;
+
+       ret = uatomic_cmpxchg(&channel_monitor_pipe, -1, fd);
+       if (ret != -1) {
+               ret = -1;
+               goto end;
+       }
+       ret = 0;
+end:
+       return ret;
+}
+
 /*
  * This thread is the sighandler for signals LTTNG_CONSUMER_SIG_SWITCH,
- * LTTNG_CONSUMER_SIG_TEARDOWN and LTTNG_CONSUMER_SIG_LIVE.
+ * LTTNG_CONSUMER_SIG_TEARDOWN, LTTNG_CONSUMER_SIG_LIVE, and
+ * LTTNG_CONSUMER_SIG_MONITOR.
  */
 void *consumer_timer_thread(void *data)
 {
@@ -575,20 +802,31 @@ void *consumer_timer_thread(void *data)
                health_poll_entry();
                signr = sigwaitinfo(&mask, &info);
                health_poll_exit();
+
+               /*
+                * NOTE: cascading conditions are used instead of a switch case
+                * since the use of SIGRTMIN in the definition of the signals'
+                * values prevents the reduction to an integer constant.
+                */
                if (signr == -1) {
                        if (errno != EINTR) {
                                PERROR("sigwaitinfo");
                        }
                        continue;
                } else if (signr == LTTNG_CONSUMER_SIG_SWITCH) {
-                       metadata_switch_timer(ctx, info.si_signo, &info, NULL);
+                       metadata_switch_timer(ctx, info.si_signo, &info);
                } else if (signr == LTTNG_CONSUMER_SIG_TEARDOWN) {
                        cmm_smp_mb();
                        CMM_STORE_SHARED(timer_signal.qs_done, 1);
                        cmm_smp_mb();
                        DBG("Signal timer metadata thread teardown");
                } else if (signr == LTTNG_CONSUMER_SIG_LIVE) {
-                       live_timer(ctx, info.si_signo, &info, NULL);
+                       live_timer(ctx, info.si_signo, &info);
+               } else if (signr == LTTNG_CONSUMER_SIG_MONITOR) {
+                       struct lttng_consumer_channel *channel;
+
+                       channel = info.si_value.sival_ptr;
+                       monitor_timer(ctx, channel);
                } else {
                        ERR("Unexpected signal %d\n", info.si_signo);
                }
index 22e74574ce5622d7ec7e242b83865fa766b1f639..851a172aa13bfa4608138c951907c13c6f971549 100644 (file)
@@ -27,6 +27,7 @@
 #define LTTNG_CONSUMER_SIG_SWITCH      SIGRTMIN + 10
 #define LTTNG_CONSUMER_SIG_TEARDOWN    SIGRTMIN + 11
 #define LTTNG_CONSUMER_SIG_LIVE                SIGRTMIN + 12
+#define LTTNG_CONSUMER_SIG_MONITOR     SIGRTMIN + 13
 
 #define CLOCKID CLOCK_MONOTONIC
 
@@ -44,15 +45,21 @@ struct timer_signal_data {
 };
 
 void consumer_timer_switch_start(struct lttng_consumer_channel *channel,
-               unsigned int switch_timer_interval);
+               unsigned int switch_timer_interval_us);
 void consumer_timer_switch_stop(struct lttng_consumer_channel *channel);
 void consumer_timer_live_start(struct lttng_consumer_channel *channel,
-               int live_timer_interval);
+               unsigned int live_timer_interval_us);
 void consumer_timer_live_stop(struct lttng_consumer_channel *channel);
+int consumer_timer_monitor_start(struct lttng_consumer_channel *channel,
+               unsigned int monitor_timer_interval_us);
+int consumer_timer_monitor_stop(struct lttng_consumer_channel *channel);
 void *consumer_timer_thread(void *data);
 int consumer_signal_init(void);
 
 int consumer_flush_kernel_index(struct lttng_consumer_stream *stream);
 int consumer_flush_ust_index(struct lttng_consumer_stream *stream);
 
+int consumer_timer_thread_get_channel_monitor_pipe(void);
+int consumer_timer_thread_set_channel_monitor_pipe(int fd);
+
 #endif /* CONSUMER_TIMER_H */
index e379171ae3df53ff46dbfb5ec6347aac687e396c..3b857dec3779bc82ce7e587c75448f5b4ac4c07f 100644 (file)
@@ -368,6 +368,9 @@ void consumer_del_channel(struct lttng_consumer_channel *channel)
        if (channel->live_timer_enabled == 1) {
                consumer_timer_live_stop(channel);
        }
+       if (channel->monitor_timer_enabled == 1) {
+               consumer_timer_monitor_stop(channel);
+       }
 
        switch (consumer_data.type) {
        case LTTNG_CONSUMER_KERNEL:
@@ -1348,6 +1351,8 @@ struct lttng_consumer_local_data *lttng_consumer_create(
                goto error_metadata_pipe;
        }
 
+       ctx->channel_monitor_pipe = -1;
+
        return ctx;
 
 error_metadata_pipe:
index 37adecbfe689e28a79303c601cefbe2ea6311677..883fd35321b41e68722cc3cef1885637843aa8be 100644 (file)
@@ -61,6 +61,7 @@ enum lttng_consumer_command {
        LTTNG_CONSUMER_DISCARDED_EVENTS,
        LTTNG_CONSUMER_LOST_PACKETS,
        LTTNG_CONSUMER_CLEAR_QUIESCENT_CHANNEL,
+       LTTNG_CONSUMER_SET_CHANNEL_MONITOR_PIPE,
 };
 
 /* State of each fd in consumer */
@@ -160,6 +161,7 @@ struct lttng_consumer_channel {
 
        /* Metadata cache is metadata channel */
        struct consumer_metadata_cache *metadata_cache;
+
        /* For UST metadata periodical flush */
        int switch_timer_enabled;
        timer_t switch_timer;
@@ -170,6 +172,10 @@ struct lttng_consumer_channel {
        timer_t live_timer;
        int live_timer_error;
 
+       /* For channel monitoring timer. */
+       int monitor_timer_enabled;
+       timer_t monitor_timer;
+
        /* On-disk circular buffer */
        uint64_t tracefile_size;
        uint64_t tracefile_count;
@@ -539,6 +545,11 @@ struct lttng_consumer_local_data {
        int consumer_should_quit[2];
        /* Metadata poll thread pipe. Transfer metadata stream to it */
        struct lttng_pipe *consumer_metadata_pipe;
+       /*
+        * Pipe used by the channel monitoring timers to provide state samples
+        * to the session daemon (write-only).
+        */
+       int channel_monitor_pipe;
 };
 
 /*
index 37d222a784fd0b3564a35b3a845fb8fe88b956e2..4b27fb32062e615464ec12e142731402335270f2 100644 (file)
 #define DEFAULT_LTTNG_EXTRA_KMOD_PROBES                "LTTNG_EXTRA_KMOD_PROBES"
 
 /* Default unix socket path */
-#define DEFAULT_GLOBAL_CLIENT_UNIX_SOCK         DEFAULT_LTTNG_RUNDIR "/client-lttng-sessiond"
-#define DEFAULT_HOME_CLIENT_UNIX_SOCK           DEFAULT_LTTNG_HOME_RUNDIR "/client-lttng-sessiond"
-#define DEFAULT_GLOBAL_HEALTH_UNIX_SOCK         DEFAULT_LTTNG_RUNDIR "/sessiond-health"
-#define DEFAULT_HOME_HEALTH_UNIX_SOCK          DEFAULT_LTTNG_HOME_RUNDIR "/sessiond-health"
+#define DEFAULT_GLOBAL_CLIENT_UNIX_SOCK                DEFAULT_LTTNG_RUNDIR "/client-lttng-sessiond"
+#define DEFAULT_HOME_CLIENT_UNIX_SOCK                  DEFAULT_LTTNG_HOME_RUNDIR "/client-lttng-sessiond"
+#define DEFAULT_GLOBAL_HEALTH_UNIX_SOCK                DEFAULT_LTTNG_RUNDIR "/sessiond-health"
+#define DEFAULT_HOME_HEALTH_UNIX_SOCK                  DEFAULT_LTTNG_HOME_RUNDIR "/sessiond-health"
+#define DEFAULT_GLOBAL_NOTIFICATION_CHANNEL_UNIX_SOCK   DEFAULT_LTTNG_RUNDIR "/sessiond-notification"
+#define DEFAULT_HOME_NOTIFICATION_CHANNEL_UNIX_SOCK    DEFAULT_LTTNG_HOME_RUNDIR "/sessiond-notification"
 
 /* Default consumer health unix socket path */
 #define DEFAULT_GLOBAL_USTCONSUMER32_HEALTH_UNIX_SOCK  DEFAULT_LTTNG_RUNDIR "/ustconsumerd32/health"
diff --git a/src/common/dynamic-buffer.c b/src/common/dynamic-buffer.c
new file mode 100644 (file)
index 0000000..d617b11
--- /dev/null
@@ -0,0 +1,166 @@
+/*
+ * Copyright (C) 2017 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
+ *
+ * This library is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License, version 2.1 only,
+ * as published by the Free Software Foundation.
+ *
+ * This library is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
+ * for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this library; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#include <common/dynamic-buffer.h>
+#include <common/macros.h>
+#include <common/utils.h>
+#include <assert.h>
+
+static
+size_t round_to_power_of_2(size_t val)
+{
+       int order;
+       size_t rounded;
+
+       order = utils_get_count_order_u64(val);
+       assert(order >= 0);
+       rounded = (1ULL << order);
+       assert(rounded >= val);
+
+       return rounded;
+}
+
+void lttng_dynamic_buffer_init(struct lttng_dynamic_buffer *buffer)
+{
+       assert(buffer);
+       memset(buffer, 0, sizeof(*buffer));
+}
+
+int lttng_dynamic_buffer_append(struct lttng_dynamic_buffer *buffer,
+               const void *buf, size_t len)
+{
+       int ret = 0;
+
+       if (!buffer || (!buf && len)) {
+               ret = -1;
+               goto end;
+       }
+
+       if (len == 0) {
+               /* Not an error, no-op. */
+               goto end;
+       }
+
+       if ((buffer->capacity - buffer->size) < len) {
+               ret = lttng_dynamic_buffer_set_capacity(buffer,
+                               buffer->capacity +
+                               (len - (buffer->capacity - buffer->size)));
+               if (ret) {
+                       goto end;
+               }
+       }
+
+       memcpy(buffer->data + buffer->size, buf, len);
+       buffer->size += len;
+end:
+       return ret;
+}
+
+int lttng_dynamic_buffer_set_size(struct lttng_dynamic_buffer *buffer,
+               size_t new_size)
+{
+       int ret = 0;
+
+       if (!buffer) {
+               goto end;
+       }
+
+       if (new_size == buffer->size) {
+               goto end;
+       }
+
+       if (new_size > buffer->capacity) {
+               ret = lttng_dynamic_buffer_set_capacity(buffer, new_size);
+               if (ret) {
+                       goto end;
+               }
+       } else if (new_size > buffer->size) {
+               memset(buffer->data + buffer->size, 0, new_size - buffer->size);
+               buffer->size = new_size;
+       } else {
+               /*
+                * Shrinking size. There is no need to zero-out the newly
+                * released memory as it will either be:
+                *   - overwritten by lttng_dynamic_buffer_append,
+                *   - expanded later, which will zero-out the memory
+                *
+                * Users of external APIs are encouraged to set the buffer's
+                * size _before_ making such calls.
+                */
+       }
+       buffer->size = new_size;
+end:
+       return ret;
+}
+
+int lttng_dynamic_buffer_set_capacity(struct lttng_dynamic_buffer *buffer,
+               size_t new_capacity)
+{
+       int ret = 0;
+       size_t rounded_capacity = round_to_power_of_2(new_capacity);
+
+       if (!buffer || new_capacity < buffer->size) {
+               ret = -1;
+               goto end;
+       }
+
+       if (rounded_capacity == buffer->capacity) {
+               goto end;
+       }
+
+       if (!buffer->data) {
+               buffer->data = zmalloc(rounded_capacity);
+               if (!buffer->data) {
+                       ret = -1;
+                       goto end;
+               }
+       } else {
+               void *new_buf;
+
+               new_buf = realloc(buffer->data, rounded_capacity);
+               if (new_buf) {
+                       if (rounded_capacity > buffer->capacity) {
+                               memset(new_buf + buffer->capacity, 0,
+                                               rounded_capacity - buffer->capacity);
+                       }
+               } else {
+                       /* Realloc failed, try to acquire a new block. */
+                       new_buf = zmalloc(rounded_capacity);
+                       if (!new_buf) {
+                               ret = -1;
+                               goto end;
+                       }
+                       memcpy(new_buf, buffer->data, buffer->size);
+                       free(buffer->data);
+               }
+               buffer->data = new_buf;
+       }
+       buffer->capacity = rounded_capacity;
+end:
+       return ret;
+}
+
+/* Release any memory used by the dynamic buffer. */
+void lttng_dynamic_buffer_reset(struct lttng_dynamic_buffer *buffer)
+{
+       if (!buffer) {
+               return;
+       }
+       buffer->size = 0;
+       buffer->capacity = 0;
+       free(buffer->data);
+}
diff --git a/src/common/dynamic-buffer.h b/src/common/dynamic-buffer.h
new file mode 100644 (file)
index 0000000..82efd59
--- /dev/null
@@ -0,0 +1,59 @@
+/*
+ * Copyright (C) 2017 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
+ *
+ * This library is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License, version 2.1 only,
+ * as published by the Free Software Foundation.
+ *
+ * This library is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
+ * for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this library; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#ifndef LTTNG_DYNAMIC_BUFFER_H
+#define LTTNG_DYNAMIC_BUFFER_H
+
+#include <stddef.h>
+#include <stdint.h>
+
+struct lttng_dynamic_buffer {
+       char *data;
+       size_t size;
+       size_t capacity;
+};
+
+void lttng_dynamic_buffer_init(struct lttng_dynamic_buffer *buffer);
+
+int lttng_dynamic_buffer_append(struct lttng_dynamic_buffer *buffer,
+               const void *buf, size_t len);
+
+/*
+ * Set the buffer's size to new_size. The capacity of the buffer will
+ * be expanded (if necessary) to accomodate new_size. Areas acquired by
+ * an enlarging new_size _will be zeroed_.
+ *
+ * Be careful to expand the buffer's size _before_ calling out external
+ * APIs (e.g. read(3)) which may populate the buffer as setting the size
+ * _after_ will zero-out the result of the operation.
+ */
+int lttng_dynamic_buffer_set_size(struct lttng_dynamic_buffer *buffer,
+               size_t new_size);
+
+/*
+ * Set the buffer's capacity to accomodate the new_capacity, allocating memory
+ * as necessary. The buffer's content is preserved.
+ *
+ * If the current size > new_capacity, the operation will fail.
+ */
+int lttng_dynamic_buffer_set_capacity(struct lttng_dynamic_buffer *buffer,
+               size_t new_capacity);
+
+/* Release any memory used by the dynamic buffer. */
+void lttng_dynamic_buffer_reset(struct lttng_dynamic_buffer *buffer);
+
+#endif /* LTTNG_DYNAMIC_BUFFER_H */
diff --git a/src/common/endpoint.c b/src/common/endpoint.c
new file mode 100644 (file)
index 0000000..89066de
--- /dev/null
@@ -0,0 +1,26 @@
+/*
+ * Copyright (C) 2017 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
+ *
+ * This library is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License, version 2.1 only,
+ * as published by the Free Software Foundation.
+ *
+ * This library is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
+ * for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this library; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#include <lttng/endpoint-internal.h>
+
+static
+struct lttng_endpoint lttng_session_daemon_notification_endpoint_instance = {
+       .type = LTTNG_ENDPOINT_TYPE_DEFAULT_SESSIOND_NOTIFICATION
+};
+
+struct lttng_endpoint *lttng_session_daemon_notification_endpoint =
+               &lttng_session_daemon_notification_endpoint_instance;
index 938932cdc7b5cbd6ab03a1789cf9330712a8b5f2..2215886db1be67d5c93e3b89233c8e101f26a96d 100644 (file)
@@ -186,6 +186,10 @@ static const char *error_string_array[] = {
        [ ERROR_INDEX(LTTNG_ERR_REGEN_STATEDUMP_FAIL) ] = "Failed to regenerate the state dump",
        [ ERROR_INDEX(LTTNG_ERR_REGEN_STATEDUMP_NOMEM) ] = "Failed to regenerate the state dump, not enough memory",
        [ ERROR_INDEX(LTTNG_ERR_NOT_SNAPSHOT_SESSION) ] = "Snapshot command can't be applied to a non-snapshot session",
+       [ ERROR_INDEX(LTTNG_ERR_INVALID_TRIGGER) ] = "Invalid trigger",
+       [ ERROR_INDEX(LTTNG_ERR_TRIGGER_EXISTS) ] = "Trigger already registered",
+       [ ERROR_INDEX(LTTNG_ERR_TRIGGER_NOT_FOUND) ] = "Trigger not found",
+       [ ERROR_INDEX(LTTNG_ERR_COMMAND_CANCELLED) ] = "Command cancelled",
 
        /* Last element */
        [ ERROR_INDEX(LTTNG_ERR_NR) ] = "Unknown error code"
diff --git a/src/common/evaluation.c b/src/common/evaluation.c
new file mode 100644 (file)
index 0000000..49cba5d
--- /dev/null
@@ -0,0 +1,110 @@
+/*
+ * Copyright (C) 2017 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
+ *
+ * This library is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License, version 2.1 only,
+ * as published by the Free Software Foundation.
+ *
+ * This library is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
+ * for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this library; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#include <lttng/condition/evaluation-internal.h>
+#include <lttng/condition/buffer-usage-internal.h>
+#include <common/macros.h>
+#include <common/error.h>
+#include <stdbool.h>
+#include <assert.h>
+
+LTTNG_HIDDEN
+ssize_t lttng_evaluation_serialize(struct lttng_evaluation *evaluation,
+               char *buf)
+{
+       ssize_t ret, offset = 0;
+       struct lttng_evaluation_comm evaluation_comm;
+
+       evaluation_comm.type = (int8_t) evaluation->type;
+       if (buf) {
+               memcpy(buf, &evaluation_comm, sizeof(evaluation_comm));
+       }
+       offset += sizeof(evaluation_comm);
+
+       if (evaluation->serialize) {
+               ret = evaluation->serialize(evaluation,
+                               buf ? (buf + offset) : NULL);
+               if (ret < 0) {
+                       goto end;
+               }
+               offset += ret;
+       }
+
+       ret = offset;
+end:
+       return ret;
+}
+
+LTTNG_HIDDEN
+ssize_t lttng_evaluation_create_from_buffer(const char *buf,
+               struct lttng_evaluation **evaluation)
+{
+       ssize_t ret, evaluation_size = 0;
+       struct lttng_evaluation_comm *evaluation_comm =
+                       (struct lttng_evaluation_comm *) buf;
+
+       if (!buf || !evaluation) {
+               ret = -1;
+               goto end;
+       }
+
+       evaluation_size += sizeof(*evaluation_comm);
+       buf += evaluation_size;
+
+       switch ((enum lttng_condition_type) evaluation_comm->type) {
+       case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
+               ret = lttng_evaluation_buffer_usage_low_create_from_buffer(buf,
+                               evaluation);
+               if (ret < 0) {
+                       goto end;
+               }
+               evaluation_size += ret;
+               break;
+       case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
+               ret = lttng_evaluation_buffer_usage_high_create_from_buffer(buf,
+                               evaluation);
+               if (ret < 0) {
+                       goto end;
+               }
+               evaluation_size += ret;
+               break;
+       default:
+               ERR("Attempted to create evaluation of unknown type (%i)",
+                               (int) evaluation_comm->type);
+               ret = -1;
+               goto end;
+       }
+       ret = evaluation_size;
+end:
+       return ret;
+}
+
+enum lttng_condition_type lttng_evaluation_get_type(
+               struct lttng_evaluation *evaluation)
+{
+       return evaluation ? evaluation->type : LTTNG_CONDITION_TYPE_UNKNOWN;
+}
+
+void lttng_evaluation_destroy(struct lttng_evaluation *evaluation)
+{
+       if (!evaluation) {
+               return;
+       }
+
+       assert(evaluation->destroy);
+       evaluation->destroy(evaluation);
+}
index 90849ed30ae5e543a577f9214bc33152864751f8..7eaf27cdf3787cbf02d5c4fc2bc48eb43632b734 100644 (file)
@@ -20,6 +20,7 @@
 #define _MACROS_H
 
 #include <stdlib.h>
+#include <stddef.h>
 #include <string.h>
 #include <common/compat/string.h>
 
@@ -58,6 +59,14 @@ void *zmalloc(size_t len)
 #define ARRAY_SIZE(array)   (sizeof(array) / (sizeof((array)[0])))
 #endif
 
+#ifndef container_of
+#define container_of(ptr, type, member)                                        \
+       ({                                                              \
+               const typeof(((type *)NULL)->member) * __ptr = (ptr);   \
+               (type *)((char *)__ptr - offsetof(type, member));       \
+       })
+#endif
+
 #ifndef max
 #define max(a, b) ((a) > (b) ? (a) : (b))
 #endif
diff --git a/src/common/notification.c b/src/common/notification.c
new file mode 100644 (file)
index 0000000..9970974
--- /dev/null
@@ -0,0 +1,168 @@
+/*
+ * Copyright (C) 2017 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
+ *
+ * This library is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License, version 2.1 only,
+ * as published by the Free Software Foundation.
+ *
+ * This library is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
+ * for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this library; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#include <lttng/notification/notification-internal.h>
+#include <lttng/condition/condition-internal.h>
+#include <lttng/condition/evaluation-internal.h>
+#include <lttng/condition/condition.h>
+#include <lttng/condition/evaluation.h>
+#include <assert.h>
+
+LTTNG_HIDDEN
+struct lttng_notification *lttng_notification_create(
+               struct lttng_condition *condition,
+               struct lttng_evaluation *evaluation)
+{
+       struct lttng_notification *notification = NULL;
+
+       if (!condition || !evaluation) {
+               goto end;
+       }
+
+       notification = zmalloc(sizeof(struct lttng_notification));
+       if (!notification) {
+               goto end;
+       }
+
+       notification->condition = condition;
+       notification->evaluation = evaluation;
+       notification->owns_elements = false;
+end:
+       return notification;
+}
+
+LTTNG_HIDDEN
+ssize_t lttng_notification_serialize(struct lttng_notification *notification,
+               char *buf)
+{
+       ssize_t ret, condition_size, evaluation_size, offset = 0;
+       struct lttng_notification_comm notification_comm;
+
+       if (!notification) {
+               ret = -1;
+               goto end;
+       }
+
+       offset += sizeof(notification_comm);
+       condition_size = lttng_condition_serialize(notification->condition,
+                       buf ? (buf + offset) : NULL);
+       if (condition_size < 0) {
+               ret = condition_size;
+               goto end;
+       }
+       offset += condition_size;
+
+       evaluation_size = lttng_evaluation_serialize(notification->evaluation,
+                       buf ? (buf + offset) : NULL);
+       if (evaluation_size < 0) {
+               ret = evaluation_size;
+               goto end;
+       }
+       offset += evaluation_size;
+
+       if (buf) {
+               notification_comm.length =
+                               (uint32_t) (condition_size + evaluation_size);
+               memcpy(buf, &notification_comm, sizeof(notification_comm));
+       }
+       ret = offset;
+end:
+       return ret;
+
+}
+
+LTTNG_HIDDEN
+ssize_t lttng_notification_create_from_buffer(const char *buf,
+               struct lttng_notification **notification)
+{
+       ssize_t ret, offset = 0, condition_size, evaluation_size;
+       struct lttng_notification_comm *notification_comm;
+       struct lttng_condition *condition;
+       struct lttng_evaluation *evaluation;
+
+       if (!buf || !notification) {
+               ret = -1;
+               goto end;
+       }
+
+       notification_comm = (struct lttng_notification_comm *) buf;
+       offset += sizeof(*notification_comm);
+
+       /* struct lttng_condition */
+       condition_size = lttng_condition_create_from_buffer(buf + offset,
+                       &condition);
+       if (condition_size < 0) {
+               ret = condition_size;
+               goto end;
+       }
+       offset += condition_size;
+
+       /* struct lttng_evaluation */
+       evaluation_size = lttng_evaluation_create_from_buffer(buf + offset,
+                       &evaluation);
+       if (evaluation_size < 0) {
+               ret = evaluation_size;
+               goto end;
+       }
+       offset += evaluation_size;
+
+       /* Unexpected size of inner-elements; the buffer is corrupted. */
+       if ((ssize_t) notification_comm->length !=
+                       condition_size + evaluation_size) {
+               ret = -1;
+               goto error;
+       }
+
+       *notification = lttng_notification_create(condition, evaluation);
+       if (!*notification) {
+               ret = -1;
+               goto error;
+       }
+       ret = offset;
+       (*notification)->owns_elements = true;
+end:
+       return ret;
+error:
+       lttng_condition_destroy(condition);
+       lttng_evaluation_destroy(evaluation);
+       return ret;
+}
+
+void lttng_notification_destroy(struct lttng_notification *notification)
+{
+       if (!notification) {
+               return;
+       }
+
+       if (notification->owns_elements) {
+               lttng_condition_destroy(notification->condition);
+               lttng_evaluation_destroy(notification->evaluation);
+       }
+       free(notification);
+}
+
+struct lttng_condition *lttng_notification_get_condition(
+               struct lttng_notification *notification)
+{
+       return notification ? notification->condition : NULL;
+}
+
+struct lttng_evaluation *lttng_notification_get_evaluation(
+               struct lttng_notification *notification)
+{
+       return notification ? notification->evaluation : NULL;
+}
diff --git a/src/common/notify.c b/src/common/notify.c
new file mode 100644 (file)
index 0000000..956c0ec
--- /dev/null
@@ -0,0 +1,49 @@
+/*
+ * Copyright (C) 2017 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
+ *
+ * This library is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License, version 2.1 only,
+ * as published by the Free Software Foundation.
+ *
+ * This library is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
+ * for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this library; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#include <lttng/action/action-internal.h>
+#include <lttng/action/notify-internal.h>
+#include <common/macros.h>
+#include <assert.h>
+
+static
+void lttng_action_notify_destroy(struct lttng_action *action)
+{
+       free(action);
+}
+
+static
+ssize_t lttng_action_notify_serialize(struct lttng_action *action, char *buf)
+{
+       return 0;
+}
+
+struct lttng_action *lttng_action_notify_create(void)
+{
+       struct lttng_action_notify *notify;
+
+       notify = zmalloc(sizeof(struct lttng_action_notify));
+       if (!notify) {
+               goto end;
+       }
+
+       notify->parent.type = LTTNG_ACTION_TYPE_NOTIFY;
+       notify->parent.serialize = lttng_action_notify_serialize;
+       notify->parent.destroy = lttng_action_notify_destroy;
+end:
+       return &notify->parent;
+}
index 00211238e603ebc605f2bc63578bfa82652c0f0b..ffdecc0d5e2086076fa78c21d5f75b5e8fc451e7 100644 (file)
@@ -311,3 +311,69 @@ error:
        unlock_write_side(pipe);
        return ret;
 }
+
+/*
+ * Return and release the read end of the pipe.
+ *
+ * This call transfers the ownership of the read fd of the underlying pipe
+ * to the caller if it is still open.
+ *
+ * Returns the fd of the read end of the pipe, or -1 if it was already closed or
+ * released.
+ */
+LTTNG_HIDDEN
+int lttng_pipe_release_readfd(struct lttng_pipe *pipe)
+{
+       int ret;
+
+       if (!pipe) {
+               ret = -1;
+               goto end;
+       }
+
+       lock_read_side(pipe);
+       if (!lttng_pipe_is_read_open(pipe)) {
+               ret = -1;
+               goto end_unlock;
+       }
+       ret = pipe->fd[0];
+       pipe->fd[0] = -1;
+       pipe->r_state = LTTNG_PIPE_STATE_CLOSED;
+end_unlock:
+       unlock_read_side(pipe);
+end:
+       return ret;
+}
+
+/*
+ * Return and release the write end of the pipe.
+ *
+ * This call transfers the ownership of the write fd of the underlying pipe
+ * to the caller if it is still open.
+ *
+ * Returns the fd of the write end of the pipe, or -1 if it was alwritey closed
+ * or released.
+ */
+LTTNG_HIDDEN
+int lttng_pipe_release_writefd(struct lttng_pipe *pipe)
+{
+       int ret;
+
+       if (!pipe) {
+               ret = -1;
+               goto end;
+       }
+
+       lock_write_side(pipe);
+       if (!lttng_pipe_is_write_open(pipe)) {
+               ret = -1;
+               goto end_unlock;
+       }
+       ret = pipe->fd[1];
+       pipe->fd[1] = -1;
+       pipe->w_state = LTTNG_PIPE_STATE_CLOSED;
+end_unlock:
+       unlock_write_side(pipe);
+end:
+       return ret;
+}
index 0bc2db3250ec8eb62bec5758245a69a2dfb0c001..b9e0904a2a3e8d176f3ae29bef23198f7e42d7a8 100644 (file)
@@ -90,5 +90,11 @@ ssize_t lttng_pipe_read(struct lttng_pipe *pipe, void *buf, size_t count);
 LTTNG_HIDDEN
 ssize_t lttng_pipe_write(struct lttng_pipe *pipe, const void *buf,
                size_t count);
+/* Returns and releases the read end of the pipe. */
+LTTNG_HIDDEN
+int lttng_pipe_release_readfd(struct lttng_pipe *pipe);
+/* Returns and releases the write end of the pipe. */
+LTTNG_HIDDEN
+int lttng_pipe_release_writefd(struct lttng_pipe *pipe);
 
 #endif /* LTTNG_PIPE_H */
index 0c2667031b55bd5603adaca04b09bf5fcab808c1..3e85b9519a8b159e34e6a39f404df2d7c058ecf1 100644 (file)
@@ -29,6 +29,7 @@
 #include <lttng/lttng.h>
 #include <lttng/snapshot-internal.h>
 #include <lttng/save-internal.h>
+#include <lttng/trigger/trigger-internal.h>
 #include <common/compat/socket.h>
 #include <common/uri.h>
 #include <common/defaults.h>
@@ -96,6 +97,8 @@ enum lttcomm_sessiond_command {
        LTTNG_SET_SESSION_SHM_PATH          = 40,
        LTTNG_REGENERATE_METADATA           = 41,
        LTTNG_REGENERATE_STATEDUMP          = 42,
+       LTTNG_REGISTER_TRIGGER              = 43,
+       LTTNG_UNREGISTER_TRIGGER            = 44,
 };
 
 enum lttcomm_relayd_command {
@@ -146,6 +149,7 @@ enum lttcomm_return_code {
        LTTCOMM_CONSUMERD_RELAYD_FAIL,              /* Error on remote relayd */
        LTTCOMM_CONSUMERD_CHANNEL_FAIL,             /* Channel creation failed. */
        LTTCOMM_CONSUMERD_CHAN_NOT_FOUND,           /* Channel not found. */
+       LTTCOMM_CONSUMERD_ALREADY_SET,              /* Resource already set. */
 
        /* MUST be last element */
        LTTCOMM_NR,                                             /* Last element */
@@ -311,6 +315,9 @@ struct lttcomm_session_msg {
                struct {
                        uint32_t pid;
                } LTTNG_PACKED pid_tracker;
+               struct {
+                       uint32_t length;
+               } LTTNG_PACKED trigger;
        } u;
 } LTTNG_PACKED;
 
@@ -454,6 +461,7 @@ struct lttcomm_consumer_msg {
                        uint32_t switch_timer_interval;         /* usec */
                        uint32_t read_timer_interval;           /* usec */
                        unsigned int live_timer_interval;               /* usec */
+                       uint32_t monitor_timer_interval;        /* usec */
                        int32_t output;                         /* splice, mmap */
                        int32_t type;                           /* metadata or per_cpu */
                        uint64_t session_id;                    /* Tracing session id */
@@ -531,6 +539,19 @@ struct lttcomm_consumer_msg {
        } u;
 } LTTNG_PACKED;
 
+/*
+ * Channel monitoring message returned to the session daemon on every
+ * monitor timer expiration.
+ */
+struct lttcomm_consumer_channel_monitor_msg {
+       /* Key of the sampled channel. */
+       uint64_t key;
+       /*
+        * Lowest and highest usage (bytes) at the moment the sample was taken.
+        */
+       uint64_t lowest, highest;
+} LTTNG_PACKED;
+
 /*
  * Status message returned to the sessiond after a received command.
  */
diff --git a/src/common/trigger.c b/src/common/trigger.c
new file mode 100644 (file)
index 0000000..59c3366
--- /dev/null
@@ -0,0 +1,179 @@
+/*
+ * Copyright (C) 2017 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
+ *
+ * This library is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License, version 2.1 only,
+ * as published by the Free Software Foundation.
+ *
+ * This library is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
+ * for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this library; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#include <lttng/trigger/trigger-internal.h>
+#include <lttng/condition/condition-internal.h>
+#include <lttng/action/action-internal.h>
+#include <common/error.h>
+#include <assert.h>
+
+LTTNG_HIDDEN
+bool lttng_trigger_validate(struct lttng_trigger *trigger)
+{
+       bool valid;
+
+       if (!trigger) {
+               valid = false;
+               goto end;
+       }
+
+       valid = lttng_condition_validate(trigger->condition) &&
+                       lttng_action_validate(trigger->action);
+end:
+       return valid;
+}
+
+struct lttng_trigger *lttng_trigger_create(
+               struct lttng_condition *condition,
+               struct lttng_action *action)
+{
+       struct lttng_trigger *trigger = NULL;
+
+       if (!condition || !action) {
+               goto end;
+       }
+
+       trigger = zmalloc(sizeof(struct lttng_trigger));
+       if (!trigger) {
+               goto end;
+       }
+
+       trigger->condition = condition;
+       trigger->action = action;
+end:
+       return trigger;
+}
+
+struct lttng_condition *lttng_trigger_get_condition(
+               struct lttng_trigger *trigger)
+{
+       return trigger ? trigger->condition : NULL;
+}
+
+extern struct lttng_action *lttng_trigger_get_action(
+               struct lttng_trigger *trigger)
+{
+       return trigger ? trigger->action : NULL;
+}
+
+void lttng_trigger_destroy(struct lttng_trigger *trigger)
+{
+       if (!trigger) {
+               return;
+       }
+
+       lttng_condition_destroy(trigger->condition);
+       lttng_action_destroy(trigger->action);
+       free(trigger);
+}
+
+LTTNG_HIDDEN
+ssize_t lttng_trigger_create_from_buffer(const char *buf,
+               struct lttng_trigger **trigger)
+{
+       ssize_t ret, offset = 0, condition_size, action_size;
+       struct lttng_condition *condition = NULL;
+       struct lttng_action *action = NULL;
+       struct lttng_trigger_comm *trigger_comm;
+
+       if (!buf || !trigger) {
+               ret = -1;
+               goto end;
+       }
+
+       /* lttng_trigger_comm header */
+       trigger_comm = (struct lttng_trigger_comm *) buf;
+       offset += sizeof(*trigger_comm);
+
+       /* struct lttng_condition */
+       condition_size = lttng_condition_create_from_buffer(buf + offset,
+                       &condition);
+       if (condition_size < 0) {
+               ret = condition_size;
+               goto end;
+       }
+       offset += condition_size;
+
+       /* struct lttng_action */
+       action_size = lttng_action_create_from_buffer(buf + offset, &action);
+       if (action_size < 0) {
+               ret = action_size;
+               goto end;
+       }
+       offset += action_size;
+
+       /* Unexpected size of inner-elements; the buffer is corrupted. */
+       if ((ssize_t) trigger_comm->length != condition_size + action_size) {
+               ret = -1;
+               goto error;
+       }
+
+       *trigger = lttng_trigger_create(condition, action);
+       if (!*trigger) {
+               ret = -1;
+               goto error;
+       }
+       ret = offset;
+end:
+       return ret;
+error:
+       lttng_condition_destroy(condition);
+       lttng_action_destroy(action);
+       return ret;
+}
+
+/*
+ * Returns the size of a trigger (header + condition + action).
+ * Both elements are stored contiguously, see their "*_comm" structure
+ * for the detailed format.
+ */
+LTTNG_HIDDEN
+ssize_t lttng_trigger_serialize(struct lttng_trigger *trigger, char *buf)
+{
+       struct lttng_trigger_comm trigger_comm;
+       ssize_t action_size, condition_size, offset = 0, ret;
+
+       if (!trigger) {
+               ret = -1;
+               goto end;
+       }
+
+       offset += sizeof(trigger_comm);
+       condition_size = lttng_condition_serialize(trigger->condition,
+                       buf ? (buf + offset) : NULL);
+       if (condition_size < 0) {
+               ret = -1;
+               goto end;
+       }
+       offset += condition_size;
+
+       action_size = lttng_action_serialize(trigger->action,
+                       buf ? (buf + offset) : NULL);
+       if (action_size < 0) {
+               ret = -1;
+               goto end;
+       }
+       offset += action_size;
+
+       if (buf) {
+               trigger_comm.length = (uint32_t) (condition_size + action_size);
+               memcpy(buf, &trigger_comm, sizeof(trigger_comm));
+       }
+       ret = offset;
+end:
+       return ret;
+}
index 11c30781ba02ea5fa81f84e7f7400e15bb4436ce..1d4ee9b3686b1a38efa2d74cf7bc6c2939be95ee 100644 (file)
@@ -226,7 +226,7 @@ ssize_t lttcomm_send_unix_sock(int sock, const void *buf, size_t len)
 {
        struct msghdr msg;
        struct iovec iov[1];
-       ssize_t ret = -1;
+       ssize_t ret;
 
        memset(&msg, 0, sizeof(msg));
 
@@ -235,17 +235,28 @@ ssize_t lttcomm_send_unix_sock(int sock, const void *buf, size_t len)
        msg.msg_iov = iov;
        msg.msg_iovlen = 1;
 
-       ret = sendmsg(sock, &msg, 0);
-       if (ret < 0) {
-               /*
-                * Only warn about EPIPE when quiet mode is deactivated.
-                * We consider EPIPE as expected.
-                */
-               if (errno != EPIPE || !lttng_opt_quiet) {
-                       PERROR("sendmsg");
+       while (iov[0].iov_len) {
+               ret = sendmsg(sock, &msg, 0);
+               if (ret < 0) {
+                       if (errno == EINTR) {
+                               continue;
+                       } else {
+                               /*
+                                * Only warn about EPIPE when quiet mode is
+                                * deactivated.
+                                * We consider EPIPE as expected.
+                                */
+                               if (errno != EPIPE || !lttng_opt_quiet) {
+                                       PERROR("sendmsg");
+                               }
+                               goto end;
+                       }
                }
+               iov[0].iov_len -= ret;
+               iov[0].iov_base += ret;
        }
-
+       ret = len;
+end:
        return ret;
 }
 
index 97f0497cb12b14a71da7289c5030b256a8848ebd..af9ed5acdd369169c95103f8aa0a7a17fbebd4df 100644 (file)
@@ -1501,8 +1501,17 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                        consumer_timer_switch_start(channel, attr.switch_timer_interval);
                        attr.switch_timer_interval = 0;
                } else {
+                       int monitor_start_ret;
+
                        consumer_timer_live_start(channel,
                                        msg.u.ask_channel.live_timer_interval);
+                       monitor_start_ret = consumer_timer_monitor_start(
+                                       channel,
+                                       msg.u.ask_channel.monitor_timer_interval);
+                       if (monitor_start_ret < 0) {
+                               ERR("Starting channel monitoring timer failed");
+                               goto end_channel_error;
+                       }
                }
 
                health_code_update();
@@ -1525,6 +1534,9 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                        if (channel->live_timer_enabled == 1) {
                                consumer_timer_live_stop(channel);
                        }
+                       if (channel->monitor_timer_enabled == 1) {
+                               consumer_timer_monitor_stop(channel);
+                       }
                        goto end_channel_error;
                }
 
@@ -1857,6 +1869,51 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
 
                break;
        }
+       case LTTNG_CONSUMER_SET_CHANNEL_MONITOR_PIPE:
+       {
+               int channel_monitor_pipe;
+
+               ret_code = LTTCOMM_CONSUMERD_SUCCESS;
+               /* Successfully received the command's type. */
+               ret = consumer_send_status_msg(sock, ret_code);
+               if (ret < 0) {
+                       goto error_fatal;
+               }
+
+               ret = lttcomm_recv_fds_unix_sock(sock, &channel_monitor_pipe,
+                               1);
+               if (ret != sizeof(channel_monitor_pipe)) {
+                       ERR("Failed to receive channel monitor pipe");
+                       goto error_fatal;
+               }
+
+               DBG("Received channel monitor pipe (%d)", channel_monitor_pipe);
+               ret = consumer_timer_thread_set_channel_monitor_pipe(
+                               channel_monitor_pipe);
+               if (!ret) {
+                       int flags;
+
+                       ret_code = LTTCOMM_CONSUMERD_SUCCESS;
+                       /* Set the pipe as non-blocking. */
+                       ret = fcntl(channel_monitor_pipe, F_GETFL, 0);
+                       if (ret == -1) {
+                               PERROR("fcntl get flags of the channel monitoring pipe");
+                               goto error_fatal;
+                       }
+                       flags = ret;
+
+                       ret = fcntl(channel_monitor_pipe, F_SETFL,
+                                       flags | O_NONBLOCK);
+                       if (ret == -1) {
+                               PERROR("fcntl set O_NONBLOCK flag of the channel monitoring pipe");
+                               goto error_fatal;
+                       }
+                       DBG("Channel monitor pipe set as non-blocking");
+               } else {
+                       ret_code = LTTCOMM_CONSUMERD_ALREADY_SET;
+               }
+               goto end_msg_sessiond;
+       }
        default:
                break;
        }
index 16d2f817ad8a1b21d99228a67e5bba25e0d1f3fa..4d49728c0f47a23cce6eba1bc2f72f11c2c14836 100644 (file)
@@ -1017,6 +1017,59 @@ static inline unsigned int fls_u32(uint32_t x)
 #define HAS_FLS_U32
 #endif
 
+#if defined(__x86_64)
+static inline
+unsigned int fls_u64(uint64_t x)
+{
+       long r;
+
+       asm("bsrq %1,%0\n\t"
+           "jnz 1f\n\t"
+           "movq $-1,%0\n\t"
+           "1:\n\t"
+           : "=r" (r) : "rm" (x));
+       return r + 1;
+}
+#define HAS_FLS_U64
+#endif
+
+#ifndef HAS_FLS_U64
+static __attribute__((unused))
+unsigned int fls_u64(uint64_t x)
+{
+       unsigned int r = 64;
+
+       if (!x)
+               return 0;
+
+       if (!(x & 0xFFFFFFFF00000000ULL)) {
+               x <<= 32;
+               r -= 32;
+       }
+       if (!(x & 0xFFFF000000000000ULL)) {
+               x <<= 16;
+               r -= 16;
+       }
+       if (!(x & 0xFF00000000000000ULL)) {
+               x <<= 8;
+               r -= 8;
+       }
+       if (!(x & 0xF000000000000000ULL)) {
+               x <<= 4;
+               r -= 4;
+       }
+       if (!(x & 0xC000000000000000ULL)) {
+               x <<= 2;
+               r -= 2;
+       }
+       if (!(x & 0x8000000000000000ULL)) {
+               x <<= 1;
+               r -= 1;
+       }
+       return r;
+}
+#endif
+
 #ifndef HAS_FLS_U32
 static __attribute__((unused)) unsigned int fls_u32(uint32_t x)
 {
@@ -1063,6 +1116,20 @@ int utils_get_count_order_u32(uint32_t x)
        return fls_u32(x - 1);
 }
 
+/*
+ * Return the minimum order for which x <= (1UL << order).
+ * Return -1 if x is 0.
+ */
+LTTNG_HIDDEN
+int utils_get_count_order_u64(uint64_t x)
+{
+       if (!x) {
+               return -1;
+       }
+
+       return fls_u64(x - 1);
+}
+
 /**
  * Obtain the value of LTTNG_HOME environment variable, if exists.
  * Otherwise returns the value of HOME.
index 7285f5c3032932ec03531ddb7ef7973f403fbc6b..0daf5b98df70d7add96b6a7fda2fac53e85ad681 100644 (file)
@@ -48,6 +48,7 @@ int utils_rotate_stream_file(char *path_name, char *file_name, uint64_t size,
                int *stream_fd);
 int utils_parse_size_suffix(char const * const str, uint64_t * const size);
 int utils_get_count_order_u32(uint32_t x);
+int utils_get_count_order_u64(uint64_t x);
 char *utils_get_home_dir(void);
 char *utils_get_user_home_dir(uid_t uid);
 char *utils_get_kmod_probes_list(void);
index 6b6a6eed127525f17a2a94454ea18e8fbf3dacc1..b5156caf94aa9320a2712a7525a5138bff1b4b74 100644 (file)
@@ -5,7 +5,8 @@ SUBDIRS = filter
 lib_LTLIBRARIES = liblttng-ctl.la
 
 liblttng_ctl_la_SOURCES = lttng-ctl.c snapshot.c lttng-ctl-helper.h \
-               lttng-ctl-health.c save.c load.c deprecated-symbols.c
+               lttng-ctl-health.c save.c load.c deprecated-symbols.c \
+               channel.c
 
 liblttng_ctl_la_LDFLAGS = \
                $(LT_NO_UNDEFINED)
diff --git a/src/lib/lttng-ctl/channel.c b/src/lib/lttng-ctl/channel.c
new file mode 100644 (file)
index 0000000..c428299
--- /dev/null
@@ -0,0 +1,285 @@
+/*
+ * Copyright (C) 2017 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
+ *
+ * This library is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License, version 2.1 only,
+ * as published by the Free Software Foundation.
+ *
+ * This library is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
+ * for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this library; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#include <lttng/notification/notification-internal.h>
+#include <lttng/notification/channel-internal.h>
+#include <lttng/condition/condition-internal.h>
+#include <lttng/endpoint.h>
+#include <common/error.h>
+#include <common/dynamic-buffer.h>
+#include <common/utils.h>
+#include <common/defaults.h>
+#include <assert.h>
+#include "lttng-ctl-helper.h"
+
+struct lttng_notification_channel *lttng_notification_channel_create(
+               struct lttng_endpoint *endpoint)
+{
+       int fd, ret;
+       bool is_in_tracing_group = false, is_root = false;
+       char *sock_path = NULL;
+       struct lttng_notification_channel *channel = NULL;
+
+       if (!endpoint ||
+                       endpoint != lttng_session_daemon_notification_endpoint) {
+               goto end;
+       }
+
+       sock_path = zmalloc(LTTNG_PATH_MAX);
+       if (!sock_path) {
+               goto end;
+       }
+
+       channel = zmalloc(sizeof(struct lttng_notification_channel));
+       if (!channel) {
+               goto end;
+       }
+       channel->socket = -1;
+
+       is_root = (getuid() == 0);
+       if (!is_root) {
+               is_in_tracing_group = lttng_check_tracing_group();
+       }
+
+       if (is_root || is_in_tracing_group) {
+               lttng_ctl_copy_string(sock_path,
+                               DEFAULT_GLOBAL_NOTIFICATION_CHANNEL_UNIX_SOCK,
+                               LTTNG_PATH_MAX);
+               ret = lttcomm_connect_unix_sock(sock_path);
+               if (ret >= 0) {
+                       fd = ret;
+                       goto set_fd;
+               }
+       }
+
+       /* Fallback to local session daemon. */
+       ret = snprintf(sock_path, LTTNG_PATH_MAX,
+                       DEFAULT_HOME_NOTIFICATION_CHANNEL_UNIX_SOCK,
+                       utils_get_home_dir());
+       if (ret < 0 || ret >= LTTNG_PATH_MAX) {
+               goto error;
+       }
+
+       ret = lttcomm_connect_unix_sock(sock_path);
+       if (ret < 0) {
+               goto error;
+       }
+       fd = ret;
+
+set_fd:
+       channel->socket = fd;
+
+       /* FIXME send creds */
+end:
+       free(sock_path);
+       return channel;
+error:
+       lttng_notification_channel_destroy(channel);
+       channel = NULL;
+       goto end;
+}
+
+enum lttng_notification_channel_status
+lttng_notification_channel_get_next_notification(
+               struct lttng_notification_channel *channel,
+               struct lttng_notification **_notification)
+{
+       ssize_t ret;
+       struct lttng_notification_comm comm;
+       struct lttng_notification *notification = NULL;
+       struct lttng_dynamic_buffer reception_buffer;
+       struct lttng_notification_channel_message msg;
+       enum lttng_notification_channel_status status =
+                       LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
+
+       lttng_dynamic_buffer_init(&reception_buffer);
+
+       if (!channel || !_notification) {
+               status = LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID;
+               goto end;
+       }
+
+       ret = lttcomm_recv_unix_sock(channel->socket, &msg, sizeof(msg));
+       if (ret <= 0) {
+               status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
+               goto end;
+       }
+       if (msg.type != LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION) {
+               status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
+               goto end;
+       }
+
+       ret = lttcomm_recv_unix_sock(channel->socket, &comm, sizeof(comm));
+       if (ret < sizeof(comm)) {
+               status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
+               goto end;
+       }
+
+       ret = lttng_dynamic_buffer_set_size(&reception_buffer,
+                       comm.length + sizeof(comm));
+       if (ret) {
+               status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
+               goto end;
+       }
+
+       memcpy(reception_buffer.data, &comm, sizeof(comm));
+       ret = lttcomm_recv_unix_sock(channel->socket,
+                       reception_buffer.data + sizeof(comm),
+                       comm.length);
+       if (ret < (ssize_t) comm.length) {
+               status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
+               goto end;
+       }
+
+       ret = lttng_notification_create_from_buffer(reception_buffer.data,
+                       &notification);
+       if (ret != (sizeof(comm) + (ssize_t) comm.length)) {
+               status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
+               goto error;
+       }
+       *_notification = notification;
+end:
+       lttng_dynamic_buffer_reset(&reception_buffer);
+       return status;
+error:
+       lttng_notification_destroy(notification);
+       goto end;
+}
+
+static
+enum lttng_notification_channel_status send_command(
+               struct lttng_notification_channel *channel,
+               enum lttng_notification_channel_message_type type,
+               struct lttng_condition *condition)
+{
+       int socket;
+       ssize_t command_size, ret;
+       size_t received = 0;
+       enum lttng_notification_channel_status status =
+                       LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
+       char *command_buffer = NULL;
+       struct lttng_notification_channel_message cmd_message = {
+               .type = type,
+       };
+       struct lttng_notification_channel_message reply_message;
+       struct lttng_notification_channel_command_reply reply;
+
+       if (!channel) {
+               status = LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID;
+               goto end;
+       }
+
+       socket = channel->socket;
+       if (!lttng_condition_validate(condition)) {
+               status = LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID;
+               goto end;
+       }
+
+       ret = lttng_condition_serialize(condition, NULL);
+       if (ret < 0) {
+               status = LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID;
+               goto end;
+       }
+       assert(ret < UINT32_MAX);
+       cmd_message.size = (uint32_t) ret;
+       command_size = ret + sizeof(
+                       struct lttng_notification_channel_message);
+       command_buffer = zmalloc(command_size);
+       if (!command_buffer) {
+               goto end;
+       }
+
+       memcpy(command_buffer, &cmd_message, sizeof(cmd_message));
+       ret = lttng_condition_serialize(condition,
+                       command_buffer + sizeof(cmd_message));
+       if (ret < 0) {
+               goto end;
+       }
+
+       ret = lttcomm_send_unix_sock(socket, command_buffer, command_size);
+       if (ret < 0) {
+               status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
+               goto end;
+       }
+
+       /* Receive command reply header. */
+       do {
+               ret = lttcomm_recv_unix_sock(socket,
+                               ((char *) &reply_message) + received,
+                               sizeof(reply_message) - received);
+               if (ret <= 0) {
+                       status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
+                       goto end;
+               }
+               received += ret;
+       } while (received < sizeof(reply_message));
+       if (reply_message.type !=
+                       LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_COMMAND_REPLY ||
+                       reply_message.size != sizeof(reply)) {
+               status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
+               goto end;
+       }
+
+       /* Receive command reply payload. */
+       received = 0;
+       do {
+               ret = lttcomm_recv_unix_sock(socket,
+                               ((char *) &reply) + received,
+                               sizeof(reply) - received);
+               if (ret <= 0) {
+                       status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
+                       goto end;
+               }
+               received += ret;
+       } while (received < sizeof(reply));
+       status = (enum lttng_notification_channel_status) reply.status;
+end:
+       free(command_buffer);
+       return status;
+}
+
+enum lttng_notification_channel_status lttng_notification_channel_subscribe(
+               struct lttng_notification_channel *channel,
+               struct lttng_condition *condition)
+{
+       return send_command(channel,
+                       LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE,
+                       condition);
+}
+
+enum lttng_notification_channel_status lttng_notification_channel_unsubscribe(
+               struct lttng_notification_channel *channel,
+               struct lttng_condition *condition)
+{
+       return send_command(channel,
+                       LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE,
+                       condition);
+}
+
+void lttng_notification_channel_destroy(
+               struct lttng_notification_channel *channel)
+{
+       if (!channel) {
+               return;
+       }
+
+       if (channel->socket >= 0) {
+               (void) lttcomm_close_unix_sock(channel->socket);
+       }
+       free(channel);
+}
+
index f0b211c7b4a426e959855637ce23d7c08496143d..e44986ee74e2dcd1101f3687457c6d90edb77609 100644 (file)
@@ -36,6 +36,8 @@
 #include <common/utils.h>
 #include <lttng/lttng.h>
 #include <lttng/health-internal.h>
+#include <lttng/trigger/trigger-internal.h>
+#include <lttng/endpoint.h>
 
 #include "filter/filter-ast.h"
 #include "filter/filter-parser.h"
@@ -2435,6 +2437,94 @@ end:
        return ret;
 }
 
+int lttng_register_trigger(struct lttng_trigger *trigger)
+{
+       int ret;
+       struct lttcomm_session_msg lsm;
+       char *trigger_buf = NULL;
+       ssize_t trigger_size;
+
+       if (!trigger) {
+               ret = -LTTNG_ERR_INVALID;
+               goto end;
+       }
+
+       if (!lttng_trigger_validate(trigger)) {
+               ret = -LTTNG_ERR_INVALID;
+               goto end;
+       }
+
+       trigger_size = lttng_trigger_serialize(trigger, NULL);
+       if (trigger_size < 0) {
+               ret = -LTTNG_ERR_UNK;
+               goto end;
+       }
+
+       trigger_buf = zmalloc(trigger_size);
+       if (!trigger_buf) {
+               ret = -LTTNG_ERR_NOMEM;
+               goto end;
+       }
+
+       memset(&lsm, 0, sizeof(lsm));
+       lsm.cmd_type = LTTNG_REGISTER_TRIGGER;
+       if (lttng_trigger_serialize(trigger, trigger_buf) < 0) {
+               ret = -LTTNG_ERR_UNK;
+               goto end;
+       }
+
+       lsm.u.trigger.length = (uint32_t) trigger_size;
+       ret = lttng_ctl_ask_sessiond_varlen_no_cmd_header(&lsm, trigger_buf,
+                       trigger_size, NULL);
+end:
+       free(trigger_buf);
+       return ret;
+}
+
+int lttng_unregister_trigger(struct lttng_trigger *trigger)
+{
+       int ret;
+       struct lttcomm_session_msg lsm;
+       char *trigger_buf = NULL;
+       ssize_t trigger_size;
+
+       if (!trigger) {
+               ret = -LTTNG_ERR_INVALID;
+               goto end;
+       }
+
+       if (!lttng_trigger_validate(trigger)) {
+               ret = -LTTNG_ERR_INVALID;
+               goto end;
+       }
+
+       trigger_size = lttng_trigger_serialize(trigger, NULL);
+       if (trigger_size < 0) {
+               ret = -LTTNG_ERR_UNK;
+               goto end;
+       }
+
+       trigger_buf = zmalloc(trigger_size);
+       if (!trigger_buf) {
+               ret = -LTTNG_ERR_NOMEM;
+               goto end;
+       }
+
+       memset(&lsm, 0, sizeof(lsm));
+       lsm.cmd_type = LTTNG_UNREGISTER_TRIGGER;
+       if (lttng_trigger_serialize(trigger, trigger_buf) < 0) {
+               ret = -LTTNG_ERR_UNK;
+               goto end;
+       }
+
+       lsm.u.trigger.length = (uint32_t) trigger_size;
+       ret = lttng_ctl_ask_sessiond_varlen_no_cmd_header(&lsm, trigger_buf,
+                       trigger_size, NULL);
+end:
+       free(trigger_buf);
+       return ret;
+}
+
 /*
  * lib constructor.
  */
index f9dd53bef40125cc3b2a24469918cd2e4ebab39e..ddae048cab5c47681128adde992c8738afb1cf10 100644 (file)
@@ -73,6 +73,7 @@ UST_DATA_TRACE=$(top_builddir)/src/bin/lttng-sessiond/trace-ust.$(OBJEXT) \
                   $(top_builddir)/src/bin/lttng-sessiond/session.$(OBJEXT) \
                   $(top_builddir)/src/bin/lttng-sessiond/snapshot.$(OBJEXT) \
                   $(top_builddir)/src/bin/lttng-sessiond/agent.$(OBJEXT) \
+                  $(top_builddir)/src/bin/lttng-sessiond/notification-thread-commands.$(OBJEXT) \
                   $(top_builddir)/src/common/libcommon.la \
                   $(top_builddir)/src/common/health/libhealth.la \
                   $(top_builddir)/src/common/sessiond-comm/libsessiond-comm.la
index 7996b8e60e7d4e9ce60cea4220e44a357692d6ce..86c1e23bf43edceffa0bce237c40fb0772899a8b 100644 (file)
@@ -30,6 +30,7 @@
 #include <common/defaults.h>
 #include <bin/lttng-sessiond/trace-ust.h>
 #include <bin/lttng-sessiond/ust-app.h>
+#include <bin/lttng-sessiond/notification-thread.h>
 
 #include <tap/tap.h>
 
@@ -49,8 +50,9 @@ int lttng_opt_mi;
 int ust_consumerd32_fd;
 int ust_consumerd64_fd;
 
-/* Global variable required by sessiond objects being linked-in */
+/* Global variables required by sessiond objects being linked-in */
 struct lttng_ht *agent_apps_ht_by_sock;
+struct notification_thread_handle *notification_thread_handle;
 
 static const char alphanum[] =
        "0123456789"
This page took 0.218174 seconds and 5 git commands to generate.