From daa81f582a5374fd38efc416be0c1feeed659dff Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=A9mie=20Galarneau?= Date: Fri, 24 Feb 2017 01:13:17 -0500 Subject: [PATCH] Deliverables 3 and 4 MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérémie Galarneau --- configure.ac | 14 +- include/Makefile.am | 28 +- include/lttng/action/action-internal.h | 55 + include/lttng/action/action.h | 41 + include/lttng/action/notify-internal.h | 28 + include/lttng/action/notify.h | 33 + .../lttng/condition/buffer-usage-internal.h | 102 + include/lttng/condition/buffer-usage.h | 102 + include/lttng/condition/condition-internal.h | 70 + include/lttng/condition/condition.h | 52 + include/lttng/condition/evaluation-internal.h | 53 + include/lttng/condition/evaluation.h | 46 + include/lttng/endpoint-internal.h | 32 + include/lttng/endpoint.h | 32 + include/lttng/lttng-error.h | 4 + include/lttng/notification/channel-internal.h | 51 + include/lttng/notification/channel.h | 67 + .../notification/notification-internal.h | 56 + include/lttng/notification/notification.h | 46 + include/lttng/trigger/trigger-internal.h | 52 + include/lttng/trigger/trigger.h | 54 + src/bin/lttng-consumerd/lttng-consumerd.c | 10 +- src/bin/lttng-sessiond/Makefile.am | 5 +- src/bin/lttng-sessiond/agent-thread.c | 2 +- src/bin/lttng-sessiond/cmd.c | 81 + src/bin/lttng-sessiond/cmd.h | 7 + src/bin/lttng-sessiond/consumer.c | 32 +- src/bin/lttng-sessiond/consumer.h | 8 + src/bin/lttng-sessiond/health-sessiond.h | 1 + src/bin/lttng-sessiond/lttng-sessiond.h | 5 +- src/bin/lttng-sessiond/lttng-ust-ctl.h | 1 + src/bin/lttng-sessiond/main.c | 224 ++- .../notification-thread-commands.c | 168 ++ .../notification-thread-commands.h | 89 + .../notification-thread-events.c | 1663 +++++++++++++++++ .../notification-thread-events.h | 52 + src/bin/lttng-sessiond/notification-thread.c | 682 +++++++ src/bin/lttng-sessiond/notification-thread.h | 73 + src/bin/lttng-sessiond/ust-app.c | 72 +- src/bin/lttng-sessiond/ust-consumer.c | 10 + src/bin/lttng-sessiond/ust-registry.c | 22 +- src/bin/lttng-sessiond/ust-registry.h | 3 +- src/common/Makefile.am | 5 +- src/common/action.c | 119 ++ src/common/buffer-usage.c | 784 ++++++++ src/common/condition.c | 159 ++ src/common/consumer/consumer-timer.c | 342 +++- src/common/consumer/consumer-timer.h | 11 +- src/common/consumer/consumer.c | 5 + src/common/consumer/consumer.h | 11 + src/common/defaults.h | 10 +- src/common/dynamic-buffer.c | 166 ++ src/common/dynamic-buffer.h | 59 + src/common/endpoint.c | 26 + src/common/error.c | 4 + src/common/evaluation.c | 110 ++ src/common/macros.h | 9 + src/common/notification.c | 168 ++ src/common/notify.c | 49 + src/common/pipe.c | 66 + src/common/pipe.h | 6 + src/common/sessiond-comm/sessiond-comm.h | 21 + src/common/trigger.c | 179 ++ src/common/unix.c | 31 +- src/common/ust-consumer/ust-consumer.c | 57 + src/common/utils.c | 67 + src/common/utils.h | 1 + src/lib/lttng-ctl/Makefile.am | 3 +- src/lib/lttng-ctl/channel.c | 285 +++ src/lib/lttng-ctl/lttng-ctl.c | 90 + tests/unit/Makefile.am | 1 + tests/unit/test_ust_data.c | 4 +- 72 files changed, 6955 insertions(+), 121 deletions(-) create mode 100644 include/lttng/action/action-internal.h create mode 100644 include/lttng/action/action.h create mode 100644 include/lttng/action/notify-internal.h create mode 100644 include/lttng/action/notify.h create mode 100644 include/lttng/condition/buffer-usage-internal.h create mode 100644 include/lttng/condition/buffer-usage.h create mode 100644 include/lttng/condition/condition-internal.h create mode 100644 include/lttng/condition/condition.h create mode 100644 include/lttng/condition/evaluation-internal.h create mode 100644 include/lttng/condition/evaluation.h create mode 100644 include/lttng/endpoint-internal.h create mode 100644 include/lttng/endpoint.h create mode 100644 include/lttng/notification/channel-internal.h create mode 100644 include/lttng/notification/channel.h create mode 100644 include/lttng/notification/notification-internal.h create mode 100644 include/lttng/notification/notification.h create mode 100644 include/lttng/trigger/trigger-internal.h create mode 100644 include/lttng/trigger/trigger.h create mode 100644 src/bin/lttng-sessiond/notification-thread-commands.c create mode 100644 src/bin/lttng-sessiond/notification-thread-commands.h create mode 100644 src/bin/lttng-sessiond/notification-thread-events.c create mode 100644 src/bin/lttng-sessiond/notification-thread-events.h create mode 100644 src/bin/lttng-sessiond/notification-thread.c create mode 100644 src/bin/lttng-sessiond/notification-thread.h create mode 100644 src/common/action.c create mode 100644 src/common/buffer-usage.c create mode 100644 src/common/condition.c create mode 100644 src/common/dynamic-buffer.c create mode 100644 src/common/dynamic-buffer.h create mode 100644 src/common/endpoint.c create mode 100644 src/common/evaluation.c create mode 100644 src/common/notification.c create mode 100644 src/common/notify.c create mode 100644 src/common/trigger.c create mode 100644 src/lib/lttng-ctl/channel.c diff --git a/configure.ac b/configure.ac index 0a7034e24..9bc394123 100644 --- a/configure.ac +++ b/configure.ac @@ -948,8 +948,20 @@ CFLAGS="-Wall $CFLAGS -g -fno-strict-aliasing" DEFAULT_INCLUDES="-I\$(top_srcdir) -I\$(top_builddir) -I\$(top_builddir)/src -I\$(top_builddir)/include -include config.h" lttngincludedir="${includedir}/lttng" - AC_SUBST(lttngincludedir) + +lttngactionincludedir="${includedir}/lttng/action" +AC_SUBST(lttngactionincludedir) + +lttngconditionincludedir="${includedir}/lttng/condition" +AC_SUBST(lttngconditionincludedir) + +lttngnotificationincludedir="${includedir}/lttng/notification" +AC_SUBST(lttngnotificationincludedir) + +lttngtriggerincludedir="${includedir}/lttng/trigger" +AC_SUBST(lttngtriggerincludedir) + AC_SUBST(DEFAULT_INCLUDES) lttnglibexecdir="${libdir}/lttng/libexec" diff --git a/include/Makefile.am b/include/Makefile.am index 0536dcdf0..bc105b064 100644 --- a/include/Makefile.am +++ b/include/Makefile.am @@ -77,10 +77,36 @@ lttnginclude_HEADERS = \ lttng/snapshot.h \ lttng/save.h \ lttng/load.h \ + lttng/endpoint.h \ version.h.tmpl +lttngactioninclude_HEADERS= \ + lttng/action/action.h \ + lttng/action/notify.h + +lttngconditioninclude_HEADERS= \ + lttng/condition/condition.h \ + lttng/condition/buffer-usage.h \ + lttng/condition/evaluation.h + +lttngnotificationinclude_HEADERS= \ + lttng/notification/channel.h \ + lttng/notification/notification.h + +lttngtriggerinclude_HEADERS= \ + lttng/trigger/trigger.h + noinst_HEADERS = \ lttng/snapshot-internal.h \ lttng/health-internal.h \ lttng/save-internal.h \ - lttng/load-internal.h + lttng/load-internal.h \ + lttng/action/action-internal.h \ + lttng/action/notify-internal.h \ + lttng/condition/condition-internal.h \ + lttng/condition/buffer-usage-internal.h \ + lttng/condition/evaluation-internal.h \ + lttng/notification/notification-internal.h \ + lttng/trigger/trigger-internal.h \ + lttng/endpoint-internal.h \ + lttng/notification/channel-internal.h diff --git a/include/lttng/action/action-internal.h b/include/lttng/action/action-internal.h new file mode 100644 index 000000000..9210f9fa3 --- /dev/null +++ b/include/lttng/action/action-internal.h @@ -0,0 +1,55 @@ +/* + * Copyright (C) 2017 - Jérémie Galarneau + * + * This library is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License, version 2.1 only, + * as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License + * for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this library; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#ifndef LTTNG_ACTION_INTERNAL_H +#define LTTNG_ACTION_INTERNAL_H + +#include +#include +#include + +typedef bool (*action_validate_cb)(struct lttng_action *action); +typedef void (*action_destroy_cb)(struct lttng_action *action); +typedef ssize_t (*action_serialize_cb)(struct lttng_action *action, char *buf); + +struct lttng_action { + enum lttng_action_type type; + action_validate_cb validate; + action_serialize_cb serialize; + action_destroy_cb destroy; +}; + +struct lttng_action_comm { + /* enum lttng_action_type */ + int8_t action_type; +} LTTNG_PACKED; + +LTTNG_HIDDEN +bool lttng_action_validate(struct lttng_action *action); + +LTTNG_HIDDEN +ssize_t lttng_action_serialize(struct lttng_action *action, char *buf); + +/* + * FIXME Add explicit buffer bound checking using a "len" parameter to + * ensure malformed buffers are caught and rejected. + */ +LTTNG_HIDDEN +ssize_t lttng_action_create_from_buffer(const char *buf, + struct lttng_action **action); + +#endif /* LTTNG_ACTION_INTERNAL_H */ diff --git a/include/lttng/action/action.h b/include/lttng/action/action.h new file mode 100644 index 000000000..311f5a0fb --- /dev/null +++ b/include/lttng/action/action.h @@ -0,0 +1,41 @@ +/* + * Copyright (C) 2017 - Jérémie Galarneau + * + * This library is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License, version 2.1 only, + * as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License + * for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this library; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#ifndef LTTNG_ACTION_H +#define LTTNG_ACTION_H + +struct lttng_action; + +#ifdef __cplusplus +extern "C" { +#endif + +enum lttng_action_type { + LTTNG_ACTION_TYPE_UNKNOWN = -1, + LTTNG_ACTION_TYPE_NOTIFY = 0, +}; + +extern enum lttng_action_type lttng_action_get_type( + struct lttng_action *action); + +extern void lttng_action_destroy(struct lttng_action *action); + +#ifdef __cplusplus +} +#endif + +#endif /* LTTNG_ACTION_H */ diff --git a/include/lttng/action/notify-internal.h b/include/lttng/action/notify-internal.h new file mode 100644 index 000000000..509bd36a9 --- /dev/null +++ b/include/lttng/action/notify-internal.h @@ -0,0 +1,28 @@ +/* + * Copyright (C) 2017 - Jérémie Galarneau + * + * This library is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License, version 2.1 only, + * as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License + * for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this library; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#ifndef LTTNG_ACTION_NOTIFY_INTERNAL_H +#define LTTNG_ACTION_NOTIFY_INTERNAL_H + +#include +#include + +struct lttng_action_notify { + struct lttng_action parent; +}; + +#endif /* LTTNG_ACTION_NOTIFY_INTERNAL_H */ diff --git a/include/lttng/action/notify.h b/include/lttng/action/notify.h new file mode 100644 index 000000000..6c0d0ca25 --- /dev/null +++ b/include/lttng/action/notify.h @@ -0,0 +1,33 @@ +/* + * Copyright (C) 2017 - Jérémie Galarneau + * + * This library is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License, version 2.1 only, + * as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License + * for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this library; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#ifndef LTTNG_ACTION_NOTIFY_H +#define LTTNG_ACTION_NOTIFY_H + +struct lttng_action; + +#ifdef __cplusplus +extern "C" { +#endif + +extern struct lttng_action *lttng_action_notify_create(void); + +#ifdef __cplusplus +} +#endif + +#endif /* LTTNG_ACTION_NOTIFY_H */ diff --git a/include/lttng/condition/buffer-usage-internal.h b/include/lttng/condition/buffer-usage-internal.h new file mode 100644 index 000000000..1052c489f --- /dev/null +++ b/include/lttng/condition/buffer-usage-internal.h @@ -0,0 +1,102 @@ +/* + * Copyright (C) 2017 - Jérémie Galarneau + * + * This library is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License, version 2.1 only, + * as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License + * for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this library; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#ifndef LTTNG_CONDITION_BUFFER_USAGE_INTERNAL_H +#define LTTNG_CONDITION_BUFFER_USAGE_INTERNAL_H + +#include +#include +#include +#include + +struct lttng_condition_buffer_usage { + struct lttng_condition parent; + struct { + bool set; + uint64_t value; + } threshold_bytes; + struct { + bool set; + double value; + } threshold_ratio; + char *session_name; + char *channel_name; + struct { + bool set; + enum lttng_domain_type type; + } domain; +}; + +struct lttng_condition_buffer_usage_comm { + uint8_t threshold_set_in_bytes; + /* + * Expressed in bytes if "threshold_set_in_bytes" is not 0. + * Otherwise, it is expressed a ratio in the interval [0.0, 1.0] + * that is mapped to the range on a 32-bit unsigned integer. + * The ratio is obtained by (threshold / UINT32_MAX). + */ + uint32_t threshold; + /* Both lengths include the trailing \0. */ + uint32_t session_name_len; + uint32_t channel_name_len; + /* enum lttng_domain_type */ + int8_t domain_type; + /* session and channel names. */ + char names[]; +} LTTNG_PACKED; + +struct lttng_evaluation_buffer_usage { + struct lttng_evaluation parent; + uint64_t buffer_use; + uint64_t buffer_capacity; +}; + +struct lttng_evaluation_buffer_usage_comm { + uint64_t buffer_use; + uint64_t buffer_capacity; +} LTTNG_PACKED; + +LTTNG_HIDDEN +struct lttng_evaluation *lttng_evaluation_buffer_usage_create( + enum lttng_condition_type type, uint64_t use, + uint64_t capacity); + +/* + * Applies to all below: + * + * FIXME Add explicit buffer bound checking using a "len" parameter to + * ensure malformed buffers are caught and rejected. + */ +LTTNG_HIDDEN +ssize_t lttng_condition_buffer_usage_low_create_from_buffer(const char *buf, + struct lttng_condition **condition); + +LTTNG_HIDDEN +ssize_t lttng_condition_buffer_usage_high_create_from_buffer(const char *buf, + struct lttng_condition **condition); + +LTTNG_HIDDEN +ssize_t lttng_evaluation_buffer_usage_low_create_from_buffer(const char *buf, + struct lttng_evaluation **evaluation); + +LTTNG_HIDDEN +ssize_t lttng_evaluation_buffer_usage_high_create_from_buffer(const char *buf, + struct lttng_evaluation **evaluation); + + + +#endif /* LTTNG_CONDITION_BUFFER_USAGE_INTERNAL_H */ diff --git a/include/lttng/condition/buffer-usage.h b/include/lttng/condition/buffer-usage.h new file mode 100644 index 000000000..3f5e4d870 --- /dev/null +++ b/include/lttng/condition/buffer-usage.h @@ -0,0 +1,102 @@ +/* + * Copyright (C) 2017 - Jérémie Galarneau + * + * This library is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License, version 2.1 only, + * as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License + * for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this library; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#ifndef LTTNG_CONDITION_BUFFER_USAGE_H +#define LTTNG_CONDITION_BUFFER_USAGE_H + +#ifdef __cplusplus +extern "C" { +#endif + +struct lttng_condition; +struct lttng_evaluation; + +extern struct lttng_condition * +lttng_condition_buffer_usage_low_create(void); + +extern struct lttng_condition * +lttng_condition_buffer_usage_high_create(void); + +/* threshold_ratio expressed as [0.0, 1.0]. */ +extern enum lttng_condition_status +lttng_condition_buffer_usage_get_threshold_ratio( + struct lttng_condition *condition, + double *threshold_ratio); + +/* threshold_ratio expressed as [0.0, 1.0]. */ +extern enum lttng_condition_status +lttng_condition_buffer_usage_set_threshold_ratio( + struct lttng_condition *condition, + double threshold_ratio); + +extern enum lttng_condition_status +lttng_condition_buffer_usage_get_threshold( + struct lttng_condition *condition, + uint64_t *threshold_bytes); + +extern enum lttng_condition_status +lttng_condition_buffer_usage_set_threshold( + struct lttng_condition *condition, + uint64_t threshold_bytes); + +extern enum lttng_condition_status +lttng_condition_buffer_usage_get_session_name( + struct lttng_condition *condition, + const char **session_name); + +extern enum lttng_condition_status +lttng_condition_buffer_usage_set_session_name( + struct lttng_condition *condition, + const char *session_name); + +extern enum lttng_condition_status +lttng_condition_buffer_usage_get_channel_name( + struct lttng_condition *condition, + const char **channel_name); + +extern enum lttng_condition_status +lttng_condition_buffer_usage_set_channel_name( + struct lttng_condition *condition, + const char *channel_name); + +extern enum lttng_condition_status +lttng_condition_buffer_usage_get_domain_type( + struct lttng_condition *condition, + enum lttng_domain_type *type); + +extern enum lttng_condition_status +lttng_condition_buffer_usage_set_domain_type( + struct lttng_condition *condition, + enum lttng_domain_type type); + + +/* LTTng Condition Evaluation */ +extern enum lttng_evaluation_status +lttng_evaluation_buffer_usage_get_usage_ratio( + struct lttng_evaluation *evaluation, + double *usage_ratio); + +extern enum lttng_evaluation_status +lttng_evaluation_buffer_usage_get_usage( + struct lttng_evaluation *evaluation, + uint64_t *usage_bytes); + +#ifdef __cplusplus +} +#endif + +#endif /* LTTNG_CONDITION_BUFFER_USAGE_H */ diff --git a/include/lttng/condition/condition-internal.h b/include/lttng/condition/condition-internal.h new file mode 100644 index 000000000..66dbf20f7 --- /dev/null +++ b/include/lttng/condition/condition-internal.h @@ -0,0 +1,70 @@ +/* + * Copyright (C) 2017 - Jérémie Galarneau + * + * This library is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License, version 2.1 only, + * as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License + * for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this library; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#ifndef LTTNG_CONDITION_INTERNAL_H +#define LTTNG_CONDITION_INTERNAL_H + +#include +#include +#include +#include +#include + +typedef void (*condition_destroy_cb)(struct lttng_condition *condition); +typedef bool (*condition_validate_cb)(struct lttng_condition *condition); +typedef ssize_t (*condition_serialize_cb)(struct lttng_condition *condition, + char *buf); +typedef bool (*condition_equal_cb)(struct lttng_condition *a, + struct lttng_condition *b); + +struct lttng_condition { + enum lttng_condition_type type; + condition_validate_cb validate; + condition_serialize_cb serialize; + condition_equal_cb equal; + condition_destroy_cb destroy; +}; + +struct lttng_condition_comm { + /* enum lttng_condition_type */ + int8_t condition_type; + char payload[]; +}; + +LTTNG_HIDDEN +void lttng_condition_init(struct lttng_condition *condition, + enum lttng_condition_type type); + +LTTNG_HIDDEN +bool lttng_condition_validate(struct lttng_condition *condition); + +/* + * FIXME Add explicit buffer bound checking using a "len" parameter to + * ensure malformed buffers are caught and rejected. + */ +LTTNG_HIDDEN +ssize_t lttng_condition_create_from_buffer(const char *buf, + struct lttng_condition **condition); + +LTTNG_HIDDEN +ssize_t lttng_condition_serialize(struct lttng_condition *condition, char *buf); + +LTTNG_HIDDEN +bool lttng_condition_is_equal(struct lttng_condition *a, + struct lttng_condition *b); + +#endif /* LTTNG_CONDITION_INTERNAL_H */ diff --git a/include/lttng/condition/condition.h b/include/lttng/condition/condition.h new file mode 100644 index 000000000..177b4ab83 --- /dev/null +++ b/include/lttng/condition/condition.h @@ -0,0 +1,52 @@ +/* + * Copyright (C) 2017 - Jérémie Galarneau + * + * This library is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License, version 2.1 only, + * as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License + * for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this library; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#ifndef LTTNG_CONDITION_H +#define LTTNG_CONDITION_H + +#include + +#ifdef __cplusplus +extern "C" { +#endif + +struct lttng_condition; + +enum lttng_condition_type { + LTTNG_CONDITION_TYPE_UNKNOWN = -1, + LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW = 102, + LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH = 101, +}; + +enum lttng_condition_status { + LTTNG_CONDITION_STATUS_OK = 0, + LTTNG_CONDITION_STATUS_ERROR = -1, + LTTNG_CONDITION_STATUS_UNKNOWN = -2, + LTTNG_CONDITION_STATUS_INVALID = -3, + LTTNG_CONDITION_STATUS_UNSET = -4, +}; + +extern enum lttng_condition_type lttng_condition_get_type( + struct lttng_condition *condition); + +extern void lttng_condition_destroy(struct lttng_condition *condition); + +#ifdef __cplusplus +} +#endif + +#endif /* LTTNG_CONDITION_H */ diff --git a/include/lttng/condition/evaluation-internal.h b/include/lttng/condition/evaluation-internal.h new file mode 100644 index 000000000..5c874b3e0 --- /dev/null +++ b/include/lttng/condition/evaluation-internal.h @@ -0,0 +1,53 @@ +/* + * Copyright (C) 2017 - Jérémie Galarneau + * + * This library is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License, version 2.1 only, + * as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License + * for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this library; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#ifndef LTTNG_EVALUATION_INTERNAL_H +#define LTTNG_EVALUATION_INTERNAL_H + +#include +#include +#include + +typedef void (*evaluation_destroy_cb)(struct lttng_evaluation *evaluation); +typedef ssize_t (*evaluation_serialize_cb)(struct lttng_evaluation *evaluation, + char *buf); + +struct lttng_evaluation_comm { + /* enum lttng_condition_type type */ + int8_t type; + char payload[]; +} LTTNG_PACKED; + +struct lttng_evaluation { + enum lttng_condition_type type; + evaluation_serialize_cb serialize; + evaluation_destroy_cb destroy; +}; + +/* + * FIXME Add explicit buffer bound checking using a "len" parameter to + * ensure malformed buffers are caught and rejected. + */ +LTTNG_HIDDEN +ssize_t lttng_evaluation_create_from_buffer(const char *buf, + struct lttng_evaluation **evaluation); + +LTTNG_HIDDEN +ssize_t lttng_evaluation_serialize(struct lttng_evaluation *evaluation, + char *buf); + +#endif /* LTTNG_EVALUATION_INTERNAL_H */ diff --git a/include/lttng/condition/evaluation.h b/include/lttng/condition/evaluation.h new file mode 100644 index 000000000..841284cd0 --- /dev/null +++ b/include/lttng/condition/evaluation.h @@ -0,0 +1,46 @@ +/* + * Copyright (C) 2017 - Jérémie Galarneau + * + * This library is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License, version 2.1 only, + * as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License + * for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this library; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#ifndef LTTNG_EVALUATION_H +#define LTTNG_EVALUATION_H + +#include + +#ifdef __cplusplus +extern "C" { +#endif + +struct lttng_evaluation; + +enum lttng_evaluation_status { + LTTNG_EVALUATION_STATUS_OK = 0, + LTTNG_EVALUATION_STATUS_ERROR = -1, + LTTNG_EVALUATION_STATUS_INVALID = -2, + LTTNG_EVALUATION_STATUS_UNKNOWN = -2, + LTTNG_EVALUATION_STATUS_UNSET = -3, +}; + +extern enum lttng_condition_type lttng_evaluation_get_type( + struct lttng_evaluation *evaluation); + +extern void lttng_evaluation_destroy(struct lttng_evaluation *evaluation); + +#ifdef __cplusplus +} +#endif + +#endif /* LTTNG_EVALUATION_H */ diff --git a/include/lttng/endpoint-internal.h b/include/lttng/endpoint-internal.h new file mode 100644 index 000000000..de6024b42 --- /dev/null +++ b/include/lttng/endpoint-internal.h @@ -0,0 +1,32 @@ +/* + * Copyright (C) 2017 - Jérémie Galarneau + * + * This library is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License, version 2.1 only, + * as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License + * for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this library; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#ifndef LTTNG_ENDPOINT_INTERNAL_H +#define LTTNG_ENDPOINT_INTERNAL_H + +#include +#include + +enum lttng_endpoint_type { + LTTNG_ENDPOINT_TYPE_DEFAULT_SESSIOND_NOTIFICATION = 0, +}; + +struct lttng_endpoint { + enum lttng_endpoint_type type; +}; + +#endif /* LTTNG_ENDPOINT_INTERNAL_H */ diff --git a/include/lttng/endpoint.h b/include/lttng/endpoint.h new file mode 100644 index 000000000..b93ed697e --- /dev/null +++ b/include/lttng/endpoint.h @@ -0,0 +1,32 @@ +/* + * Copyright (C) 2017 - Jérémie Galarneau + * + * This library is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License, version 2.1 only, + * as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License + * for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this library; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#ifndef LTTNG_ENDPOINT_H +#define LTTNG_ENDPOINT_H + +#ifdef __cplusplus +extern "C" { +#endif + +/* Default LTTng session daemon endpoint singleton. */ +extern struct lttng_endpoint *lttng_session_daemon_notification_endpoint; + +#ifdef __cplusplus +} +#endif + +#endif /* LTTNG_ENDPOINT_H */ diff --git a/include/lttng/lttng-error.h b/include/lttng/lttng-error.h index db6fe73c2..1b5ea699a 100644 --- a/include/lttng/lttng-error.h +++ b/include/lttng/lttng-error.h @@ -145,6 +145,10 @@ enum lttng_error_code { LTTNG_ERR_REGEN_STATEDUMP_FAIL = 122, /* Failed to regenerate the state dump */ LTTNG_ERR_REGEN_STATEDUMP_NOMEM = 123, /* Failed to regenerate the state dump, not enough memory */ LTTNG_ERR_NOT_SNAPSHOT_SESSION = 124, /* Session is not in snapshot mode. */ + LTTNG_ERR_INVALID_TRIGGER = 125, /* Invalid trigger provided. */ + LTTNG_ERR_TRIGGER_EXISTS = 126, /* Trigger already registered. */ + LTTNG_ERR_TRIGGER_NOT_FOUND = 127, /* Trigger not found. */ + LTTNG_ERR_COMMAND_CANCELLED = 128, /* Command cancelled. */ /* MUST be last element */ LTTNG_ERR_NR, /* Last element */ diff --git a/include/lttng/notification/channel-internal.h b/include/lttng/notification/channel-internal.h new file mode 100644 index 000000000..e684a9633 --- /dev/null +++ b/include/lttng/notification/channel-internal.h @@ -0,0 +1,51 @@ +/* + * Copyright (C) 2017 - Jérémie Galarneau + * + * This library is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License, version 2.1 only, + * as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License + * for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this library; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#ifndef LTTNG_NOTIFICATION_CHANNEL_INTERNAL_H +#define LTTNG_NOTIFICATION_CHANNEL_INTERNAL_H + +#include +#include +#include +#include + +enum lttng_notification_channel_message_type { + LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE = 0, + LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE = 1, + LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_COMMAND_REPLY = 2, + LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION = 3, +}; + +struct lttng_notification_channel_message { + /* enum lttng_notification_channel_message_type */ + int8_t type; + /* size of the payload following this field */ + uint32_t size; + char payload[]; +} LTTNG_PACKED; + +struct lttng_notification_channel_command_reply { + /* enum lttng_notification_channel_status */ + int8_t status; +} LTTNG_PACKED; + +struct lttng_notification_channel { + /* FIXME Add mutex to protect the socket from concurrent uses. */ + int socket; +}; + +#endif /* LTTNG_NOTIFICATION_CHANNEL_INTERNAL_H */ diff --git a/include/lttng/notification/channel.h b/include/lttng/notification/channel.h new file mode 100644 index 000000000..36b3509c7 --- /dev/null +++ b/include/lttng/notification/channel.h @@ -0,0 +1,67 @@ +/* + * Copyright (C) 2017 - Jérémie Galarneau + * + * This library is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License, version 2.1 only, + * as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License + * for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this library; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#ifndef LTTNG_NOTIFICATION_CHANNEL_H +#define LTTNG_NOTIFICATION_CHANNEL_H + +#ifdef __cplusplus +extern "C" { +#endif + +struct lttng_endpoint; +struct lttng_condition; +struct lttng_notification; +struct lttng_notification_channel; + +/* LTTng Notification channel */ +enum lttng_notification_channel_status { + LTTNG_NOTIFICATION_CHANNEL_STATUS_NOTIFICATIONS_DROPPED = 1, + LTTNG_NOTIFICATION_CHANNEL_STATUS_OK = 0, + LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR = -1, + LTTNG_NOTIFICATION_CHANNEL_STATUS_CLOSED = -2, + LTTNG_NOTIFICATION_CHANNEL_STATUS_ALREADY_SUBSCRIBED = -3, + /* Condition unknown. */ + LTTNG_NOTIFICATION_CHANNEL_STATUS_UNKNOWN_CONDITION = -4, + LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID = -5, +}; + +extern struct lttng_notification_channel *lttng_notification_channel_create( + struct lttng_endpoint *endpoint); + +extern enum lttng_notification_channel_status +lttng_notification_channel_get_next_notification( + struct lttng_notification_channel *channel, + struct lttng_notification **notification); + +extern enum lttng_notification_channel_status +lttng_notification_channel_subscribe( + struct lttng_notification_channel *channel, + struct lttng_condition *condition); + +extern enum lttng_notification_channel_status +lttng_notification_channel_unsubscribe( + struct lttng_notification_channel *channel, + struct lttng_condition *condition); + +extern void lttng_notification_channel_destroy( + struct lttng_notification_channel *channel); + +#ifdef __cplusplus +} +#endif + +#endif /* LTTNG_NOTIFICATION_CHANNEL_H */ diff --git a/include/lttng/notification/notification-internal.h b/include/lttng/notification/notification-internal.h new file mode 100644 index 000000000..d59e2e6cf --- /dev/null +++ b/include/lttng/notification/notification-internal.h @@ -0,0 +1,56 @@ +/* + * Copyright (C) 2017 - Jérémie Galarneau + * + * This library is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License, version 2.1 only, + * as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License + * for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this library; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#ifndef LTTNG_NOTIFICATION_INTERNAL_H +#define LTTNG_NOTIFICATION_INTERNAL_H + +#include +#include +#include +#include + +struct lttng_notification { + struct lttng_condition *condition; + struct lttng_evaluation *evaluation; + bool owns_elements; +}; + +struct lttng_notification_comm { + /* length excludes its own length. */ + uint32_t length; + /* A condition and evaluation object follow. */ + char payload[]; +} LTTNG_PACKED; + +LTTNG_HIDDEN +struct lttng_notification *lttng_notification_create( + struct lttng_condition *condition, + struct lttng_evaluation *evaluation); + +LTTNG_HIDDEN +ssize_t lttng_notification_serialize(struct lttng_notification *notification, + char *buf); + +/* + * FIXME Add explicit buffer bound checking using a "len" parameter to + * ensure malformed buffers are caught and rejected. + */ +LTTNG_HIDDEN +ssize_t lttng_notification_create_from_buffer(const char *buf, + struct lttng_notification **notification); + +#endif /* LTTNG_NOTIFICATION_INTERNAL_H */ diff --git a/include/lttng/notification/notification.h b/include/lttng/notification/notification.h new file mode 100644 index 000000000..100d0def4 --- /dev/null +++ b/include/lttng/notification/notification.h @@ -0,0 +1,46 @@ +/* + * Copyright (C) 2017 - Jérémie Galarneau + * + * This library is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License, version 2.1 only, + * as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License + * for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this library; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#ifndef LTTNG_NOTIFICATION_H +#define LTTNG_NOTIFICATION_H + +#ifdef __cplusplus +extern "C" { +#endif + +struct lttng_condition; +struct lttng_evaluation; +struct lttng_notification; + +/* + * The notification retains ownership of both the condition and evaluation. + * Destroying the notification will also destroy the notification and evaluation + * objects. + */ +extern struct lttng_condition *lttng_notification_get_condition( + struct lttng_notification *notification); + +extern struct lttng_evaluation *lttng_notification_get_evaluation( + struct lttng_notification *notification); + +extern void lttng_notification_destroy(struct lttng_notification *notification); + +#ifdef __cplusplus +} +#endif + +#endif /* LTTNG_NOTIFICATION_H */ diff --git a/include/lttng/trigger/trigger-internal.h b/include/lttng/trigger/trigger-internal.h new file mode 100644 index 000000000..f84a334fe --- /dev/null +++ b/include/lttng/trigger/trigger-internal.h @@ -0,0 +1,52 @@ +/* + * Copyright (C) 2017 - Jérémie Galarneau + * + * This library is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License, version 2.1 only, + * as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License + * for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this library; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#ifndef LTTNG_TRIGGER_INTERNAL_H +#define LTTNG_TRIGGER_INTERNAL_H + +#include +#include +#include +#include + +struct lttng_trigger { + struct lttng_condition *condition; + struct lttng_action *action; +}; + +struct lttng_trigger_comm { + /* length excludes its own length. */ + uint32_t length; + /* A condition and action object follow. */ + char payload[]; +} LTTNG_PACKED; + +/* + * FIXME Add explicit buffer bound checking using a "len" parameter to + * ensure malformed buffers are caught and rejected. + */ +LTTNG_HIDDEN +ssize_t lttng_trigger_create_from_buffer(const char *buf, + struct lttng_trigger **trigger); + +LTTNG_HIDDEN +ssize_t lttng_trigger_serialize(struct lttng_trigger *trigger, char *buf); + +LTTNG_HIDDEN +bool lttng_trigger_validate(struct lttng_trigger *trigger); + +#endif /* LTTNG_TRIGGER_INTERNAL_H */ diff --git a/include/lttng/trigger/trigger.h b/include/lttng/trigger/trigger.h new file mode 100644 index 000000000..3d7dd458f --- /dev/null +++ b/include/lttng/trigger/trigger.h @@ -0,0 +1,54 @@ +/* + * Copyright (C) 2017 - Jérémie Galarneau + * + * This library is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License, version 2.1 only, + * as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License + * for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this library; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#ifndef LTTNG_TRIGGER_H +#define LTTNG_TRIGGER_H + +struct lttng_action; +struct lttng_condition; +struct lttng_trigger; + +#ifdef __cplusplus +extern "C" { +#endif + +enum lttng_register_trigger_status { + LTTNG_REGISTER_TRIGGER_STATUS_OK = 0, + LTTNG_REGISTER_TRIGGER_STATUS_INVALID = -1, +}; + +/* Trigger assumes ownership of both condition and action after call. */ +extern struct lttng_trigger *lttng_trigger_create( + struct lttng_condition *condition, struct lttng_action *action); + +extern struct lttng_condition *lttng_trigger_get_condition( + struct lttng_trigger *trigger); + +extern struct lttng_action *lttng_trigger_get_action( + struct lttng_trigger *trigger); + +extern void lttng_trigger_destroy(struct lttng_trigger *trigger); + +extern int lttng_register_trigger(struct lttng_trigger *trigger); + +extern int lttng_unregister_trigger(struct lttng_trigger *trigger); + +#ifdef __cplusplus +} +#endif + +#endif /* LTTNG_TRIGGER_H */ diff --git a/src/bin/lttng-consumerd/lttng-consumerd.c b/src/bin/lttng-consumerd/lttng-consumerd.c index 2f8eed108..9fb474753 100644 --- a/src/bin/lttng-consumerd/lttng-consumerd.c +++ b/src/bin/lttng-consumerd/lttng-consumerd.c @@ -538,7 +538,7 @@ int main(int argc, char **argv) goto exit_data_thread; } - /* Create the thread to manage the receive of fd */ + /* Create the thread to manage the reception of fds */ ret = pthread_create(&sessiond_thread, default_pthread_attr(), consumer_thread_sessiond_poll, (void *) ctx); @@ -583,6 +583,14 @@ exit_metadata_timer_thread: PERROR("pthread_join sessiond_thread"); retval = -1; } + + ret = consumer_timer_thread_get_channel_monitor_pipe(); + if (ret >= 0) { + ret = close(ret); + if (ret) { + PERROR("close channel monitor pipe"); + } + } exit_sessiond_thread: ret = pthread_join(data_thread, &status); diff --git a/src/bin/lttng-sessiond/Makefile.am b/src/bin/lttng-sessiond/Makefile.am index 423866139..0ac7506b4 100644 --- a/src/bin/lttng-sessiond/Makefile.am +++ b/src/bin/lttng-sessiond/Makefile.am @@ -30,7 +30,10 @@ lttng_sessiond_SOURCES = utils.c utils.h \ agent.c agent.h \ save.h save.c \ load-session-thread.h load-session-thread.c \ - syscall.h syscall.c + syscall.h syscall.c \ + notification-thread.h notification-thread.c \ + notification-thread-commands.h notification-thread-commands.c \ + notification-thread-events.h notification-thread-events.c if HAVE_LIBLTTNG_UST_CTL lttng_sessiond_SOURCES += trace-ust.c ust-registry.c ust-app.c \ diff --git a/src/bin/lttng-sessiond/agent-thread.c b/src/bin/lttng-sessiond/agent-thread.c index 39d2ec4a0..2439b3e28 100644 --- a/src/bin/lttng-sessiond/agent-thread.c +++ b/src/bin/lttng-sessiond/agent-thread.c @@ -252,7 +252,7 @@ void *agent_thread_manage_registration(void *data) goto error_tcp_socket; } - /* Add create valid TCP socket to poll set. */ + /* Add TCP socket to poll set. */ ret = lttng_poll_add(&events, reg_sock->fd, LPOLLIN | LPOLLERR | LPOLLHUP | LPOLLRDHUP); if (ret < 0) { diff --git a/src/bin/lttng-sessiond/cmd.c b/src/bin/lttng-sessiond/cmd.c index cc81906b6..5c9e69ce1 100644 --- a/src/bin/lttng-sessiond/cmd.c +++ b/src/bin/lttng-sessiond/cmd.c @@ -29,6 +29,7 @@ #include #include #include +#include #include "channel.h" #include "consumer.h" @@ -41,6 +42,8 @@ #include "syscall.h" #include "agent.h" #include "buffer-registry.h" +#include "notification-thread.h" +#include "notification-thread-commands.h" #include "cmd.h" @@ -3566,6 +3569,84 @@ end: return ret; } +int cmd_register_trigger(struct command_ctx *cmd_ctx, int sock, + struct notification_thread_handle *notification_thread) +{ + int ret; + size_t trigger_len; + ssize_t sock_recv_len; + char *trigger_buffer = NULL; + struct lttng_trigger *trigger = NULL; + + trigger_len = (size_t) cmd_ctx->lsm->u.trigger.length; + trigger_buffer = zmalloc(trigger_len); + if (!trigger_buffer) { + ret = LTTNG_ERR_NOMEM; + goto end; + } + + sock_recv_len = lttcomm_recv_unix_sock(sock, trigger_buffer, + trigger_len); + if (sock_recv_len < 0 || sock_recv_len != trigger_len) { + ERR("Failed to receive \"register trigger\" command payload"); + /* TODO: should this be a new error enum ? */ + ret = LTTNG_ERR_INVALID_TRIGGER; + goto end; + } + + if (lttng_trigger_create_from_buffer(trigger_buffer, &trigger) != + trigger_len) { + ERR("Invalid trigger payload received in \"register trigger\" command"); + ret = LTTNG_ERR_INVALID_TRIGGER; + goto end; + } + + ret = notification_thread_command_register_trigger(notification_thread, + trigger); +end: + free(trigger_buffer); + return ret; +} + +int cmd_unregister_trigger(struct command_ctx *cmd_ctx, int sock, + struct notification_thread_handle *notification_thread) +{ + int ret; + size_t trigger_len; + ssize_t sock_recv_len; + char *trigger_buffer = NULL; + struct lttng_trigger *trigger = NULL; + + trigger_len = (size_t) cmd_ctx->lsm->u.trigger.length; + trigger_buffer = zmalloc(trigger_len); + if (!trigger_buffer) { + ret = LTTNG_ERR_NOMEM; + goto end; + } + + sock_recv_len = lttcomm_recv_unix_sock(sock, trigger_buffer, + trigger_len); + if (sock_recv_len < 0 || sock_recv_len != trigger_len) { + ERR("Failed to receive \"register trigger\" command payload"); + /* TODO: should this be a new error enum ? */ + ret = LTTNG_ERR_INVALID_TRIGGER; + goto end; + } + + if (lttng_trigger_create_from_buffer(trigger_buffer, &trigger) != + trigger_len) { + ERR("Invalid trigger payload received in \"register trigger\" command"); + ret = LTTNG_ERR_INVALID_TRIGGER; + goto end; + } + + ret = notification_thread_command_unregister_trigger(notification_thread, + trigger); +end: + free(trigger_buffer); + return ret; +} + /* * Send relayd sockets from snapshot output to consumer. Ignore request if the * snapshot output is *not* set with a remote destination. diff --git a/src/bin/lttng-sessiond/cmd.h b/src/bin/lttng-sessiond/cmd.h index ac88d5130..e7e344276 100644 --- a/src/bin/lttng-sessiond/cmd.h +++ b/src/bin/lttng-sessiond/cmd.h @@ -21,6 +21,8 @@ #include "context.h" #include "session.h" +struct notification_thread_handle; + /* * Init the command subsystem. Must be called before using any of the functions * above. This is called in the main() of the session daemon. @@ -111,4 +113,9 @@ int cmd_set_session_shm_path(struct ltt_session *session, int cmd_regenerate_metadata(struct ltt_session *session); int cmd_regenerate_statedump(struct ltt_session *session); +int cmd_register_trigger(struct command_ctx *cmd_ctx, int sock, + struct notification_thread_handle *notification_thread_handle); +int cmd_unregister_trigger(struct command_ctx *cmd_ctx, int sock, + struct notification_thread_handle *notification_thread_handle); + #endif /* CMD_H */ diff --git a/src/bin/lttng-sessiond/consumer.c b/src/bin/lttng-sessiond/consumer.c index 6ee397579..8c8116d8e 100644 --- a/src/bin/lttng-sessiond/consumer.c +++ b/src/bin/lttng-sessiond/consumer.c @@ -748,7 +748,6 @@ int consumer_send_fds(struct consumer_socket *sock, int *fds, size_t nb_fd) } ret = consumer_recv_status_reply(sock); - error: return ret; } @@ -806,6 +805,7 @@ void consumer_init_ask_channel_comm_msg(struct lttcomm_consumer_msg *msg, unsigned int switch_timer_interval, unsigned int read_timer_interval, unsigned int live_timer_interval, + unsigned int monitor_timer_interval, int output, int type, uint64_t session_id, @@ -837,6 +837,7 @@ void consumer_init_ask_channel_comm_msg(struct lttcomm_consumer_msg *msg, msg->u.ask_channel.switch_timer_interval = switch_timer_interval; msg->u.ask_channel.read_timer_interval = read_timer_interval; msg->u.ask_channel.live_timer_interval = live_timer_interval; + msg->u.ask_channel.monitor_timer_interval = monitor_timer_interval; msg->u.ask_channel.output = output; msg->u.ask_channel.type = type; msg->u.ask_channel.session_id = session_id; @@ -1048,6 +1049,35 @@ error: return ret; } +int consumer_send_channel_monitor_pipe(struct consumer_socket *consumer_sock, + int pipe) +{ + int ret; + struct lttcomm_consumer_msg msg; + + /* Code flow error. Safety net. */ + + memset(&msg, 0, sizeof(msg)); + msg.cmd_type = LTTNG_CONSUMER_SET_CHANNEL_MONITOR_PIPE; + + DBG3("Sending set_channel_monitor_pipe command to consumer"); + ret = consumer_send_msg(consumer_sock, &msg); + if (ret < 0) { + goto error; + } + + DBG3("Sending channel monitoring pipe %d to consumer on socket %d", + pipe, *consumer_sock->fd_ptr); + ret = consumer_send_fds(consumer_sock, &pipe, 1); + if (ret < 0) { + goto error; + } + + DBG2("Channel monitoring pipe successfully sent"); +error: + return ret; +} + /* * Set consumer subdirectory using the session name and a generated datetime if * needed. This is appended to the current subdirectory. diff --git a/src/bin/lttng-sessiond/consumer.h b/src/bin/lttng-sessiond/consumer.h index 08b57eb73..0cc3d0e41 100644 --- a/src/bin/lttng-sessiond/consumer.h +++ b/src/bin/lttng-sessiond/consumer.h @@ -93,6 +93,11 @@ struct consumer_data { int err_sock; /* These two sockets uses the cmd_unix_sock_path. */ int cmd_sock; + /* + * Write-end of the channel monitoring pipe to be passed to the + * consumer. + */ + int channel_monitor_pipe; /* * The metadata socket object is handled differently and only created * locally in this object thus it's the only reference available in the @@ -214,6 +219,8 @@ int consumer_send_relayd_socket(struct consumer_socket *consumer_sock, struct lttcomm_relayd_sock *rsock, struct consumer_output *consumer, enum lttng_stream_type type, uint64_t session_id, char *session_name, char *hostname, int session_live_timer); +int consumer_send_channel_monitor_pipe(struct consumer_socket *consumer_sock, + int pipe); int consumer_send_destroy_relayd(struct consumer_socket *sock, struct consumer_output *consumer); int consumer_recv_status_reply(struct consumer_socket *sock); @@ -232,6 +239,7 @@ void consumer_init_ask_channel_comm_msg(struct lttcomm_consumer_msg *msg, unsigned int switch_timer_interval, unsigned int read_timer_interval, unsigned int live_timer_interval, + unsigned int monitor_timer_interval, int output, int type, uint64_t session_id, diff --git a/src/bin/lttng-sessiond/health-sessiond.h b/src/bin/lttng-sessiond/health-sessiond.h index 22ea1bb3e..5d94cc639 100644 --- a/src/bin/lttng-sessiond/health-sessiond.h +++ b/src/bin/lttng-sessiond/health-sessiond.h @@ -29,6 +29,7 @@ enum health_type_sessiond { HEALTH_SESSIOND_TYPE_HT_CLEANUP = 5, HEALTH_SESSIOND_TYPE_APP_MANAGE_NOTIFY = 6, HEALTH_SESSIOND_TYPE_APP_REG_DISPATCH = 7, + HEALTH_SESSIOND_TYPE_NOTIFICATION = 8, NR_HEALTH_SESSIOND_TYPES, }; diff --git a/src/bin/lttng-sessiond/lttng-sessiond.h b/src/bin/lttng-sessiond/lttng-sessiond.h index 65ab37d59..1988d1fcc 100644 --- a/src/bin/lttng-sessiond/lttng-sessiond.h +++ b/src/bin/lttng-sessiond/lttng-sessiond.h @@ -29,6 +29,7 @@ #include "session.h" #include "ust-app.h" #include "version.h" +#include "notification-thread.h" extern const char default_home_dir[], default_tracing_group[], @@ -38,6 +39,8 @@ extern const char default_home_dir[], /* Set in main.c at boot time of the daemon */ extern int kernel_tracer_fd; +extern struct notification_thread_handle *notification_thread_handle; + /* * This contains extra data needed for processing a command received by the * session daemon from the lttng client. @@ -58,7 +61,7 @@ struct ust_command { }; /* - * Queue used to enqueue UST registration request (ust_commant) and protected + * Queue used to enqueue UST registration request (ust_command) and synchronized * by a futex with a scheme N wakers / 1 waiters. See futex.c/.h */ struct ust_cmd_queue { diff --git a/src/bin/lttng-sessiond/lttng-ust-ctl.h b/src/bin/lttng-sessiond/lttng-ust-ctl.h index a812ee857..cba0e272d 100644 --- a/src/bin/lttng-sessiond/lttng-ust-ctl.h +++ b/src/bin/lttng-sessiond/lttng-ust-ctl.h @@ -215,6 +215,7 @@ int ustctl_put_next_subbuf(struct ustctl_consumer_stream *stream); /* snapshot */ int ustctl_snapshot(struct ustctl_consumer_stream *stream); +int ustctl_snapshot_sample_positions(struct ustctl_consumer_stream *stream); int ustctl_snapshot_get_consumed(struct ustctl_consumer_stream *stream, unsigned long *pos); int ustctl_snapshot_get_produced(struct ustctl_consumer_stream *stream, diff --git a/src/bin/lttng-sessiond/main.c b/src/bin/lttng-sessiond/main.c index 172f8c270..f137c09f8 100644 --- a/src/bin/lttng-sessiond/main.c +++ b/src/bin/lttng-sessiond/main.c @@ -71,6 +71,7 @@ #include "agent-thread.h" #include "save.h" #include "load-session-thread.h" +#include "notification-thread.h" #include "syscall.h" #include "agent.h" #include "ht-cleanup.h" @@ -104,6 +105,7 @@ static struct consumer_data kconsumer_data = { .cmd_unix_sock_path = DEFAULT_KCONSUMERD_CMD_SOCK_PATH, .err_sock = -1, .cmd_sock = -1, + .channel_monitor_pipe = -1, .pid_mutex = PTHREAD_MUTEX_INITIALIZER, .lock = PTHREAD_MUTEX_INITIALIZER, .cond = PTHREAD_COND_INITIALIZER, @@ -115,6 +117,7 @@ static struct consumer_data ustconsumer64_data = { .cmd_unix_sock_path = DEFAULT_USTCONSUMERD64_CMD_SOCK_PATH, .err_sock = -1, .cmd_sock = -1, + .channel_monitor_pipe = -1, .pid_mutex = PTHREAD_MUTEX_INITIALIZER, .lock = PTHREAD_MUTEX_INITIALIZER, .cond = PTHREAD_COND_INITIALIZER, @@ -126,6 +129,7 @@ static struct consumer_data ustconsumer32_data = { .cmd_unix_sock_path = DEFAULT_USTCONSUMERD32_CMD_SOCK_PATH, .err_sock = -1, .cmd_sock = -1, + .channel_monitor_pipe = -1, .pid_mutex = PTHREAD_MUTEX_INITIALIZER, .lock = PTHREAD_MUTEX_INITIALIZER, .cond = PTHREAD_COND_INITIALIZER, @@ -211,6 +215,7 @@ static pthread_t health_thread; static pthread_t ht_cleanup_thread; static pthread_t agent_reg_thread; static pthread_t load_session_thread; +static pthread_t notification_thread; /* * UST registration command queue. This queue is tied with a futex and uses a N @@ -305,15 +310,19 @@ const char * const config_section_name = "sessiond"; /* Load session thread information to operate. */ struct load_session_thread_data *load_info; +/* Notification thread handle. */ +struct notification_thread_handle *notification_thread_handle; + /* Global hash tables */ struct lttng_ht *agent_apps_ht_by_sock = NULL; /* - * Whether sessiond is ready for commands/health check requests. + * Whether sessiond is ready for commands/notification channel/health check + * requests. * NR_LTTNG_SESSIOND_READY must match the number of calls to * sessiond_notify_ready(). */ -#define NR_LTTNG_SESSIOND_READY 3 +#define NR_LTTNG_SESSIOND_READY 4 int lttng_sessiond_ready = NR_LTTNG_SESSIOND_READY; int sessiond_check_thread_quit_pipe(int fd, uint32_t events) @@ -521,6 +530,24 @@ static void close_consumer_sockets(void) PERROR("UST consumerd64 cmd_sock close"); } } + if (kconsumer_data.channel_monitor_pipe >= 0) { + ret = close(kconsumer_data.channel_monitor_pipe); + if (ret < 0) { + PERROR("kernel consumer channel monitor pipe close"); + } + } + if (ustconsumer32_data.channel_monitor_pipe >= 0) { + ret = close(ustconsumer32_data.channel_monitor_pipe); + if (ret < 0) { + PERROR("UST consumerd32 channel monitor pipe close"); + } + } + if (ustconsumer64_data.channel_monitor_pipe >= 0) { + ret = close(ustconsumer64_data.channel_monitor_pipe); + if (ret < 0) { + PERROR("UST consumerd64 channel monitor pipe close"); + } + } } /* @@ -697,6 +724,10 @@ static void sessiond_cleanup(void) free(load_info); } + if (notification_thread_handle) { + notification_thread_handle_destroy(notification_thread_handle); + } + /* * Cleanup lock file by deleting it and finaly closing it which will * release the file system lock. @@ -1224,6 +1255,7 @@ static void *thread_manage_consumer(void *data) enum lttcomm_return_code code; struct lttng_poll_event events; struct consumer_data *consumer_data = data; + struct consumer_socket *cmd_socket_wrapper = NULL; DBG("[thread] Manage consumer started"); @@ -1333,39 +1365,43 @@ restart: } health_code_update(); - if (code == LTTCOMM_CONSUMERD_COMMAND_SOCK_READY) { - /* Connect both socket, command and metadata. */ - consumer_data->cmd_sock = - lttcomm_connect_unix_sock(consumer_data->cmd_unix_sock_path); - consumer_data->metadata_fd = - lttcomm_connect_unix_sock(consumer_data->cmd_unix_sock_path); - if (consumer_data->cmd_sock < 0 - || consumer_data->metadata_fd < 0) { - PERROR("consumer connect cmd socket"); - /* On error, signal condition and quit. */ - signal_consumer_condition(consumer_data, -1); - goto error; - } - consumer_data->metadata_sock.fd_ptr = &consumer_data->metadata_fd; - /* Create metadata socket lock. */ - consumer_data->metadata_sock.lock = zmalloc(sizeof(pthread_mutex_t)); - if (consumer_data->metadata_sock.lock == NULL) { - PERROR("zmalloc pthread mutex"); - goto error; - } - pthread_mutex_init(consumer_data->metadata_sock.lock, NULL); - - signal_consumer_condition(consumer_data, 1); - DBG("Consumer command socket ready (fd: %d", consumer_data->cmd_sock); - DBG("Consumer metadata socket ready (fd: %d)", - consumer_data->metadata_fd); - } else { + if (code != LTTCOMM_CONSUMERD_COMMAND_SOCK_READY) { ERR("consumer error when waiting for SOCK_READY : %s", lttcomm_get_readable_code(-code)); goto error; } - /* Remove the consumerd error sock since we've established a connexion */ + /* Connect both command and metadata sockets. */ + consumer_data->cmd_sock = + lttcomm_connect_unix_sock( + consumer_data->cmd_unix_sock_path); + consumer_data->metadata_fd = + lttcomm_connect_unix_sock( + consumer_data->cmd_unix_sock_path); + if (consumer_data->cmd_sock < 0 || consumer_data->metadata_fd < 0) { + PERROR("consumer connect cmd socket"); + /* On error, signal condition and quit. */ + signal_consumer_condition(consumer_data, -1); + goto error; + } + + consumer_data->metadata_sock.fd_ptr = &consumer_data->metadata_fd; + + /* Create metadata socket lock. */ + consumer_data->metadata_sock.lock = zmalloc(sizeof(pthread_mutex_t)); + if (consumer_data->metadata_sock.lock == NULL) { + PERROR("zmalloc pthread mutex"); + goto error; + } + pthread_mutex_init(consumer_data->metadata_sock.lock, NULL); + + DBG("Consumer command socket ready (fd: %d", consumer_data->cmd_sock); + DBG("Consumer metadata socket ready (fd: %d)", + consumer_data->metadata_fd); + + /* + * Remove the consumerd error sock since we've established a connection. + */ ret = lttng_poll_del(&events, consumer_data->err_sock); if (ret < 0) { goto error; @@ -1386,6 +1422,27 @@ restart: health_code_update(); + /* + * Transfer the write-end of the channel monitoring pipe to the + * by issuing a SET_CHANNEL_MONITOR_PIPE command. + */ + cmd_socket_wrapper = consumer_allocate_socket(&consumer_data->cmd_sock); + if (!cmd_socket_wrapper) { + goto error; + } + + ret = consumer_send_channel_monitor_pipe(cmd_socket_wrapper, + consumer_data->channel_monitor_pipe); + if (ret) { + goto error; + } + /* Discard the socket wrapper as it is no longer needed. */ + consumer_destroy_socket(cmd_socket_wrapper); + cmd_socket_wrapper = NULL; + + /* The thread is completely initialized, signal that it is ready. */ + signal_consumer_condition(consumer_data, 1); + /* Infinite blocking call, waiting for transmission */ restart_poll: while (1) { @@ -1529,6 +1586,10 @@ error: free(consumer_data->metadata_sock.lock); } lttng_poll_clean(&events); + + if (cmd_socket_wrapper) { + consumer_destroy_socket(cmd_socket_wrapper); + } error_poll: if (err) { health_error(); @@ -3004,6 +3065,8 @@ static int process_client_msg(struct command_ctx *cmd_ctx, int sock, case LTTNG_SET_SESSION_SHM_PATH: case LTTNG_REGENERATE_METADATA: case LTTNG_REGENERATE_STATEDUMP: + case LTTNG_REGISTER_TRIGGER: + case LTTNG_UNREGISTER_TRIGGER: need_domain = 0; break; default: @@ -3066,6 +3129,8 @@ static int process_client_msg(struct command_ctx *cmd_ctx, int sock, case LTTNG_LIST_SYSCALLS: case LTTNG_LIST_TRACEPOINT_FIELDS: case LTTNG_SAVE_SESSION: + case LTTNG_REGISTER_TRIGGER: + case LTTNG_UNREGISTER_TRIGGER: need_tracing_session = 0; break; default: @@ -4117,6 +4182,18 @@ error_add_context: ret = cmd_regenerate_statedump(cmd_ctx->session); break; } + case LTTNG_REGISTER_TRIGGER: + { + ret = cmd_register_trigger(cmd_ctx, sock, + notification_thread_handle); + break; + } + case LTTNG_UNREGISTER_TRIGGER: + { + ret = cmd_unregister_trigger(cmd_ctx, sock, + notification_thread_handle); + break; + } default: ret = LTTNG_ERR_UND; break; @@ -5531,6 +5608,9 @@ int main(int argc, char **argv) int ret = 0, retval = 0; void *status; const char *home_path, *env_app_timeout; + struct lttng_pipe *ust32_channel_monitor_pipe = NULL, + *ust64_channel_monitor_pipe = NULL, + *kernel_channel_monitor_pipe = NULL; init_kernel_workarounds(); @@ -5689,6 +5769,19 @@ int main(int argc, char **argv) kconsumer_data.err_unix_sock_path); DBG2("Kernel consumer cmd path: %s", kconsumer_data.cmd_unix_sock_path); + kernel_channel_monitor_pipe = lttng_pipe_open(0); + if (!kernel_channel_monitor_pipe) { + ERR("Failed to create kernel consumer channel monitor pipe"); + retval = -1; + goto exit_init_data; + } + kconsumer_data.channel_monitor_pipe = + lttng_pipe_release_writefd( + kernel_channel_monitor_pipe); + if (kconsumer_data.channel_monitor_pipe < 0) { + retval = -1; + goto exit_init_data; + } } else { home_path = utils_get_home_dir(); if (home_path == NULL) { @@ -5793,6 +5886,18 @@ int main(int argc, char **argv) ustconsumer32_data.err_unix_sock_path); DBG2("UST consumer 32 bits cmd path: %s", ustconsumer32_data.cmd_unix_sock_path); + ust32_channel_monitor_pipe = lttng_pipe_open(0); + if (!ust32_channel_monitor_pipe) { + ERR("Failed to create 32-bit user space consumer channel monitor pipe"); + retval = -1; + goto exit_init_data; + } + ustconsumer32_data.channel_monitor_pipe = lttng_pipe_release_writefd( + ust32_channel_monitor_pipe); + if (ustconsumer32_data.channel_monitor_pipe < 0) { + retval = -1; + goto exit_init_data; + } /* 64 bits consumerd path setup */ ret = snprintf(ustconsumer64_data.err_unix_sock_path, PATH_MAX, @@ -5814,6 +5919,18 @@ int main(int argc, char **argv) ustconsumer64_data.err_unix_sock_path); DBG2("UST consumer 64 bits cmd path: %s", ustconsumer64_data.cmd_unix_sock_path); + ust64_channel_monitor_pipe = lttng_pipe_open(0); + if (!ust64_channel_monitor_pipe) { + ERR("Failed to create 64-bit user space consumer channel monitor pipe"); + retval = -1; + goto exit_init_data; + } + ustconsumer64_data.channel_monitor_pipe = lttng_pipe_release_writefd( + ust64_channel_monitor_pipe); + if (ustconsumer64_data.channel_monitor_pipe < 0) { + retval = -1; + goto exit_init_data; + } /* * See if daemon already exist. @@ -5973,7 +6090,7 @@ int main(int argc, char **argv) } load_info->path = opt_load_session_path; - /* Create health-check thread */ + /* Create health-check thread. */ ret = pthread_create(&health_thread, default_pthread_attr(), thread_manage_health, (void *) NULL); if (ret) { @@ -5983,6 +6100,29 @@ int main(int argc, char **argv) goto exit_health; } + /* notification_thread_data acquires the pipes' read side. */ + notification_thread_handle = notification_thread_handle_create( + ust32_channel_monitor_pipe, + ust64_channel_monitor_pipe, + kernel_channel_monitor_pipe); + if (!notification_thread_handle) { + retval = -1; + ERR("Failed to create notification thread shared data"); + stop_threads(); + goto exit_notification; + } + + /* Create notification thread. */ + ret = pthread_create(¬ification_thread, default_pthread_attr(), + thread_notification, notification_thread_handle); + if (ret) { + errno = ret; + PERROR("pthread_create notification"); + retval = -1; + stop_threads(); + goto exit_notification; + } + /* Create thread to manage the client socket */ ret = pthread_create(&client_thread, default_pthread_attr(), thread_manage_clients, (void *) NULL); @@ -5990,6 +6130,7 @@ int main(int argc, char **argv) errno = ret; PERROR("pthread_create clients"); retval = -1; + stop_threads(); goto exit_client; } @@ -6000,6 +6141,7 @@ int main(int argc, char **argv) errno = ret; PERROR("pthread_create dispatch"); retval = -1; + stop_threads(); goto exit_dispatch; } @@ -6010,6 +6152,7 @@ int main(int argc, char **argv) errno = ret; PERROR("pthread_create registration"); retval = -1; + stop_threads(); goto exit_reg_apps; } @@ -6020,6 +6163,7 @@ int main(int argc, char **argv) errno = ret; PERROR("pthread_create apps"); retval = -1; + stop_threads(); goto exit_apps; } @@ -6030,6 +6174,7 @@ int main(int argc, char **argv) errno = ret; PERROR("pthread_create notify"); retval = -1; + stop_threads(); goto exit_apps_notify; } @@ -6040,6 +6185,7 @@ int main(int argc, char **argv) errno = ret; PERROR("pthread_create agent"); retval = -1; + stop_threads(); goto exit_agent_reg; } @@ -6052,6 +6198,7 @@ int main(int argc, char **argv) errno = ret; PERROR("pthread_create kernel"); retval = -1; + stop_threads(); goto exit_kernel; } } @@ -6063,6 +6210,7 @@ int main(int argc, char **argv) errno = ret; PERROR("pthread_create load_session_thread"); retval = -1; + stop_threads(); goto exit_load_session; } @@ -6139,16 +6287,24 @@ exit_dispatch: PERROR("pthread_join"); retval = -1; } + exit_client: + ret = pthread_join(notification_thread, &status); + if (ret) { + errno = ret; + PERROR("pthread_join notification thread"); + retval = -1; + } +exit_notification: ret = pthread_join(health_thread, &status); if (ret) { errno = ret; PERROR("pthread_join health thread"); retval = -1; } -exit_health: +exit_health: exit_init_data: /* * Wait for all pending call_rcu work to complete before tearing @@ -6176,6 +6332,9 @@ exit_init_data: if (ret) { retval = -1; } + lttng_pipe_destroy(ust32_channel_monitor_pipe); + lttng_pipe_destroy(ust64_channel_monitor_pipe); + lttng_pipe_destroy(kernel_channel_monitor_pipe); exit_ht_cleanup: health_app_destroy(health_sessiond); @@ -6186,7 +6345,6 @@ exit_options: sessiond_cleanup_options(); exit_set_signal_handler: - if (!retval) { exit(EXIT_SUCCESS); } else { diff --git a/src/bin/lttng-sessiond/notification-thread-commands.c b/src/bin/lttng-sessiond/notification-thread-commands.c new file mode 100644 index 000000000..cfbb86210 --- /dev/null +++ b/src/bin/lttng-sessiond/notification-thread-commands.c @@ -0,0 +1,168 @@ +/* + * Copyright (C) 2017 - Jérémie Galarneau + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License, version 2 only, as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for + * more details. + * + * You should have received a copy of the GNU General Public License along with + * this program; if not, write to the Free Software Foundation, Inc., 51 + * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#include +#include +#include "notification-thread.h" +#include "notification-thread-commands.h" +#include +#include +#include +#include +#include + +static +void init_notification_thread_command(struct notification_thread_command *cmd) +{ + memset(cmd, 0, sizeof(*cmd)); + CDS_INIT_LIST_HEAD(&cmd->cmd_list_node); +} + +static +int run_command_wait(struct notification_thread_handle *handle, + struct notification_thread_command *cmd) +{ + int ret; + uint64_t notification_counter = 1; + + futex_nto1_prepare(&cmd->reply_futex); + + pthread_mutex_lock(&handle->cmd_queue.lock); + /* Add to queue. */ + cds_list_add_tail(&cmd->cmd_list_node, + &handle->cmd_queue.list); + /* Wake-up thread. */ + ret = write(handle->cmd_queue.event_fd, + ¬ification_counter, sizeof(notification_counter)); + if (ret < 0) { + PERROR("write to notification thread's queue event fd"); + /* + * Remove the command from the list so the notification + * thread does not process it. + */ + cds_list_del(&cmd->cmd_list_node); + goto error_unlock_queue; + } + pthread_mutex_unlock(&handle->cmd_queue.lock); + + futex_nto1_wait(&cmd->reply_futex); + return 0; +error_unlock_queue: + pthread_mutex_unlock(&handle->cmd_queue.lock); + return -1; +} + +enum lttng_error_code notification_thread_command_register_trigger( + struct notification_thread_handle *handle, + struct lttng_trigger *trigger) +{ + int ret; + enum lttng_error_code ret_code; + struct notification_thread_command cmd; + + init_notification_thread_command(&cmd); + + cmd.type = NOTIFICATION_COMMAND_TYPE_REGISTER_TRIGGER; + cmd.parameters.trigger = trigger; + + ret = run_command_wait(handle, &cmd); + if (ret) { + ret_code = LTTNG_ERR_UNK; + goto end; + } + ret_code = cmd.reply_code; +end: + return ret_code; +} + +enum lttng_error_code notification_thread_command_unregister_trigger( + struct notification_thread_handle *handle, + struct lttng_trigger *trigger) +{ + int ret; + enum lttng_error_code ret_code; + struct notification_thread_command cmd; + + init_notification_thread_command(&cmd); + + cmd.type = NOTIFICATION_COMMAND_TYPE_UNREGISTER_TRIGGER; + cmd.parameters.trigger = trigger; + + ret = run_command_wait(handle, &cmd); + if (ret) { + ret_code = LTTNG_ERR_UNK; + goto end; + } + ret_code = cmd.reply_code; +end: + return ret_code; +} + +enum lttng_error_code notification_thread_command_add_channel( + struct notification_thread_handle *handle, + char *session_name, uid_t uid, gid_t gid, + char *channel_name, uint64_t key, + enum lttng_domain_type domain, uint64_t capacity) +{ + int ret; + enum lttng_error_code ret_code; + struct notification_thread_command cmd; + + init_notification_thread_command(&cmd); + + cmd.type = NOTIFICATION_COMMAND_TYPE_ADD_CHANNEL; + cmd.parameters.add_channel.session_name = session_name; + cmd.parameters.add_channel.uid = uid; + cmd.parameters.add_channel.gid = gid; + cmd.parameters.add_channel.channel_name = channel_name; + cmd.parameters.add_channel.key.key = key; + cmd.parameters.add_channel.key.domain = domain; + cmd.parameters.add_channel.capacity = capacity; + + ret = run_command_wait(handle, &cmd); + if (ret) { + ret_code = LTTNG_ERR_UNK; + goto end; + } + ret_code = cmd.reply_code; +end: + return ret_code; +} + +enum lttng_error_code notification_thread_command_remove_channel( + struct notification_thread_handle *handle, + uint64_t key, enum lttng_domain_type domain) +{ + int ret; + enum lttng_error_code ret_code; + struct notification_thread_command cmd; + + init_notification_thread_command(&cmd); + + cmd.type = NOTIFICATION_COMMAND_TYPE_REMOVE_CHANNEL; + cmd.parameters.remove_channel.key = key; + cmd.parameters.remove_channel.domain = domain; + + ret = run_command_wait(handle, &cmd); + if (ret) { + ret_code = LTTNG_ERR_UNK; + goto end; + } + ret_code = cmd.reply_code; +end: + return ret_code; +} diff --git a/src/bin/lttng-sessiond/notification-thread-commands.h b/src/bin/lttng-sessiond/notification-thread-commands.h new file mode 100644 index 000000000..463aa5e41 --- /dev/null +++ b/src/bin/lttng-sessiond/notification-thread-commands.h @@ -0,0 +1,89 @@ +/* + * Copyright (C) 2017 - Jérémie Galarneau + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License, version 2 only, as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for + * more details. + * + * You should have received a copy of the GNU General Public License along with + * this program; if not, write to the Free Software Foundation, Inc., 51 + * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#ifndef NOTIFICATION_THREAD_COMMANDS_H +#define NOTIFICATION_THREAD_COMMANDS_H + +#include +#include +#include + +struct notification_thread_data; +struct lttng_trigger; + +enum notification_thread_command_type { + NOTIFICATION_COMMAND_TYPE_REGISTER_TRIGGER, + NOTIFICATION_COMMAND_TYPE_UNREGISTER_TRIGGER, + NOTIFICATION_COMMAND_TYPE_ADD_CHANNEL, + NOTIFICATION_COMMAND_TYPE_REMOVE_CHANNEL, +}; + +struct channel_key { + uint64_t key; + enum lttng_domain_type domain; +}; + +struct channel_info { + struct channel_key key; + char *session_name; + uid_t uid; + gid_t gid; + char *channel_name; + uint64_t capacity; + struct cds_lfht_node channels_ht_node; +}; + +struct notification_thread_command { + struct cds_list_head cmd_list_node; + + enum notification_thread_command_type type; + union { + /* Register/Unregister trigger. */ + struct lttng_trigger *trigger; + /* Add channel. */ + struct channel_info add_channel; + /* Remove channel. */ + struct { + uint64_t key; + enum lttng_domain_type domain; + } remove_channel; + } parameters; + + /* Futex on which to wait for command reply (optional). */ + int32_t reply_futex; + enum lttng_error_code reply_code; +}; + +enum lttng_error_code notification_thread_command_register_trigger( + struct notification_thread_handle *handle, + struct lttng_trigger *trigger); + +enum lttng_error_code notification_thread_command_unregister_trigger( + struct notification_thread_handle *handle, + struct lttng_trigger *trigger); + +enum lttng_error_code notification_thread_command_add_channel( + struct notification_thread_handle *handle, + char *session_name, uid_t uid, gid_t gid, + char *channel_name, uint64_t key, + enum lttng_domain_type domain, uint64_t capacity); + +enum lttng_error_code notification_thread_command_remove_channel( + struct notification_thread_handle *handle, + uint64_t key, enum lttng_domain_type domain); + +#endif /* NOTIFICATION_THREAD_COMMANDS_H */ diff --git a/src/bin/lttng-sessiond/notification-thread-events.c b/src/bin/lttng-sessiond/notification-thread-events.c new file mode 100644 index 000000000..222e2faa6 --- /dev/null +++ b/src/bin/lttng-sessiond/notification-thread-events.c @@ -0,0 +1,1663 @@ +/* + * Copyright (C) 2017 - Jérémie Galarneau + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License, version 2 only, as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for + * more details. + * + * You should have received a copy of the GNU General Public License along with + * this program; if not, write to the Free Software Foundation, Inc., 51 + * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#define _LGPL_SOURCE +#include +#include + +#include "notification-thread.h" +#include "notification-thread-events.h" +#include "notification-thread-commands.h" +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +struct lttng_trigger_list_element { + struct lttng_trigger *trigger; + struct cds_list_head node; +}; + +struct lttng_channel_trigger_list { + struct channel_key channel_key; + struct cds_list_head list; + struct cds_lfht_node channel_triggers_ht_node; +}; + +struct lttng_trigger_ht_element { + struct lttng_trigger *trigger; + struct cds_lfht_node node; +}; + +struct lttng_condition_list_element { + struct lttng_condition *condition; + struct cds_list_head node; +}; + +struct notification_client_list_element { + struct notification_client *client; + struct cds_list_head node; +}; + +struct notification_client_list { + struct lttng_trigger *trigger; + struct cds_list_head list; + struct cds_lfht_node notification_trigger_ht_node; +}; + +struct notification_client { + int socket; + uid_t uid; + gid_t gid; + /* + * Conditions to which the client's notification channel is subscribed. + * List of struct lttng_condition_list_node. The condition member is + * owned by the client. + */ + struct cds_list_head condition_list; + struct cds_lfht_node client_socket_ht_node; +}; + +struct channel_state_sample { + struct channel_key key; + struct cds_lfht_node channel_state_ht_node; + uint64_t highest_usage; + uint64_t lowest_usage; +}; + +static +int match_client(struct cds_lfht_node *node, const void *key) +{ + int socket = (int) key; + struct notification_client *client; + + client = caa_container_of(node, struct notification_client, + client_socket_ht_node); + + return !!(client->socket == socket); +} + +static +int match_channel_trigger_list(struct cds_lfht_node *node, const void *key) +{ + struct channel_key *channel_key = (struct channel_key *) key; + struct lttng_channel_trigger_list *trigger_list; + + trigger_list = caa_container_of(node, struct lttng_channel_trigger_list, + channel_triggers_ht_node); + + return !!((channel_key->key == trigger_list->channel_key.key) && + (channel_key->domain == trigger_list->channel_key.domain)); +} + +static +int match_channel_state_sample(struct cds_lfht_node *node, const void *key) +{ + struct channel_key *channel_key = (struct channel_key *) key; + struct channel_state_sample *sample; + + sample = caa_container_of(node, struct channel_state_sample, + channel_state_ht_node); + + return !!((channel_key->key == sample->key.key) && + (channel_key->domain == sample->key.domain)); +} + +static +int match_channel_info(struct cds_lfht_node *node, const void *key) +{ + struct channel_key *channel_key = (struct channel_key *) key; + struct channel_info *channel_info; + + channel_info = caa_container_of(node, struct channel_info, + channels_ht_node); + + return !!((channel_key->key == channel_info->key.key) && + (channel_key->domain == channel_info->key.domain)); +} + +static +int match_condition(struct cds_lfht_node *node, const void *key) +{ + struct lttng_condition *condition_key = (struct lttng_condition *) key; + struct lttng_trigger_ht_element *trigger; + struct lttng_condition *condition; + + trigger = caa_container_of(node, struct lttng_trigger_ht_element, + node); + condition = lttng_trigger_get_condition(trigger->trigger); + assert(condition); + + return !!lttng_condition_is_equal(condition_key, condition); +} + +static +int match_client_list(struct cds_lfht_node *node, const void *key) +{ + struct lttng_trigger *trigger_key = (struct lttng_trigger *) key; + struct notification_client_list *client_list; + struct lttng_condition *condition; + struct lttng_condition *condition_key = lttng_trigger_get_condition( + trigger_key); + + assert(condition_key); + + client_list = caa_container_of(node, struct notification_client_list, + notification_trigger_ht_node); + condition = lttng_trigger_get_condition(client_list->trigger); + + return !!lttng_condition_is_equal(condition_key, condition); +} + +static +int match_client_list_condition(struct cds_lfht_node *node, const void *key) +{ + struct lttng_condition *condition_key = (struct lttng_condition *) key; + struct notification_client_list *client_list; + struct lttng_condition *condition; + + assert(condition_key); + + client_list = caa_container_of(node, struct notification_client_list, + notification_trigger_ht_node); + condition = lttng_trigger_get_condition(client_list->trigger); + + return !!lttng_condition_is_equal(condition_key, condition); +} + +static +unsigned long lttng_condition_buffer_usage_hash( + struct lttng_condition *_condition) +{ + unsigned long hash = 0; + struct lttng_condition_buffer_usage *condition; + + condition = container_of(_condition, + struct lttng_condition_buffer_usage, parent); + + if (condition->session_name) { + hash ^= hash_key_str(condition->session_name, lttng_ht_seed); + } + if (condition->channel_name) { + hash ^= hash_key_str(condition->session_name, lttng_ht_seed); + } + if (condition->domain.set) { + hash ^= hash_key_ulong( + (void *) condition->domain.type, + lttng_ht_seed); + } + if (condition->threshold_ratio.set) { + uint64_t val; + + val = condition->threshold_ratio.value * (double) UINT32_MAX; + hash ^= hash_key_u64(&val, lttng_ht_seed); + } else if (condition->threshold_ratio.set) { + uint64_t val; + + val = condition->threshold_bytes.value; + hash ^= hash_key_u64(&val, lttng_ht_seed); + } + return hash; +} + +/* + * The lttng_condition hashing code is kept in this file (rather than + * condition.c) since it makes use of GPLv2 code (hashtable utils), which we + * don't want to link in liblttng-ctl. + */ +static +unsigned long lttng_condition_hash(struct lttng_condition *condition) +{ + switch (condition->type) { + case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW: + case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH: + return lttng_condition_buffer_usage_hash(condition); + default: + ERR("[notification-thread] Unexpected condition type caught"); + abort(); + } +} + +static +void channel_info_destroy(struct channel_info *channel_info) +{ + if (!channel_info) { + return; + } + + if (channel_info->session_name) { + free(channel_info->session_name); + } + if (channel_info->channel_name) { + free(channel_info->channel_name); + } + free(channel_info); +} + +static +struct channel_info *channel_info_copy(struct channel_info *channel_info) +{ + struct channel_info *copy = zmalloc(sizeof(*channel_info)); + + assert(channel_info); + assert(channel_info->session_name); + assert(channel_info->channel_name); + + if (!copy) { + goto end; + } + + memcpy(copy, channel_info, sizeof(*channel_info)); + copy->session_name = NULL; + copy->channel_name = NULL; + + copy->session_name = strdup(channel_info->session_name); + if (!copy->session_name) { + goto error; + } + copy->channel_name = strdup(channel_info->channel_name); + if (!copy->channel_name) { + goto error; + } + cds_lfht_node_init(&channel_info->channels_ht_node); +end: + return copy; +error: + channel_info_destroy(copy); + return NULL; +} + +static +int notification_thread_client_subscribe(struct notification_client *client, + struct lttng_condition *condition, + struct notification_thread_state *state, + enum lttng_notification_channel_status *_status) +{ + int ret = 0; + struct cds_lfht_iter iter; + struct cds_lfht_node *node; + struct notification_client_list *client_list; + struct lttng_condition_list_element *condition_list_element = NULL; + struct notification_client_list_element *client_list_element = NULL; + enum lttng_notification_channel_status status = + LTTNG_NOTIFICATION_CHANNEL_STATUS_OK; + + /* + * Ensure that the client has not already subscribed to this condition + * before. + */ + cds_list_for_each_entry(condition_list_element, &client->condition_list, node) { + if (lttng_condition_is_equal(condition_list_element->condition, + condition)) { + status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ALREADY_SUBSCRIBED; + goto end; + } + } + + condition_list_element = zmalloc(sizeof(*condition_list_element)); + if (!condition_list_element) { + ret = -1; + goto error; + } + client_list_element = zmalloc(sizeof(*client_list_element)); + if (!client_list_element) { + ret = -1; + goto error; + } + + rcu_read_lock(); + + /* + * Add the newly-subscribed condition to the client's subscription list. + */ + CDS_INIT_LIST_HEAD(&condition_list_element->node); + condition_list_element->condition = condition; + cds_list_add(&condition_list_element->node, &client->condition_list); + + /* + * Add the client to the list of clients interested in a given trigger + * if a "notification" trigger with a corresponding condition was + * added prior. + */ + cds_lfht_lookup(state->notification_trigger_clients_ht, + lttng_condition_hash(condition), + match_client_list_condition, + condition, + &iter); + node = cds_lfht_iter_get_node(&iter); + if (!node) { + goto end_unlock; + } + + client_list = caa_container_of(node, struct notification_client_list, + notification_trigger_ht_node); + client_list_element->client = client; + CDS_INIT_LIST_HEAD(&client_list_element->node); + cds_list_add(&client_list->list, &client_list_element->node); +end_unlock: + rcu_read_unlock(); +end: + if (_status) { + *_status = status; + } + return ret; +error: + free(condition_list_element); + free(client_list_element); + return ret; +} + +static +int notification_thread_client_unsubscribe( + struct notification_client *client, + struct lttng_condition *condition, + struct notification_thread_state *state, + enum lttng_notification_channel_status *_status) +{ + struct cds_lfht_iter iter; + struct cds_lfht_node *node; + struct notification_client_list *client_list; + struct lttng_condition_list_element *condition_list_element, + *condition_tmp; + struct notification_client_list_element *client_list_element, + *client_tmp; + bool condition_found = false; + enum lttng_notification_channel_status status = + LTTNG_NOTIFICATION_CHANNEL_STATUS_OK; + + /* Remove the condition from the client's condition list. */ + cds_list_for_each_entry_safe(condition_list_element, condition_tmp, + &client->condition_list, node) { + if (!lttng_condition_is_equal(condition_list_element->condition, + condition)) { + continue; + } + + cds_list_del(&condition_list_element->node); + /* + * The caller may be iterating on the client's conditions to + * tear down a client's connection. In this case, the condition + * will be destroyed at the end. + */ + if (condition != condition_list_element->condition) { + lttng_condition_destroy( + condition_list_element->condition); + } + free(condition_list_element); + condition_found = true; + break; + } + + if (!condition_found) { + status = LTTNG_NOTIFICATION_CHANNEL_STATUS_UNKNOWN_CONDITION; + goto end; + } + + /* + * Remove the client from the list of clients interested the trigger + * matching the condition. + */ + rcu_read_lock(); + cds_lfht_lookup(state->notification_trigger_clients_ht, + lttng_condition_hash(condition), + match_client_list_condition, + condition, + &iter); + node = cds_lfht_iter_get_node(&iter); + if (!node) { + goto end_unlock; + } + + client_list = caa_container_of(node, struct notification_client_list, + notification_trigger_ht_node); + cds_list_for_each_entry_safe(client_list_element, client_tmp, + &client_list->list, node) { + if (client_list_element->client->socket != client->socket) { + continue; + } + cds_list_del(&client_list_element->node); + free(client_list_element); + break; + } +end_unlock: + rcu_read_unlock(); +end: + lttng_condition_destroy(condition); + if (_status) { + *_status = status; + } + return 0; +} + +static +void notification_client_destroy(struct notification_client *client, + struct notification_thread_state *state) +{ + struct lttng_condition_list_element *condition_list_element, *tmp; + + if (!client) { + return; + } + + /* Release all conditions to which the client was subscribed. */ + cds_list_for_each_entry_safe(condition_list_element, tmp, + &client->condition_list, node) { + (void) notification_thread_client_unsubscribe(client, + condition_list_element->condition, state, NULL); + } + + if (client->socket >= 0) { + (void) lttcomm_close_unix_sock(client->socket); + } + free(client); +} + +/* + * Call with rcu_read_lock held (and hold for the lifetime of the returned + * client pointer). + */ +static +struct notification_client *get_client_from_socket(int socket, + struct notification_thread_state *state) +{ + struct cds_lfht_iter iter; + struct cds_lfht_node *node; + struct notification_client *client = NULL; + + cds_lfht_lookup(state->client_socket_ht, + hash_key_ulong((void *) (unsigned long) socket, lttng_ht_seed), + match_client, + (void *) (unsigned long) socket, + &iter); + node = cds_lfht_iter_get_node(&iter); + if (!node) { + goto end; + } + + client = caa_container_of(node, struct notification_client, + client_socket_ht_node); +end: + return client; +} + +static +bool trigger_applies_to_channel(struct lttng_trigger *trigger, + struct channel_info *info) +{ + enum lttng_condition_status status; + struct lttng_condition *condition; + const char *trigger_session_name = NULL; + const char *trigger_channel_name = NULL; + enum lttng_domain_type trigger_domain; + + condition = lttng_trigger_get_condition(trigger); + if (!condition) { + goto fail; + } + + switch (lttng_condition_get_type(condition)) { + case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW: + case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH: + break; + default: + goto fail; + } + + status = lttng_condition_buffer_usage_get_domain_type(condition, + &trigger_domain); + assert(status == LTTNG_CONDITION_STATUS_OK); + if (info->key.domain != trigger_domain) { + goto fail; + } + + status = lttng_condition_buffer_usage_get_session_name( + condition, &trigger_session_name); + assert((status == LTTNG_CONDITION_STATUS_OK) && trigger_session_name); + + status = lttng_condition_buffer_usage_get_channel_name( + condition, &trigger_channel_name); + assert((status == LTTNG_CONDITION_STATUS_OK) && trigger_channel_name); + + if (strcmp(info->session_name, trigger_session_name)) { + goto fail; + } + if (strcmp(info->channel_name, trigger_channel_name)) { + goto fail; + } + + return true; +fail: + return false; +} + +static +bool trigger_applies_to_client(struct lttng_trigger *trigger, + struct notification_client *client) +{ + bool applies = false; + struct lttng_condition_list_element *condition_list_element; + + cds_list_for_each_entry(condition_list_element, &client->condition_list, + node) { + applies = lttng_condition_is_equal( + condition_list_element->condition, + lttng_trigger_get_condition(trigger)); + if (applies) { + break; + } + } + return applies; +} + +static +unsigned long hash_channel_key(struct channel_key *key) +{ + return hash_key_u64(&key->key, lttng_ht_seed) ^ hash_key_ulong( + (void *) (unsigned long) key->domain, lttng_ht_seed); +} + +static +int handle_notification_thread_command_add_channel( + struct notification_thread_state *state, + struct channel_info *channel_info, + enum lttng_error_code *cmd_result) +{ + struct cds_list_head trigger_list; + struct channel_info *new_channel_info; + struct channel_key *channel_key; + struct lttng_channel_trigger_list *channel_trigger_list = NULL; + struct lttng_trigger_ht_element *trigger_ht_element = NULL; + int trigger_count = 0; + struct cds_lfht_iter iter; + + DBG("[notification-thread] Adding channel %s from session %s, channel key = %" PRIu64 " in %s domain", + channel_info->channel_name, channel_info->session_name, + channel_info->key.key, channel_info->key.domain == LTTNG_DOMAIN_KERNEL ? "kernel" : "user space"); + + CDS_INIT_LIST_HEAD(&trigger_list); + + new_channel_info = channel_info_copy(channel_info); + if (!new_channel_info) { + goto error; + } + + channel_key = &new_channel_info->key; + + /* Build a list of all triggers applying to the new channel. */ + cds_lfht_for_each_entry(state->triggers_ht, &iter, trigger_ht_element, + node) { + struct lttng_trigger_list_element *new_element; + + if (!trigger_applies_to_channel(trigger_ht_element->trigger, + channel_info)) { + continue; + } + + new_element = zmalloc(sizeof(*new_element)); + if (!new_element) { + goto error; + } + CDS_INIT_LIST_HEAD(&new_element->node); + new_element->trigger = trigger_ht_element->trigger; + cds_list_add(&new_element->node, &trigger_list); + trigger_count++; + } + + DBG("[notification-thread] Found %i triggers that apply to newly added channel", + trigger_count); + channel_trigger_list = zmalloc(sizeof(*channel_trigger_list)); + if (!channel_trigger_list) { + goto error; + } + channel_trigger_list->channel_key = *channel_key; + CDS_INIT_LIST_HEAD(&channel_trigger_list->list); + cds_lfht_node_init(&channel_trigger_list->channel_triggers_ht_node); + cds_list_splice(&trigger_list, &channel_trigger_list->list); + + rcu_read_lock(); + /* Add channel to the channel_ht which owns the channel_infos. */ + cds_lfht_add(state->channels_ht, + hash_channel_key(channel_key), + &new_channel_info->channels_ht_node); + /* + * Add the list of triggers associated with this channel to the + * channel_triggers_ht. + */ + cds_lfht_add(state->channel_triggers_ht, + hash_channel_key(channel_key), + &channel_trigger_list->channel_triggers_ht_node); + rcu_read_unlock(); + *cmd_result = LTTNG_OK; + return 0; +error: + /* Empty trigger list */ + channel_info_destroy(new_channel_info); + return 1; +} + +static +int handle_notification_thread_command_remove_channel( + struct notification_thread_state *state, + uint64_t channel_key, enum lttng_domain_type domain, + enum lttng_error_code *cmd_result) +{ + struct cds_lfht_node *node; + struct cds_lfht_iter iter; + struct lttng_channel_trigger_list *trigger_list; + struct lttng_trigger_list_element *trigger_list_element, *tmp; + struct channel_key key = { .key = channel_key, .domain = domain }; + struct channel_info *channel_info; + + DBG("[notification-thread] Removing channel key = %" PRIu64 " in %s domain", + channel_key, domain == LTTNG_DOMAIN_KERNEL ? "kernel" : "user space"); + + rcu_read_lock(); + + cds_lfht_lookup(state->channel_triggers_ht, + hash_channel_key(&key), + match_channel_trigger_list, + &key, + &iter); + node = cds_lfht_iter_get_node(&iter); + /* + * There is a severe internal error if we are being asked to remove a + * channel that doesn't exist. + */ + if (!node) { + ERR("[notification-thread] Channel being removed is unknown to the notification thread"); + goto end; + } + + /* Free the list of triggers associated with this channel. */ + trigger_list = caa_container_of(node, struct lttng_channel_trigger_list, + channel_triggers_ht_node); + cds_list_for_each_entry_safe(trigger_list_element, tmp, + &trigger_list->list, node) { + cds_list_del(&trigger_list_element->node); + free(trigger_list_element); + } + cds_lfht_del(state->channel_triggers_ht, node); + free(trigger_list); + + /* Free sampled channel state. */ + cds_lfht_lookup(state->channel_state_ht, + hash_channel_key(&key), + match_channel_state_sample, + &key, + &iter); + node = cds_lfht_iter_get_node(&iter); + /* + * This is expected to be NULL if the channel is destroyed before we + * received a sample. + */ + if (node) { + struct channel_state_sample *sample = caa_container_of(node, + struct channel_state_sample, + channel_state_ht_node); + + cds_lfht_del(state->channel_state_ht, node); + free(sample); + } + + /* Remove the channel from the channels_ht and free it. */ + cds_lfht_lookup(state->channels_ht, + hash_channel_key(&key), + match_channel_info, + &key, + &iter); + node = cds_lfht_iter_get_node(&iter); + assert(node); + channel_info = caa_container_of(node, struct channel_info, + channels_ht_node); + cds_lfht_del(state->channels_ht, node); + channel_info_destroy(channel_info); +end: + rcu_read_unlock(); + *cmd_result = LTTNG_OK; + return 0; +} + +/* + * FIXME A client's credentials are not checked when registering a trigger, nor + * are they stored alongside with the trigger. + * + * The effects of this are benign: + * - The client will succeed in registering the trigger, as it is valid, + * - The trigger will, internally, be bound to the channel, + * - The notifications will not be sent since the client's credentials + * are checked against the channel at that moment. + */ +static +int handle_notification_thread_command_register_trigger( + struct notification_thread_state *state, + struct lttng_trigger *trigger, + enum lttng_error_code *cmd_result) +{ + int ret = 0; + struct lttng_condition *condition; + struct notification_client *client; + struct notification_client_list *client_list = NULL; + struct lttng_trigger_ht_element *trigger_ht_element = NULL; + struct notification_client_list_element *client_list_element, *tmp; + struct cds_lfht_node *node; + struct cds_lfht_iter iter; + struct channel_info *channel; + bool free_trigger = true; + + rcu_read_lock(); + + condition = lttng_trigger_get_condition(trigger); + trigger_ht_element = zmalloc(sizeof(*trigger_ht_element)); + if (!trigger_ht_element) { + ret = -1; + goto error; + } + + /* Add trigger to the trigger_ht. */ + cds_lfht_node_init(&trigger_ht_element->node); + trigger_ht_element->trigger = trigger; + + node = cds_lfht_add_unique(state->triggers_ht, + lttng_condition_hash(condition), + match_condition, + condition, + &trigger_ht_element->node); + if (node != &trigger_ht_element->node) { + /* Not a fatal error, simply report it to the client. */ + *cmd_result = LTTNG_ERR_TRIGGER_EXISTS; + goto error_free_ht_element; + } + + /* + * Ownership of the trigger and of its wrapper was transfered to + * the triggers_ht. + */ + trigger_ht_element = NULL; + free_trigger = false; + + /* + * The rest only applies to triggers that have a "notify" action. + * It is not skipped as this is the only action type currently + * supported. + */ + client_list = zmalloc(sizeof(*client_list)); + if (!client_list) { + ret = -1; + goto error_free_ht_element; + } + cds_lfht_node_init(&client_list->notification_trigger_ht_node); + CDS_INIT_LIST_HEAD(&client_list->list); + client_list->trigger = trigger; + + /* Build a list of clients to which this new trigger applies. */ + cds_lfht_for_each_entry(state->client_socket_ht, &iter, client, + client_socket_ht_node) { + if (!trigger_applies_to_client(trigger, client)) { + continue; + } + + client_list_element = zmalloc(sizeof(*client_list_element)); + if (!client_list_element) { + ret = -1; + goto error_free_client_list; + } + CDS_INIT_LIST_HEAD(&client_list_element->node); + client_list_element->client = client; + cds_list_add(&client_list_element->node, &client_list->list); + } + + cds_lfht_add(state->notification_trigger_clients_ht, + lttng_condition_hash(condition), + &client_list->notification_trigger_ht_node); + /* + * Client list ownership transferred to the + * notification_trigger_clients_ht. + */ + client_list = NULL; + + /* + * Add the trigger to list of triggers bound to the channels currently + * known. + */ + cds_lfht_for_each_entry(state->channels_ht, &iter, channel, + channels_ht_node) { + struct lttng_trigger_list_element *trigger_list_element; + struct lttng_channel_trigger_list *trigger_list; + + if (!trigger_applies_to_channel(trigger, channel)) { + continue; + } + + cds_lfht_lookup(state->channel_triggers_ht, + hash_channel_key(&channel->key), + match_channel_trigger_list, + &channel->key, + &iter); + node = cds_lfht_iter_get_node(&iter); + assert(node); + /* Free the list of triggers associated with this channel. */ + trigger_list = caa_container_of(node, + struct lttng_channel_trigger_list, + channel_triggers_ht_node); + + trigger_list_element = zmalloc(sizeof(*trigger_list_element)); + if (!trigger_list_element) { + ret = -1; + goto error_free_client_list; + } + CDS_INIT_LIST_HEAD(&trigger_list_element->node); + trigger_list_element->trigger = trigger; + cds_list_add(&trigger_list_element->node, &trigger_list->list); + /* A trigger can only apply to one channel. */ + break; + } + + *cmd_result = LTTNG_OK; +error_free_client_list: + if (client_list) { + cds_list_for_each_entry_safe(client_list_element, tmp, + &client_list->list, node) { + free(client_list_element); + } + free(client_list); + } +error_free_ht_element: + free(trigger_ht_element); +error: + if (free_trigger) { + lttng_trigger_destroy(trigger); + } + rcu_read_unlock(); + return ret; +} + +int handle_notification_thread_command_unregister_trigger( + struct notification_thread_state *state, + struct lttng_trigger *trigger, + enum lttng_error_code *_cmd_reply) +{ + struct cds_lfht_iter iter; + struct cds_lfht_node *node, *triggers_ht_node; + struct lttng_channel_trigger_list *trigger_list; + struct notification_client_list *client_list; + struct notification_client_list_element *client_list_element, *tmp; + struct lttng_trigger_ht_element *trigger_ht_element = NULL; + struct lttng_condition *condition = lttng_trigger_get_condition( + trigger); + enum lttng_error_code cmd_reply; + + rcu_read_lock(); + + cds_lfht_lookup(state->triggers_ht, + lttng_condition_hash(condition), + match_condition, + condition, + &iter); + triggers_ht_node = cds_lfht_iter_get_node(&iter); + if (!triggers_ht_node) { + cmd_reply = LTTNG_ERR_TRIGGER_NOT_FOUND; + goto end; + } else { + cmd_reply = LTTNG_OK; + } + + /* Remove trigger from channel_triggers_ht. */ + cds_lfht_for_each_entry(state->channel_triggers_ht, &iter, trigger_list, + channel_triggers_ht_node) { + struct lttng_trigger_list_element *trigger_element, *tmp; + + cds_list_for_each_entry_safe(trigger_element, tmp, + &trigger_list->list, node) { + struct lttng_condition *current_condition = + lttng_trigger_get_condition( + trigger_element->trigger); + + assert(current_condition); + if (!lttng_condition_is_equal(condition, + current_condition)) { + continue; + } + + DBG("[notification-thread] Removed trigger from channel_triggers_ht"); + cds_list_del(&trigger_element->node); + } + } + + /* + * Remove and release the client list from + * notification_trigger_clients_ht. + */ + cds_lfht_lookup(state->notification_trigger_clients_ht, + lttng_condition_hash(condition), + match_client_list, + trigger, + &iter); + node = cds_lfht_iter_get_node(&iter); + assert(node); + client_list = caa_container_of(node, struct notification_client_list, + notification_trigger_ht_node); + cds_list_for_each_entry_safe(client_list_element, tmp, + &client_list->list, node) { + free(client_list_element); + } + cds_lfht_del(state->notification_trigger_clients_ht, node); + free(client_list); + + /* Remove trigger from triggers_ht. */ + trigger_ht_element = caa_container_of(triggers_ht_node, + struct lttng_trigger_ht_element, node); + cds_lfht_del(state->triggers_ht, triggers_ht_node); + lttng_trigger_destroy(trigger_ht_element->trigger); + free(trigger_ht_element); +end: + rcu_read_unlock(); + if (_cmd_reply) { + *_cmd_reply = cmd_reply; + } + return 0; +} + +int handle_notification_thread_command( + struct notification_thread_handle *handle, + struct notification_thread_state *state) +{ + int ret; + uint64_t counter; + struct notification_thread_command *cmd; + + /* Read event_fd to put it back into a quiescent state. */ + ret = read(handle->cmd_queue.event_fd, &counter, sizeof(counter)); + if (ret == -1) { + goto error; + } + + pthread_mutex_lock(&handle->cmd_queue.lock); + cmd = cds_list_first_entry(&handle->cmd_queue.list, + struct notification_thread_command, cmd_list_node); + switch (cmd->type) { + case NOTIFICATION_COMMAND_TYPE_REGISTER_TRIGGER: + DBG("[notification-thread] Received register trigger command"); + ret = handle_notification_thread_command_register_trigger( + state, cmd->parameters.trigger, + &cmd->reply_code); + break; + case NOTIFICATION_COMMAND_TYPE_UNREGISTER_TRIGGER: + DBG("[notification-thread] Received unregister trigger command"); + ret = handle_notification_thread_command_unregister_trigger( + state, cmd->parameters.trigger, + &cmd->reply_code); + break; + case NOTIFICATION_COMMAND_TYPE_ADD_CHANNEL: + DBG("[notification-thread] Received add channel command"); + ret = handle_notification_thread_command_add_channel( + state, &cmd->parameters.add_channel, + &cmd->reply_code); + break; + case NOTIFICATION_COMMAND_TYPE_REMOVE_CHANNEL: + DBG("[notification-thread] Received remove channel command"); + ret = handle_notification_thread_command_remove_channel( + state, cmd->parameters.remove_channel.key, + cmd->parameters.remove_channel.domain, + &cmd->reply_code); + break; + default: + ERR("[notification-thread] Unknown internal command received"); + goto error_unlock; + } + + if (ret) { + goto error_unlock; + } + + cds_list_del(&cmd->cmd_list_node); + futex_nto1_wake(&cmd->reply_futex); + pthread_mutex_unlock(&handle->cmd_queue.lock); + return 0; +error_unlock: + /* Wake-up and return a fatal error to the calling thread. */ + futex_nto1_wake(&cmd->reply_futex); + pthread_mutex_unlock(&handle->cmd_queue.lock); + cmd->reply_code = LTTNG_ERR_FATAL; +error: + /* Indicate a fatal error to the caller. */ + return 1; +} + +static +unsigned long hash_client_socket(int socket) +{ + return hash_key_ulong((void *) (unsigned long) socket, lttng_ht_seed); +} + +int handle_notification_thread_client_connect( + struct notification_thread_state *state) +{ + int ret; + struct notification_client *client; + + DBG("[notification-thread] Handling new notification channel client connection"); + + client = zmalloc(sizeof(*client)); + if (!client) { + /* Fatal error. */ + ret = -1; + goto error; + } + CDS_INIT_LIST_HEAD(&client->condition_list); + + ret = lttcomm_accept_unix_sock(state->notification_channel_socket); + if (ret < 0) { + ERR("[notification-thread] Failed to accept new notification channel client connection"); + ret = 0; + goto error; + } + + client->socket = ret; + + /* FIXME set client socket as non-blocking. */ + /* + ret = set_socket_non_blocking(client->socket); + if (ret) { + ERR("[notification-thread] Failed to set new notification channel client connection socket as non-blocking"); + goto error; + } + */ + + /* FIXME handle creds. */ + ret = lttcomm_setsockopt_creds_unix_sock(client->socket); + if (ret < 0) { + ERR("[notification-thread] Failed to set socket options on new notification channel client socket"); + ret = 0; + goto error; + } + + /* FIXME protocol version handshake. */ + + ret = lttng_poll_add(&state->events, client->socket, + LPOLLIN | LPOLLERR | + LPOLLHUP | LPOLLRDHUP); + if (ret < 0) { + ERR("[notification-thread] Failed to add notification channel client socket to poll set"); + ret = 0; + goto error; + } + DBG("[notification-thread] Added new notification channel client socket (%i) to poll set", + client->socket); + + /* Add to ht. */ + rcu_read_lock(); + cds_lfht_add(state->client_socket_ht, + hash_client_socket(client->socket), + &client->client_socket_ht_node); + rcu_read_unlock(); + + return ret; +error: + notification_client_destroy(client, state); + return ret; +} + +int handle_notification_thread_client_disconnect( + int client_socket, + struct notification_thread_state *state) +{ + int ret = 0; + struct notification_client *client; + + rcu_read_lock(); + DBG("[notification-thread] Closing client connection (socket fd = %i)", + client_socket); + client = get_client_from_socket(client_socket, state); + if (!client) { + /* Internal state corruption, fatal error. */ + ERR("[notification-thread] Unable to find client (socket fd = %i)", + client_socket); + ret = -1; + goto end; + } + + ret = lttng_poll_del(&state->events, client_socket); + if (ret) { + ERR("[notification-thread] Failed to remove client socket from poll set"); + } + cds_lfht_del(state->client_socket_ht, + &client->client_socket_ht_node); + notification_client_destroy(client, state); +end: + rcu_read_unlock(); + return ret; +} + +int handle_notification_thread_client_disconnect_all( + struct notification_thread_state *state) +{ + struct cds_lfht_iter iter; + struct notification_client *client; + bool error_encoutered = false; + + rcu_read_lock(); + DBG("[notification-thread] Closing all client connections"); + cds_lfht_for_each_entry(state->client_socket_ht, &iter, client, + client_socket_ht_node) { + int ret; + + ret = handle_notification_thread_client_disconnect( + client->socket, state); + if (ret) { + error_encoutered = true; + } + } + rcu_read_unlock(); + return error_encoutered ? 1 : 0; +} + +int handle_notification_thread_trigger_unregister_all( + struct notification_thread_state *state) +{ + bool error_occured = false; + struct cds_lfht_iter iter; + struct lttng_trigger_ht_element *trigger_ht_element; + + cds_lfht_for_each_entry(state->triggers_ht, &iter, trigger_ht_element, + node) { + int ret = handle_notification_thread_command_unregister_trigger( + state, trigger_ht_element->trigger, NULL); + if (ret) { + error_occured = true; + } + } + return error_occured ? -1 : 0; +} + +static +int send_client_reply(int socket, + enum lttng_notification_channel_status status) +{ + ssize_t ret; + struct lttng_notification_channel_command_reply reply = { + .status = (int8_t) status, + }; + struct lttng_notification_channel_message msg = { + .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_COMMAND_REPLY, + .size = sizeof(reply), + }; + char *buffer[sizeof(msg) + sizeof(reply)] = {}; + + memcpy(buffer, &msg, sizeof(msg)); + memcpy(buffer + sizeof(msg), &reply, sizeof(reply)); + DBG("[notification-thread] Send command reply (%i)", (int) status); + + ret = lttcomm_send_unix_sock(socket, buffer, + sizeof(msg) + sizeof(reply)); + if (ret < 0) { + ERR("[notification-thread] Failed to send command reply"); + goto error; + } + return 0; +error: + return -1; +} + +int handle_notification_thread_client(struct notification_thread_state *state, + int socket) +{ + int ret = 0; + size_t received = 0; + struct notification_client *client; + struct lttng_notification_channel_message msg; + struct lttng_dynamic_buffer buffer; + struct lttng_condition *condition = NULL; + enum lttng_notification_channel_status status = + LTTNG_NOTIFICATION_CHANNEL_STATUS_OK; + + lttng_dynamic_buffer_init(&buffer); + + client = get_client_from_socket(socket, state); + if (!client) { + /* Internal error, abort. */ + ret = 1; + goto error_no_reply; + } + + /* Receive message header. */ + do { + ssize_t recv_ret; + + recv_ret = lttcomm_recv_unix_sock(socket, + ((char *) &msg) + received, + sizeof(msg) - received); + if (recv_ret <= 0) { + ERR("[notification-thread] Failed to receive channel command from client (received %zu bytes)", received); + /* + * Protocol error, disconnect the client but don't + * signal an error. + */ + goto error_disconnect_client; + } + received += recv_ret; + } while (received < sizeof(msg)); + + ret = lttng_dynamic_buffer_set_size(&buffer, msg.size); + if (ret) { + goto error_disconnect_client; + } + + /* Receive message body. */ + received = 0; + do { + ssize_t recv_ret; + + recv_ret = lttcomm_recv_unix_sock(socket, + buffer.data + received, + msg.size - received); + if (recv_ret <= 0) { + ERR("[notification-thread] Failed to receive condition from client"); + goto error_disconnect_client; + } + received += recv_ret; + } while (received < msg.size); + + ret = lttng_condition_create_from_buffer(buffer.data, &condition); + if (ret < 0 || ret < msg.size) { + ERR("[notification-thread] Malformed condition received from client"); + goto error_disconnect_client; + } + + DBG("[notification-thread] Successfully received condition from notification channel client"); + + switch ((enum lttng_notification_channel_message_type) msg.type) { + case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE: + ret = notification_thread_client_subscribe(client, condition, + state, &status); + break; + case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE: + ret = notification_thread_client_unsubscribe(client, condition, + state, &status); + break; + default: + ERR("[notification-thread] Unknown command type received from notification channel client"); + goto error_disconnect_client; + } + + if (send_client_reply(socket, status)) { + ERR("[notification-thread] Failed to send reply to notification channel client"); + goto error_disconnect_client; + } + + lttng_dynamic_buffer_reset(&buffer); + ret = 0; + return ret; + +error_disconnect_client: + ret = handle_notification_thread_client_disconnect(socket, state); +error_no_reply: + lttng_dynamic_buffer_reset(&buffer); + lttng_condition_destroy(condition); + return ret; +} + +static +bool evaluate_buffer_usage_condition(struct lttng_condition *condition, + struct channel_state_sample *sample, uint64_t buffer_capacity) +{ + bool result = false; + uint64_t threshold; + enum lttng_condition_type condition_type; + struct lttng_condition_buffer_usage *use_condition = container_of( + condition, struct lttng_condition_buffer_usage, + parent); + + if (!sample) { + goto end; + } + + if (use_condition->threshold_bytes.set) { + threshold = use_condition->threshold_bytes.value; + } else { + /* Threshold was expressed as a ratio. */ + threshold = (uint64_t) (use_condition->threshold_ratio.value * + (double) buffer_capacity); + } + + condition_type = lttng_condition_get_type(condition); + if (condition_type == LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW) { + /* + * The low condition should only be triggered once _all_ of the + * streams in a channel have gone below the "low" threshold. + */ + if (sample->highest_usage <= threshold) { + result = true; + } + } else { + /* + * For high buffer usage scenarios, we want to trigger whenever + * _any_ of the streams has reached the "high" threshold. + */ + if (sample->highest_usage >= threshold) { + result = true; + } + } +end: + return result; +} + +static +int evaluate_condition(struct lttng_condition *condition, + struct lttng_evaluation **evaluation, + struct notification_thread_state *state, + struct channel_state_sample *previous_sample, + struct channel_state_sample *latest_sample, + uint64_t buffer_capacity) +{ + int ret = 0; + enum lttng_condition_type condition_type; + bool previous_sample_result; + bool latest_sample_result; + + condition_type = lttng_condition_get_type(condition); + /* No other condition type supported for the moment. */ + assert(condition_type == LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW || + condition_type == LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH); + + previous_sample_result = evaluate_buffer_usage_condition(condition, + previous_sample, buffer_capacity); + latest_sample_result = evaluate_buffer_usage_condition(condition, + latest_sample, buffer_capacity); + + if (!latest_sample_result || + (previous_sample_result == latest_sample_result)) { + /* + * Only trigger on a condition evaluation transition. + * NOTE: This edge-triggered logic may not be appropriate for + * future condition types. + */ + goto end; + } + + if (evaluation && latest_sample_result) { + *evaluation = lttng_evaluation_buffer_usage_create( + condition_type, + latest_sample->highest_usage, + buffer_capacity); + if (!*evaluation) { + ret = -1; + goto end; + } + } +end: + return ret; +} + +static +int send_evaluation_to_clients(struct lttng_trigger *trigger, + struct lttng_evaluation *evaluation, + struct notification_client_list* client_list) +{ + int ret = 0; + struct lttng_dynamic_buffer msg_buffer; + struct notification_client_list_element *client_list_element, *tmp; + struct lttng_notification *notification; + struct lttng_condition *condition; + ssize_t expected_notification_size, notification_size; + struct lttng_notification_channel_message msg; + + lttng_dynamic_buffer_init(&msg_buffer); + + condition = lttng_trigger_get_condition(trigger); + assert(condition); + + notification = lttng_notification_create(condition, evaluation); + if (!notification) { + ret = -1; + goto end; + } + + expected_notification_size = lttng_notification_serialize(notification, + NULL); + if (expected_notification_size < 0) { + ERR("[notification-thread] Failed to get size of serialized notification"); + ret = -1; + goto end; + } + + msg.type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION; + msg.size = (uint32_t) expected_notification_size; + ret = lttng_dynamic_buffer_append(&msg_buffer, &msg, sizeof(msg)); + if (ret) { + goto end; + } + + ret = lttng_dynamic_buffer_set_size(&msg_buffer, + msg_buffer.size + expected_notification_size); + if (ret) { + goto end; + } + + notification_size = lttng_notification_serialize(notification, + msg_buffer.data + sizeof(msg)); + if (notification_size != expected_notification_size) { + ERR("[notification-thread] Failed to serialize notification"); + ret = -1; + goto end; + } + + cds_list_for_each_entry_safe(client_list_element, tmp, + &client_list->list, node) { + ret = lttcomm_send_unix_sock( + client_list_element->client->socket, + msg_buffer.data, msg_buffer.size); + if (ret < 0) { + ERR("[notification-thread] Failed to send notification to client"); + } + } + ret = 0; +end: + lttng_notification_destroy(notification); + lttng_dynamic_buffer_reset(&msg_buffer); + return ret; +} + +int handle_notification_thread_channel_sample( + struct notification_thread_state *state, int pipe, + enum lttng_domain_type domain) +{ + int ret = 0; + struct lttcomm_consumer_channel_monitor_msg sample_msg; + struct channel_state_sample previous_sample, latest_sample; + struct channel_info *channel_info; + struct cds_lfht_node *node; + struct cds_lfht_iter iter; + struct lttng_channel_trigger_list *trigger_list; + struct lttng_trigger_list_element *trigger_list_element; + bool previous_sample_available = false; + + /* + * The monitoring pipe only holds messages smaller than PIPE_BUF, + * ensuring that read/write of sampling messages are atomic. + */ + do { + ret = read(pipe, &sample_msg, sizeof(sample_msg)); + } while (ret == -1 && errno == EINTR); + if (ret != sizeof(sample_msg)) { + ERR("[notification-thread] Failed to read from monitoring pipe (fd = %i)", + pipe); + ret = -1; + goto end; + } + + ret = 0; + latest_sample.key.key = sample_msg.key; + latest_sample.key.domain = domain; + latest_sample.highest_usage = sample_msg.highest; + latest_sample.lowest_usage = sample_msg.lowest; + + DBG("[notification-thread] Handling channel sample (highest usage = %" PRIu64 ", lowest usage = %" PRIu64")", + latest_sample.highest_usage, + latest_sample.lowest_usage); + + rcu_read_lock(); + + /* Retrieve the channel's informations */ + cds_lfht_lookup(state->channels_ht, + hash_channel_key(&latest_sample.key), + match_channel_info, + &latest_sample.key, + &iter); + node = cds_lfht_iter_get_node(&iter); + if (!node) { + /* + * Not an error since the consumer can push a sample to the pipe + * and the rest of the session daemon could notify us of the + * channel's destruction before we get a chance to process that + * sample. + */ + DBG("[notification-thread] Received a sample for an unknown channel from consumerd, key = %" PRIu64 " in %s domain", + latest_sample.key.key, + domain == LTTNG_DOMAIN_KERNEL ? "kernel" : + "user space"); + goto end_unlock; + } + channel_info = caa_container_of(node, struct channel_info, + channels_ht_node); + + /* Retrieve the channel's last sample, if it exists, and update it. */ + cds_lfht_lookup(state->channel_state_ht, + hash_channel_key(&latest_sample.key), + match_channel_state_sample, + &latest_sample.key, + &iter); + node = cds_lfht_iter_get_node(&iter); + if (node) { + struct channel_state_sample *stored_sample; + + /* Update the sample stored. */ + stored_sample = caa_container_of(node, + struct channel_state_sample, + channel_state_ht_node); + memcpy(&previous_sample, stored_sample, + sizeof(previous_sample)); + stored_sample->highest_usage = latest_sample.highest_usage; + stored_sample->lowest_usage = latest_sample.lowest_usage; + previous_sample_available = true; + } else { + /* + * This is the channel's first sample, allocate space for and + * store the new sample. + */ + struct channel_state_sample *stored_sample; + + stored_sample = zmalloc(sizeof(*stored_sample)); + if (!stored_sample) { + ret = -1; + goto end_unlock; + } + + memcpy(stored_sample, &latest_sample, sizeof(*stored_sample)); + cds_lfht_node_init(&stored_sample->channel_state_ht_node); + cds_lfht_add(state->channel_state_ht, + hash_channel_key(&stored_sample->key), + &stored_sample->channel_state_ht_node); + } + + /* Find triggers associated with this channel. */ + cds_lfht_lookup(state->channel_triggers_ht, + hash_channel_key(&latest_sample.key), + match_channel_trigger_list, + &latest_sample.key, + &iter); + node = cds_lfht_iter_get_node(&iter); + if (!node) { + goto end_unlock; + } + + trigger_list = caa_container_of(node, struct lttng_channel_trigger_list, + channel_triggers_ht_node); + cds_list_for_each_entry(trigger_list_element, &trigger_list->list, + node) { + struct lttng_condition *condition; + struct lttng_action *action; + struct lttng_trigger *trigger; + struct notification_client_list *client_list; + struct lttng_evaluation *evaluation = NULL; + + trigger = trigger_list_element->trigger; + condition = lttng_trigger_get_condition(trigger); + assert(condition); + action = lttng_trigger_get_action(trigger); + + /* Notify actions are the only type currently supported. */ + assert(lttng_action_get_type(action) == + LTTNG_ACTION_TYPE_NOTIFY); + + /* + * Check if any client is subscribed to the result of this + * evaluation. + */ + cds_lfht_lookup(state->notification_trigger_clients_ht, + lttng_condition_hash(condition), + match_client_list, + trigger, + &iter); + node = cds_lfht_iter_get_node(&iter); + assert(node); + + client_list = caa_container_of(node, + struct notification_client_list, + notification_trigger_ht_node); + if (cds_list_empty(&client_list->list)) { + /* + * No clients interested in the evaluation's result, + * skip it. + */ + continue; + } + + ret = evaluate_condition(condition, &evaluation, state, + previous_sample_available ? &previous_sample : NULL, + &latest_sample, channel_info->capacity); + if (ret) { + goto end_unlock; + } + + if (!evaluation) { + continue; + } + + /* Dispatch evaluation result to all clients. */ + ret = send_evaluation_to_clients(trigger_list_element->trigger, + evaluation, client_list); + if (ret) { + goto end_unlock; + } + } +end_unlock: + rcu_read_unlock(); +end: + return ret; +} diff --git a/src/bin/lttng-sessiond/notification-thread-events.h b/src/bin/lttng-sessiond/notification-thread-events.h new file mode 100644 index 000000000..941154af0 --- /dev/null +++ b/src/bin/lttng-sessiond/notification-thread-events.h @@ -0,0 +1,52 @@ +/* + * Copyright (C) 2017 - Jérémie Galarneau + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License, version 2 only, as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for + * more details. + * + * You should have received a copy of the GNU General Public License along with + * this program; if not, write to the Free Software Foundation, Inc., 51 + * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#ifndef NOTIFICATION_THREAD_EVENTS_H +#define NOTIFICATION_THREAD_EVENTS_H + +#include +#include "notification-thread.h" + +/** + * Event handling function shall only return an error if + * the thread should be stopped. + */ +int handle_notification_thread_command( + struct notification_thread_handle *handle, + struct notification_thread_state *state); + +int handle_notification_thread_client_connect( + struct notification_thread_state *state); + +int handle_notification_thread_client_disconnect( + int client_fd, + struct notification_thread_state *state); + +int handle_notification_thread_client_disconnect_all( + struct notification_thread_state *state); + +int handle_notification_thread_trigger_unregister_all( + struct notification_thread_state *state); + +int handle_notification_thread_client(struct notification_thread_state *state, + int socket); + +int handle_notification_thread_channel_sample( + struct notification_thread_state *state, int pipe, + enum lttng_domain_type domain); + +#endif /* NOTIFICATION_THREAD_EVENTS_H */ diff --git a/src/bin/lttng-sessiond/notification-thread.c b/src/bin/lttng-sessiond/notification-thread.c new file mode 100644 index 000000000..fcf7260ac --- /dev/null +++ b/src/bin/lttng-sessiond/notification-thread.c @@ -0,0 +1,682 @@ +/* + * Copyright (C) 2017 - Jérémie Galarneau + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License, version 2 only, as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for + * more details. + * + * You should have received a copy of the GNU General Public License along with + * this program; if not, write to the Free Software Foundation, Inc., 51 + * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#define _LGPL_SOURCE +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "notification-thread.h" +#include "notification-thread-events.h" +#include "notification-thread-commands.h" +#include "lttng-sessiond.h" +#include "health-sessiond.h" + +#include +#include +#include + +/** + * This thread maintains an internal state associating clients and triggers. + * + * In order to speed-up and simplify queries, hash tables providing the + * following associations are maintained: + * + * - client_socket_ht: associate a client's socket (fd) to its "struct client" + * This hash table owns the "struct client" which must thus be + * disposed-of on removal from the hash table. + * + * - channel_triggers_ht: + * associates a channel key to a list of + * struct lttng_trigger_list_nodes. The triggers in this list are + * those that have conditions that apply to this channel. + * This hash table owns the list, but not the triggers themselves. + * + * - channel_state_ht: + * associates a pair (channel key, channel domain) to its last + * sampled state received from the consumer daemon + * (struct channel_state). + * This previous sample is kept to implement edge-triggered + * conditions as we need to detect the state transitions. + * This hash table owns the channel state. + * + * - notification_trigger_clients_ht: + * associates notification-emitting triggers to clients + * (struct notification_client_ht_node) subscribed to those + * conditions. + * The condition's hash and match functions are used directly since + * all triggers in this hash table have the "notify" action. + * This hash table holds no ownership. + * + * - channels_ht: + * associates a channel_key to a struct channel_info. The hash table + * holds the ownership of the struct channel_info. + * + * - triggers_ht: + * associated a condition to a struct lttng_trigger_ht_element. + * The hash table holds the ownership of the + * lttng_trigger_ht_elements along with the triggers themselves. + * + * The thread reacts to the following internal events: + * 1) creation of a tracing channel, + * 2) destruction of a tracing channel, + * 3) registration of a trigger, + * 4) unregistration of a trigger, + * 5) reception of a channel monitor sample from the consumer daemon. + * + * Events specific to notification-emitting triggers: + * 6) connection of a notification client, + * 7) disconnection of a notification client, + * 8) subscription of a client to a conditions' notifications, + * 9) unsubscription of a client from a conditions' notifications, + * + * + * 1) Creation of a tracing channel + * - notification_trigger_clients_ht is traversed to identify + * triggers which apply to this new channel, + * - triggers identified are added to the channel_triggers_ht. + * - add channel to channels_ht + * + * 2) Destruction of a tracing channel + * - remove entry from channel_triggers_ht, releasing the list wrapper and + * elements, + * - remove entry from the channel_state_ht. + * - remove channel from channels_ht + * + * 3) Registration of a trigger + * - if the trigger's action is of type "notify", + * - traverse the list of conditions of every client to build a list of + * clients which have to be notified when this trigger's condition is met, + * - add list of clients (even if it is empty) to the + * notification_trigger_clients_ht, + * - add trigger to channel_triggers_ht (if applicable), + * - add trigger to triggers_ht + * + * 4) Unregistration of a trigger + * - if the trigger's action is of type "notify", + * - remove the trigger from the notification_trigger_clients_ht, + * - remove trigger from channel_triggers_ht (if applicable), + * - remove trigger from triggers_ht + * + * 5) Reception of a channel monitor sample from the consumer daemon + * - evaluate the conditions associated with the triggers found in + * the channel_triggers_ht, + * - if a condition evaluates to "true" and the condition is of type + * "notify", query the notification_trigger_clients_ht and send + * a notification to the clients. + * + * 6) Connection of a client + * - add client socket to the client_socket_ht. + * + * 7) Disconnection of a client + * - remove client socket from the client_socket_ht, + * - traverse all conditions to which the client is subscribed and remove + * the client from the notification_trigger_clients_ht. + * + * 8) Subscription of a client to a condition's notifications + * - Add the condition to the client's list of subscribed conditions, + * - Look-up notification_trigger_clients_ht and add the client to + * list of clients. + * + * 9) Unsubscription of a client to a condition's notifications + * - Remove the condition from the client's list of subscribed conditions, + * - Look-up notification_trigger_clients_ht and remove the client + * from the list of clients. + */ + +/* + * Destroy the thread data previously created by the init function. + */ +void notification_thread_handle_destroy( + struct notification_thread_handle *handle) +{ + int ret; + struct notification_thread_command *cmd, *tmp; + + if (!handle) { + goto end; + } + + if (handle->cmd_queue.event_fd < 0) { + goto end; + } + ret = close(handle->cmd_queue.event_fd); + if (ret < 0) { + PERROR("close notification command queue event_fd"); + } + + pthread_mutex_lock(&handle->cmd_queue.lock); + /* Purge queue of in-flight commands and mark them as cancelled. */ + cds_list_for_each_entry_safe(cmd, tmp, &handle->cmd_queue.list, + cmd_list_node) { + cds_list_del(&cmd->cmd_list_node); + cmd->reply_code = LTTNG_ERR_COMMAND_CANCELLED; + futex_nto1_wake(&cmd->reply_futex); + } + pthread_mutex_unlock(&handle->cmd_queue.lock); + pthread_mutex_destroy(&handle->cmd_queue.lock); + + if (handle->channel_monitoring_pipes.ust32_consumer >= 0) { + ret = close(handle->channel_monitoring_pipes.ust32_consumer); + if (ret) { + PERROR("close 32-bit consumer channel monitoring pipe"); + } + } + if (handle->channel_monitoring_pipes.ust64_consumer >= 0) { + ret = close(handle->channel_monitoring_pipes.ust64_consumer); + if (ret) { + PERROR("close 64-bit consumer channel monitoring pipe"); + } + } + if (handle->channel_monitoring_pipes.kernel_consumer >= 0) { + ret = close(handle->channel_monitoring_pipes.kernel_consumer); + if (ret) { + PERROR("close kernel consumer channel monitoring pipe"); + } + } +end: + free(handle); +} + +struct notification_thread_handle *notification_thread_handle_create( + struct lttng_pipe *ust32_channel_monitor_pipe, + struct lttng_pipe *ust64_channel_monitor_pipe, + struct lttng_pipe *kernel_channel_monitor_pipe) +{ + int ret; + struct notification_thread_handle *handle; + + handle = zmalloc(sizeof(*handle)); + if (!handle) { + goto end; + } + + /* FIXME Replace eventfd by a pipe to support older kernels. */ + handle->cmd_queue.event_fd = eventfd(0, EFD_CLOEXEC); + if (handle->cmd_queue.event_fd < 0) { + PERROR("eventfd notification command queue"); + goto error; + } + CDS_INIT_LIST_HEAD(&handle->cmd_queue.list); + ret = pthread_mutex_init(&handle->cmd_queue.lock, NULL); + if (ret) { + goto error; + } + + if (ust32_channel_monitor_pipe) { + handle->channel_monitoring_pipes.ust32_consumer = + lttng_pipe_release_readfd( + ust32_channel_monitor_pipe); + if (handle->channel_monitoring_pipes.ust32_consumer < 0) { + goto error; + } + } else { + handle->channel_monitoring_pipes.ust32_consumer = -1; + } + if (ust64_channel_monitor_pipe) { + handle->channel_monitoring_pipes.ust64_consumer = + lttng_pipe_release_readfd( + ust64_channel_monitor_pipe); + if (handle->channel_monitoring_pipes.ust64_consumer < 0) { + goto error; + } + } else { + handle->channel_monitoring_pipes.ust64_consumer = -1; + } + if (kernel_channel_monitor_pipe) { + handle->channel_monitoring_pipes.kernel_consumer = + lttng_pipe_release_readfd( + kernel_channel_monitor_pipe); + if (handle->channel_monitoring_pipes.kernel_consumer < 0) { + goto error; + } + } else { + handle->channel_monitoring_pipes.kernel_consumer = -1; + } +end: + return handle; +error: + notification_thread_handle_destroy(handle); + return NULL; +} + +static +char *get_notification_channel_sock_path(void) +{ + int ret; + bool is_root = !getuid(); + char *sock_path; + + sock_path = zmalloc(LTTNG_PATH_MAX); + if (!sock_path) { + goto error; + } + + if (is_root) { + ret = snprintf(sock_path, LTTNG_PATH_MAX, + DEFAULT_GLOBAL_NOTIFICATION_CHANNEL_UNIX_SOCK); + if (ret < 0) { + goto error; + } + } else { + char *home_path = utils_get_home_dir(); + + if (!home_path) { + ERR("Can't get HOME directory for socket creation"); + goto error; + } + + ret = snprintf(sock_path, LTTNG_PATH_MAX, + DEFAULT_HOME_NOTIFICATION_CHANNEL_UNIX_SOCK, + home_path); + if (ret < 0) { + goto error; + } + } + + return sock_path; +error: + free(sock_path); + return NULL; +} + +static +void notification_channel_socket_destroy(int fd) +{ + int ret; + char *sock_path = get_notification_channel_sock_path(); + + DBG("[notification-thread] Destroying notification channel socket"); + + if (sock_path) { + ret = unlink(sock_path); + free(sock_path); + if (ret < 0) { + PERROR("unlink notification channel socket"); + } + } + + ret = close(fd); + if (ret) { + PERROR("close notification channel socket"); + } +} + +static +int notification_channel_socket_create(void) +{ + int fd = -1, ret; + char *sock_path = get_notification_channel_sock_path(); + + DBG("[notification-thread] Creating notification channel UNIX socket at %s", + sock_path); + + ret = lttcomm_create_unix_sock(sock_path); + if (ret < 0) { + ERR("[notification-thread] Failed to create notification socket"); + goto error; + } + fd = ret; + + ret = chmod(sock_path, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP); + if (ret < 0) { + ERR("Set file permissions failed: %s", sock_path); + PERROR("chmod notification channel socket"); + goto error; + } + + DBG("[notification-thread] Notification channel UNIX socket created (fd = %i)", + fd); + free(sock_path); + return fd; +error: + if (fd >= 0 && close(fd) < 0) { + PERROR("close notification channel socket"); + } + free(sock_path); + return ret; +} + +static +int init_poll_set(struct lttng_poll_event *poll_set, + struct notification_thread_handle *handle, + int notification_channel_socket) +{ + int ret; + + /* + * Create pollset with size 6: + * - quit pipe, + * - notification channel socket (listen for new connections), + * - command queue event fd (internal sessiond commands), + * - consumerd (32-bit user space) channel monitor pipe, + * - consumerd (64-but user space) channel monitor pipe, + * - consumerd (kernel) channel monitor pipe. + */ + ret = sessiond_set_thread_pollset(poll_set, 6); + if (ret < 0) { + goto end; + } + + ret = lttng_poll_add(poll_set, notification_channel_socket, + LPOLLIN | LPOLLERR | LPOLLHUP | LPOLLRDHUP); + if (ret < 0) { + ERR("[notification-thread] Failed to add notification channel socket to pollset"); + goto error; + } + ret = lttng_poll_add(poll_set, handle->cmd_queue.event_fd, + LPOLLIN | LPOLLERR); + if (ret < 0) { + ERR("[notification-thread] Failed to add notification command queue event fd to pollset"); + goto error; + } + ret = lttng_poll_add(poll_set, + handle->channel_monitoring_pipes.ust32_consumer, + LPOLLIN | LPOLLERR); + if (ret < 0) { + ERR("[notification-thread] Failed to add ust-32 channel monitoring pipe fd to pollset"); + goto error; + } + ret = lttng_poll_add(poll_set, + handle->channel_monitoring_pipes.ust64_consumer, + LPOLLIN | LPOLLERR); + if (ret < 0) { + ERR("[notification-thread] Failed to add ust-64 channel monitoring pipe fd to pollset"); + goto error; + } + if (handle->channel_monitoring_pipes.kernel_consumer < 0) { + goto end; + } + ret = lttng_poll_add(poll_set, + handle->channel_monitoring_pipes.kernel_consumer, + LPOLLIN | LPOLLERR); + if (ret < 0) { + ERR("[notification-thread] Failed to add kernel channel monitoring pipe fd to pollset"); + goto error; + } +end: + return ret; +error: + lttng_poll_clean(poll_set); + return ret; +} + +static +void fini_thread_state(struct notification_thread_state *state) +{ + int ret; + + if (state->client_socket_ht) { + ret = handle_notification_thread_client_disconnect_all(state); + assert(!ret); + ret = cds_lfht_destroy(state->client_socket_ht, NULL); + assert(!ret); + } + if (state->triggers_ht) { + ret = handle_notification_thread_trigger_unregister_all(state); + assert(!ret); + ret = cds_lfht_destroy(state->triggers_ht, NULL); + assert(!ret); + } + if (state->channel_triggers_ht) { + ret = cds_lfht_destroy(state->channel_triggers_ht, NULL); + assert(!ret); + } + if (state->channel_state_ht) { + ret = cds_lfht_destroy(state->channel_state_ht, NULL); + assert(!ret); + } + if (state->notification_trigger_clients_ht) { + ret = cds_lfht_destroy(state->notification_trigger_clients_ht, + NULL); + assert(!ret); + } + if (state->channels_ht) { + ret = cds_lfht_destroy(state->channels_ht, + NULL); + assert(!ret); + } + + if (state->notification_channel_socket >= 0) { + notification_channel_socket_destroy( + state->notification_channel_socket); + } + lttng_poll_clean(&state->events); +} + +static +int init_thread_state(struct notification_thread_handle *handle, + struct notification_thread_state *state) +{ + int ret; + + memset(state, 0, sizeof(*state)); + state->notification_channel_socket = -1; + lttng_poll_init(&state->events); + + ret = notification_channel_socket_create(); + if (ret < 0) { + goto end; + } + state->notification_channel_socket = ret; + + ret = init_poll_set(&state->events, handle, + state->notification_channel_socket); + if (ret) { + goto end; + } + + DBG("[notification-thread] Listening on notification channel socket"); + ret = lttcomm_listen_unix_sock(state->notification_channel_socket); + if (ret < 0) { + ERR("[notification-thread] Listen failed on notification channel socket"); + goto error; + } + + state->client_socket_ht = cds_lfht_new(DEFAULT_HT_SIZE, 1, 0, + CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL); + if (!state->client_socket_ht) { + goto error; + } + + state->channel_triggers_ht = cds_lfht_new(DEFAULT_HT_SIZE, 1, 0, + CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL); + if (!state->channel_triggers_ht) { + goto error; + } + + state->channel_state_ht = cds_lfht_new(DEFAULT_HT_SIZE, 1, 0, + CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL); + if (!state->channel_state_ht) { + goto error; + } + + state->notification_trigger_clients_ht = cds_lfht_new(DEFAULT_HT_SIZE, + 1, 0, CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL); + if (!state->notification_trigger_clients_ht) { + goto error; + } + + state->channels_ht = cds_lfht_new(DEFAULT_HT_SIZE, + 1, 0, CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL); + if (!state->channels_ht) { + goto error; + } + + state->triggers_ht = cds_lfht_new(DEFAULT_HT_SIZE, + 1, 0, CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL); + if (!state->triggers_ht) { + goto error; + } +end: + return 0; +error: + fini_thread_state(state); + return -1; +} + +/* + * This thread services notification channel clients and received notifications + * from various lttng-sessiond components over a command queue. + */ +void *thread_notification(void *data) +{ + int ret; + struct notification_thread_handle *handle = data; + struct notification_thread_state state; + + DBG("[notification-thread] Started notification thread"); + + if (!handle) { + ERR("[notification-thread] Invalid thread context provided"); + goto end; + } + + rcu_register_thread(); + rcu_thread_online(); + + health_register(health_sessiond, HEALTH_SESSIOND_TYPE_NOTIFICATION); + health_code_update(); + + ret = init_thread_state(handle, &state); + if (ret) { + goto end; + } + + /* Ready to handle client connections. */ + sessiond_notify_ready(); + + while (true) { + int fd_count, i; + + health_poll_entry(); + DBG("[notification-thread] Entering poll wait"); + ret = lttng_poll_wait(&state.events, -1); + DBG("[notification-thread] Poll wait returned (%i)", ret); + health_poll_exit(); + if (ret < 0) { + /* + * Restart interrupted system call. + */ + if (errno == EINTR) { + continue; + } + ERR("[notification-thread] Error encountered during lttng_poll_wait (%i)", ret); + goto error; + } + + fd_count = ret; + for (i = 0; i < fd_count; i++) { + int fd = LTTNG_POLL_GETFD(&state.events, i); + uint32_t revents = LTTNG_POLL_GETEV(&state.events, i); + + DBG("[notification-thread] Handling fd (%i) activity (%u)", fd, revents); + + /* Thread quit pipe has been closed. Killing thread. */ + if (sessiond_check_thread_quit_pipe(fd, revents)) { + DBG("[notification-thread] Quit pipe signaled, exiting."); + goto exit; + } + + if (fd == state.notification_channel_socket) { + if (revents & LPOLLIN) { + ret = handle_notification_thread_client_connect( + &state); + if (ret < 0) { + goto error; + } + } else if (revents & + (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { + ERR("[notification-thread] Notification socket poll error"); + goto error; + } else { + ERR("[notification-thread] Unexpected poll events %u for notification socket %i", revents, fd); + goto error; + } + } else if (fd == handle->cmd_queue.event_fd) { + ret = handle_notification_thread_command(handle, + &state); + if (ret) { + goto error; + } + } else if (fd == handle->channel_monitoring_pipes.ust32_consumer) { + ret = handle_notification_thread_channel_sample( + &state, fd, LTTNG_DOMAIN_UST); + if (ret) { + goto error; + } + } else if (fd == handle->channel_monitoring_pipes.ust64_consumer) { + ret = handle_notification_thread_channel_sample( + &state, fd, LTTNG_DOMAIN_UST); + if (ret) { + goto error; + } + } else if (fd == handle->channel_monitoring_pipes.kernel_consumer) { + ret = handle_notification_thread_channel_sample( + &state, fd, LTTNG_DOMAIN_KERNEL); + if (ret) { + goto error; + } + } else { + /* Activity on a client's socket. */ + if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { + /* + * It doesn't matter if a command was + * pending on the client socket at this + * point since it now has now way to + * receive the notifications to which + * it was subscribing or unsubscribing. + */ + ret = handle_notification_thread_client_disconnect( + fd, &state); + if (ret) { + goto error; + } + } else if (revents & LPOLLIN) { + ret = handle_notification_thread_client( + &state, fd); + if (ret) { + goto error; + } + } else { + DBG("[notification-thread] Unexpected poll events %u for notification socket %i", revents, fd); + } + } + } + } +exit: +error: + fini_thread_state(&state); + health_unregister(health_sessiond); + rcu_thread_offline(); + rcu_unregister_thread(); +end: + return NULL; +} diff --git a/src/bin/lttng-sessiond/notification-thread.h b/src/bin/lttng-sessiond/notification-thread.h new file mode 100644 index 000000000..c401b410b --- /dev/null +++ b/src/bin/lttng-sessiond/notification-thread.h @@ -0,0 +1,73 @@ +/* + * Copyright (C) 2017 - Jérémie Galarneau + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License, version 2 only, as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for + * more details. + * + * You should have received a copy of the GNU General Public License along with + * this program; if not, write to the Free Software Foundation, Inc., 51 + * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#ifndef NOTIFICATION_THREAD_H +#define NOTIFICATION_THREAD_H + +#include +#include +#include +#include +#include +#include +#include +#include + +struct notification_thread_handle { + /* + * Queue of struct notification command. + * event_fd must be WRITE(2) to signal that a new command + * has been enqueued. + */ + struct { + int event_fd; + struct cds_list_head list; + pthread_mutex_t lock; + } cmd_queue; + /* + * Read side of pipes used to receive channel status info collected + * by the various consumer daemons. + */ + struct { + int ust32_consumer; + int ust64_consumer; + int kernel_consumer; + } channel_monitoring_pipes; +}; + +struct notification_thread_state { + int notification_channel_socket; + struct lttng_poll_event events; + struct cds_lfht *client_socket_ht; + struct cds_lfht *channel_triggers_ht; + struct cds_lfht *channel_state_ht; + struct cds_lfht *notification_trigger_clients_ht; + struct cds_lfht *channels_ht; + struct cds_lfht *triggers_ht; +}; + +/* notification_thread_data takes ownership of the channel monitor pipes. */ +struct notification_thread_handle *notification_thread_handle_create( + struct lttng_pipe *ust32_channel_monitor_pipe, + struct lttng_pipe *ust64_channel_monitor_pipe, + struct lttng_pipe *kernel_channel_monitor_pipe); +void notification_thread_handle_destroy( + struct notification_thread_handle *handle); + +void *thread_notification(void *data); + +#endif /* NOTIFICATION_THREAD_H */ diff --git a/src/bin/lttng-sessiond/ust-app.c b/src/bin/lttng-sessiond/ust-app.c index 5a41c3800..eb4a76a2f 100644 --- a/src/bin/lttng-sessiond/ust-app.c +++ b/src/bin/lttng-sessiond/ust-app.c @@ -41,6 +41,8 @@ #include "ust-ctl.h" #include "utils.h" #include "session.h" +#include "lttng-sessiond.h" +#include "notification-thread-commands.h" static int ust_app_flush_app_session(struct ust_app *app, struct ust_app_session *ua_sess); @@ -482,7 +484,8 @@ void delete_ust_app_channel(int sock, struct ust_app_channel *ua_chan, /* Wipe and free registry from session registry. */ registry = get_session_registry(ua_chan->session); if (registry) { - ust_registry_channel_del_free(registry, ua_chan->key); + ust_registry_channel_del_free(registry, ua_chan->key, + true); } save_per_pid_lost_discarded_counters(ua_chan); } @@ -2839,6 +2842,7 @@ static int create_channel_per_uid(struct ust_app *app, int ret; struct buffer_reg_uid *reg_uid; struct buffer_reg_channel *reg_chan; + bool created = false; assert(app); assert(usess); @@ -2882,7 +2886,7 @@ static int create_channel_per_uid(struct ust_app *app, * it's not visible anymore in the session registry. */ ust_registry_channel_del_free(reg_uid->registry->reg.ust, - ua_chan->tracing_channel_id); + ua_chan->tracing_channel_id, false); buffer_reg_channel_remove(reg_uid->registry, reg_chan); buffer_reg_channel_destroy(reg_chan, LTTNG_DOMAIN_UST); goto error; @@ -2898,7 +2902,7 @@ static int create_channel_per_uid(struct ust_app *app, ua_chan->name); goto error; } - + created = true; } /* Send buffers to the application. */ @@ -2910,6 +2914,39 @@ static int create_channel_per_uid(struct ust_app *app, goto error; } + if (created) { + enum lttng_error_code cmd_ret; + struct ltt_session *session; + uint64_t chan_reg_key; + struct ust_registry_channel *chan_reg; + + chan_reg_key = ua_chan->tracing_channel_id; + pthread_mutex_lock(®_uid->registry->reg.ust->lock); + rcu_read_lock(); + chan_reg = ust_registry_channel_find(reg_uid->registry->reg.ust, chan_reg_key); + assert(chan_reg); + chan_reg->consumer_key = ua_chan->key; + rcu_read_unlock(); + chan_reg = NULL; + pthread_mutex_unlock(®_uid->registry->reg.ust->lock); + + session = session_find_by_id(ua_sess->tracing_id); + assert(session); + + cmd_ret = notification_thread_command_add_channel( + notification_thread_handle, session->name, + ua_sess->euid, ua_sess->egid, + ua_chan->name, + ua_chan->key, + LTTNG_DOMAIN_UST, + ua_chan->attr.subbuf_size * ua_chan->attr.num_subbuf); + if (cmd_ret != LTTNG_OK) { + ret = - (int) cmd_ret; + ERR("Failed to add channel to notification thread"); + goto error; + } + } + error: return ret; } @@ -2925,6 +2962,8 @@ static int create_channel_per_pid(struct ust_app *app, { int ret; struct ust_registry_session *registry; + enum lttng_error_code cmd_ret; + struct ltt_session *session; assert(app); assert(usess); @@ -2963,6 +3002,32 @@ static int create_channel_per_pid(struct ust_app *app, goto error; } + session = session_find_by_id(ua_sess->tracing_id); + assert(session); + + uint64_t chan_reg_key; + struct ust_registry_channel *chan_reg; + + chan_reg_key = ua_chan->key; + pthread_mutex_lock(®istry->lock); + chan_reg = ust_registry_channel_find(registry, chan_reg_key); + assert(chan_reg); + chan_reg->consumer_key = ua_chan->key; + pthread_mutex_unlock(®istry->lock); + + cmd_ret = notification_thread_command_add_channel( + notification_thread_handle, session->name, + ua_sess->euid, ua_sess->egid, + ua_chan->name, + ua_chan->key, + LTTNG_DOMAIN_UST, + ua_chan->attr.subbuf_size * ua_chan->attr.num_subbuf); + if (cmd_ret != LTTNG_OK) { + ret = - (int) cmd_ret; + ERR("Failed to add channel to notification thread"); + goto error; + } + error: rcu_read_unlock(); return ret; @@ -3075,7 +3140,6 @@ static int create_ust_app_channel(struct ust_app_session *ua_sess, /* Only add the channel if successful on the tracer side. */ lttng_ht_add_unique_str(ua_sess->channels, &ua_chan->node); - end: if (ua_chanp) { *ua_chanp = ua_chan; diff --git a/src/bin/lttng-sessiond/ust-consumer.c b/src/bin/lttng-sessiond/ust-consumer.c index 3bb54f039..d60ddb9ed 100644 --- a/src/bin/lttng-sessiond/ust-consumer.c +++ b/src/bin/lttng-sessiond/ust-consumer.c @@ -32,6 +32,8 @@ #include "ust-consumer.h" #include "buffer-registry.h" #include "session.h" +#include "lttng-sessiond.h" +#include "notification-thread-commands.h" /* * Return allocated full pathname of the session using the consumer trace path @@ -174,6 +176,7 @@ static int ask_channel_creation(struct ust_app_session *ua_sess, ua_chan->attr.switch_timer_interval, ua_chan->attr.read_timer_interval, ua_sess->live_timer_interval, + 1000000, /* FIXME Use value provided by the client. */ output, (int) ua_chan->attr.type, ua_sess->tracing_id, @@ -223,6 +226,8 @@ error: /* * Ask consumer to create a channel for a given session. * + * Session list and rcu read side locks must be held by the caller. + * * Returns 0 on success else a negative value. */ int ust_consumer_ask_channel(struct ust_app_session *ua_sess, @@ -230,6 +235,7 @@ int ust_consumer_ask_channel(struct ust_app_session *ua_sess, struct consumer_socket *socket, struct ust_registry_session *registry) { int ret; + struct ltt_session *session; assert(ua_sess); assert(ua_chan); @@ -243,10 +249,14 @@ int ust_consumer_ask_channel(struct ust_app_session *ua_sess, goto error; } + session = session_find_by_id(ua_sess->tracing_id); + assert(session); + pthread_mutex_lock(socket->lock); ret = ask_channel_creation(ua_sess, ua_chan, consumer, socket, registry); pthread_mutex_unlock(socket->lock); if (ret < 0) { + ERR("ask_channel_creation consumer command failed"); goto error; } diff --git a/src/bin/lttng-sessiond/ust-registry.c b/src/bin/lttng-sessiond/ust-registry.c index 3c3aa91f0..88d663170 100644 --- a/src/bin/lttng-sessiond/ust-registry.c +++ b/src/bin/lttng-sessiond/ust-registry.c @@ -26,6 +26,8 @@ #include "ust-registry.h" #include "ust-app.h" #include "utils.h" +#include "lttng-sessiond.h" +#include "notification-thread-commands.h" /* * Hash table match function for event in the registry. @@ -695,13 +697,23 @@ void destroy_channel_rcu(struct rcu_head *head) * free the registry pointer since it might not have been allocated before so * it's the caller responsability. */ -static void destroy_channel(struct ust_registry_channel *chan) +static void destroy_channel(struct ust_registry_channel *chan, bool notif) { struct lttng_ht_iter iter; struct ust_registry_event *event; + enum lttng_error_code cmd_ret; assert(chan); + if (notif) { + cmd_ret = notification_thread_command_remove_channel( + notification_thread_handle, chan->consumer_key, + LTTNG_DOMAIN_UST); + if (cmd_ret != LTTNG_OK) { + ERR("Failed to remove channel from notification thread"); + } + } + rcu_read_lock(); /* Destroy all event associated with this registry. */ cds_lfht_for_each_entry(chan->ht->ht, &iter.iter, event, node.node) { @@ -759,7 +771,7 @@ int ust_registry_channel_add(struct ust_registry_session *session, return 0; error: - destroy_channel(chan); + destroy_channel(chan, false); error_alloc: return ret; } @@ -798,7 +810,7 @@ end: * Remove channel using key from registry and free memory. */ void ust_registry_channel_del_free(struct ust_registry_session *session, - uint64_t key) + uint64_t key, bool notif) { struct lttng_ht_iter iter; struct ust_registry_channel *chan; @@ -817,7 +829,7 @@ void ust_registry_channel_del_free(struct ust_registry_session *session, ret = lttng_ht_del(session->channels, &iter); assert(!ret); rcu_read_unlock(); - destroy_channel(chan); + destroy_channel(chan, notif); end: return; @@ -972,7 +984,7 @@ void ust_registry_session_destroy(struct ust_registry_session *reg) /* Delete the node from the ht and free it. */ ret = lttng_ht_del(reg->channels, &iter); assert(!ret); - destroy_channel(chan); + destroy_channel(chan, true); } rcu_read_unlock(); ht_cleanup_push(reg->channels); diff --git a/src/bin/lttng-sessiond/ust-registry.h b/src/bin/lttng-sessiond/ust-registry.h index 2697cf2ec..47b21dfe0 100644 --- a/src/bin/lttng-sessiond/ust-registry.h +++ b/src/bin/lttng-sessiond/ust-registry.h @@ -109,6 +109,7 @@ struct ust_registry_session { struct ust_registry_channel { uint64_t key; + uint64_t consumer_key; /* Id set when replying to a register channel. */ uint32_t chan_id; enum ustctl_channel_header header_type; @@ -248,7 +249,7 @@ struct ust_registry_channel *ust_registry_channel_find( int ust_registry_channel_add(struct ust_registry_session *session, uint64_t key); void ust_registry_channel_del_free(struct ust_registry_session *session, - uint64_t key); + uint64_t key, bool notif); int ust_registry_session_init(struct ust_registry_session **sessionp, struct ust_app *app, diff --git a/src/common/Makefile.am b/src/common/Makefile.am index 8619b5e25..f6df9a61a 100644 --- a/src/common/Makefile.am +++ b/src/common/Makefile.am @@ -73,7 +73,10 @@ libcommon_la_SOURCES = error.h error.c utils.c utils.h runas.c runas.h \ mi-lttng.h mi-lttng.c \ daemonize.c daemonize.h \ unix.c unix.h \ - filter.c filter.h context.c context.h + filter.c filter.h context.c context.h \ + action.c notify.c condition.c buffer-usage.c \ + evaluation.c notification.c trigger.c endpoint.c \ + dynamic-buffer.h dynamic-buffer.c libcommon_la_LIBADD = \ $(top_builddir)/src/common/config/libconfig.la diff --git a/src/common/action.c b/src/common/action.c new file mode 100644 index 000000000..d07f36c2d --- /dev/null +++ b/src/common/action.c @@ -0,0 +1,119 @@ +/* + * Copyright (C) 2017 - Jérémie Galarneau + * + * This library is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License, version 2.1 only, + * as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License + * for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this library; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#include +#include +#include +#include + +enum lttng_action_type lttng_action_get_type(struct lttng_action *action) +{ + return action ? action->type : LTTNG_ACTION_TYPE_UNKNOWN; +} + +void lttng_action_destroy(struct lttng_action *action) +{ + if (!action) { + return; + } + + assert(action->destroy); + action->destroy(action); +} + +LTTNG_HIDDEN +bool lttng_action_validate(struct lttng_action *action) +{ + bool valid; + + if (!action) { + valid = false; + goto end; + } + + if (!action->validate) { + /* Sub-class guarantees that it can never be invalid. */ + valid = true; + goto end; + } + + valid = action->validate(action); +end: + return valid; +} + +LTTNG_HIDDEN +ssize_t lttng_action_serialize(struct lttng_action *action, char *buf) +{ + ssize_t ret, action_size; + struct lttng_action_comm action_comm; + + if (!action) { + ret = -1; + goto end; + } + + action_comm.action_type = (int8_t) action->type; + ret = sizeof(struct lttng_action_comm); + if (buf) { + memcpy(buf, &action_comm, ret); + buf += ret; + } + + action_size = action->serialize(action, buf); + if (action_size < 0) { + ret = action_size; + goto end; + } + ret += action_size; +end: + return ret; +} + +LTTNG_HIDDEN +ssize_t lttng_action_create_from_buffer(const char *buf, + struct lttng_action **_action) +{ + ssize_t ret, action_size = sizeof(struct lttng_action_comm); + struct lttng_action *action; + struct lttng_action_comm *action_comm = + (struct lttng_action_comm *) buf; + + if (!buf || !_action) { + ret = -1; + goto end; + } + + DBG("Deserializing action from buffer"); + switch (action_comm->action_type) { + case LTTNG_ACTION_TYPE_NOTIFY: + action = lttng_action_notify_create(); + break; + default: + ret = -1; + goto end; + } + + if (!action) { + ret = -1; + goto end; + } + ret = action_size; + *_action = action; +end: + return ret; +} diff --git a/src/common/buffer-usage.c b/src/common/buffer-usage.c new file mode 100644 index 000000000..91db15b13 --- /dev/null +++ b/src/common/buffer-usage.c @@ -0,0 +1,784 @@ +/* + * Copyright (C) 2017 - Jérémie Galarneau + * + * This library is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License, version 2.1 only, + * as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License + * for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this library; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#include +#include +#include +#include +#include +#include +#include +#include + +static +double fixed_to_double(uint32_t val) +{ + return (double) val / (double) UINT32_MAX; +} + +static +uint64_t double_to_fixed(double val) +{ + return (val * (double) UINT32_MAX); +} + +static +bool is_usage_condition(struct lttng_condition *condition) +{ + enum lttng_condition_type type = lttng_condition_get_type(condition); + + return type == LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW || + type == LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH; +} + +static +bool is_usage_evaluation(struct lttng_evaluation *evaluation) +{ + enum lttng_condition_type type = lttng_evaluation_get_type(evaluation); + + return type == LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW || + type == LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH; +} + +static +void lttng_condition_buffer_usage_destroy(struct lttng_condition *condition) +{ + struct lttng_condition_buffer_usage *usage; + + usage = container_of(condition, struct lttng_condition_buffer_usage, + parent); + + free(usage->session_name); + free(usage->channel_name); + free(usage); +} + +static +bool lttng_condition_buffer_usage_validate(struct lttng_condition *condition) +{ + bool valid = false; + struct lttng_condition_buffer_usage *usage; + + if (!condition) { + goto end; + } + + usage = container_of(condition, struct lttng_condition_buffer_usage, + parent); + if (!usage->session_name) { + ERR("Invalid buffer condition: a target session name must be set."); + goto end; + } + if (!usage->channel_name) { + ERR("Invalid buffer condition: a target channel name must be set."); + goto end; + } + if (!usage->threshold_ratio.set && !usage->threshold_bytes.set) { + ERR("Invalid buffer condition: a threshold must be set."); + goto end; + } + + valid = true; +end: + return valid; +} + +static +ssize_t lttng_condition_buffer_usage_serialize(struct lttng_condition *condition, + char *buf) +{ + struct lttng_condition_buffer_usage *usage; + ssize_t ret, size; + size_t session_name_len, channel_name_len; + + if (!condition || !is_usage_condition(condition)) { + ret = -1; + goto end; + } + + DBG("Serializing buffer usage condition"); + usage = container_of(condition, struct lttng_condition_buffer_usage, + parent); + size = sizeof(struct lttng_condition_buffer_usage_comm); + session_name_len = strlen(usage->session_name) + 1; + channel_name_len = strlen(usage->channel_name) + 1; + size += session_name_len + channel_name_len; + if (buf) { + struct lttng_condition_buffer_usage_comm usage_comm = { + .threshold_set_in_bytes = usage->threshold_bytes.set ? 1 : 0, + .session_name_len = session_name_len, + .channel_name_len = channel_name_len, + .domain_type = (int8_t) usage->domain.type, + }; + + if (usage->threshold_bytes.set) { + usage_comm.threshold = usage->threshold_bytes.value; + } else { + uint64_t val = double_to_fixed( + usage->threshold_ratio.value); + + if (val > UINT32_MAX) { + /* overflow. */ + ret = -1; + goto end; + } + usage_comm.threshold = val; + } + + memcpy(buf, &usage_comm, sizeof(usage_comm)); + buf += sizeof(usage_comm); + memcpy(buf, usage->session_name, session_name_len); + buf += session_name_len; + memcpy(buf, usage->channel_name, channel_name_len); + buf += channel_name_len; + } + ret = size; +end: + return ret; +} + +static +bool lttng_condition_buffer_usage_is_equal(struct lttng_condition *_a, + struct lttng_condition *_b) +{ + bool is_equal = false; + struct lttng_condition_buffer_usage *a, *b; + + a = container_of(_a, struct lttng_condition_buffer_usage, parent); + b = container_of(_b, struct lttng_condition_buffer_usage, parent); + + if ((a->threshold_ratio.set && !b->threshold_ratio.set) || + (a->threshold_bytes.set && !b->threshold_bytes.set)) { + goto end; + } + + if (a->threshold_ratio.set && b->threshold_ratio.set) { + double a_value, b_value, diff; + + a_value = a->threshold_ratio.value; + b_value = b->threshold_ratio.value; + diff = fabs(a_value - b_value); + + if (diff > DBL_EPSILON) { + goto end; + } + } else if (a->threshold_bytes.set && b->threshold_bytes.set) { + uint64_t a_value, b_value; + + a_value = a->threshold_bytes.value; + b_value = b->threshold_bytes.value; + if (a_value != b_value) { + goto end; + } + } + + if ((a->session_name && !b->session_name) || + (!a->session_name && b->session_name)) { + goto end; + } + + if (a->channel_name && b->channel_name) { + if (strcmp(a->channel_name, b->channel_name)) { + goto end; + } + } if ((a->channel_name && !b->channel_name) || + (!a->channel_name && b->channel_name)) { + goto end; + } + + if (a->channel_name && b->channel_name) { + if (strcmp(a->channel_name, b->channel_name)) { + goto end; + } + } + + if ((a->domain.set && !b->domain.set) || + (!a->domain.set && b->domain.set)) { + goto end; + } + + if (a->domain.set && b->domain.set) { + if (a->domain.type != b->domain.type) { + goto end; + } + } + is_equal = true; +end: + return is_equal; +} + +static +struct lttng_condition *lttng_condition_buffer_usage_create( + enum lttng_condition_type type) +{ + struct lttng_condition_buffer_usage *condition; + + condition = zmalloc(sizeof(struct lttng_condition_buffer_usage)); + if (!condition) { + goto end; + } + + lttng_condition_init(&condition->parent, type); + condition->parent.validate = lttng_condition_buffer_usage_validate; + condition->parent.serialize = lttng_condition_buffer_usage_serialize; + condition->parent.equal = lttng_condition_buffer_usage_is_equal; + condition->parent.destroy = lttng_condition_buffer_usage_destroy; +end: + return &condition->parent; +} + +struct lttng_condition *lttng_condition_buffer_usage_low_create(void) +{ + return lttng_condition_buffer_usage_create( + LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW); +} + +struct lttng_condition *lttng_condition_buffer_usage_high_create(void) +{ + return lttng_condition_buffer_usage_create( + LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH); +} + +static +ssize_t init_condition_from_buffer(struct lttng_condition *condition, + const char *buf) +{ + ssize_t ret, condition_size; + enum lttng_condition_status status; + enum lttng_domain_type domain_type; + struct lttng_condition_buffer_usage_comm *condition_comm = + (struct lttng_condition_buffer_usage_comm *) buf; + const char *session_name, *channel_name; + + if (condition_comm->threshold_set_in_bytes) { + status = lttng_condition_buffer_usage_set_threshold(condition, + condition_comm->threshold); + } else { + status = lttng_condition_buffer_usage_set_threshold_ratio( + condition, + fixed_to_double(condition_comm->threshold)); + } + if (status != LTTNG_CONDITION_STATUS_OK) { + ERR("Failed to initialize buffer usage condition threshold"); + ret = -1; + goto end; + } + + if (condition_comm->domain_type <= LTTNG_DOMAIN_NONE || + condition_comm->domain_type > LTTNG_DOMAIN_PYTHON) { + /* Invalid domain value. */ + ERR("Invalid domain type value (%i) found in condition buffer", + (int) condition_comm->domain_type); + ret = -1; + goto end; + } + + domain_type = (enum lttng_domain_type) condition_comm->domain_type; + status = lttng_condition_buffer_usage_set_domain_type(condition, + domain_type); + if (status != LTTNG_CONDITION_STATUS_OK) { + ERR("Failed to set buffer usage condition domain"); + ret = -1; + goto end; + } + + session_name = buf + sizeof(struct lttng_condition_buffer_usage_comm); + channel_name = session_name + condition_comm->session_name_len; + + status = lttng_condition_buffer_usage_set_session_name(condition, + session_name); + if (status != LTTNG_CONDITION_STATUS_OK) { + ERR("Failed to set buffer usage session name"); + ret = -1; + goto end; + } + + status = lttng_condition_buffer_usage_set_channel_name(condition, + channel_name); + if (status != LTTNG_CONDITION_STATUS_OK) { + ERR("Failed to set buffer usage channel name"); + ret = -1; + goto end; + } + + if (!lttng_condition_validate(condition)) { + ret = -1; + goto end; + } + + condition_size = sizeof(*condition_comm); + condition_size += (ssize_t) condition_comm->session_name_len; + condition_size += (ssize_t) condition_comm->channel_name_len; + ret = condition_size; +end: + return ret; +} + +LTTNG_HIDDEN +ssize_t lttng_condition_buffer_usage_low_create_from_buffer(const char *buf, + struct lttng_condition **_condition) +{ + ssize_t ret; + struct lttng_condition *condition = + lttng_condition_buffer_usage_low_create(); + + if (!_condition || !condition) { + ret = -1; + goto error; + } + + ret = init_condition_from_buffer(condition, buf); + if (ret < 0) { + goto error; + } + + *_condition = condition; + return ret; +error: + lttng_condition_destroy(condition); + return ret; +} + +LTTNG_HIDDEN +ssize_t lttng_condition_buffer_usage_high_create_from_buffer(const char *buf, + struct lttng_condition **_condition) +{ + ssize_t ret; + struct lttng_condition *condition = + lttng_condition_buffer_usage_high_create(); + + if (!_condition || !condition) { + ret = -1; + goto error; + } + + ret = init_condition_from_buffer(condition, buf); + if (ret < 0) { + goto error; + } + + *_condition = condition; + return ret; +error: + lttng_condition_destroy(condition); + return ret; +} + +static +struct lttng_evaluation *create_evaluation_from_buffer( + enum lttng_condition_type type, const char *buf) +{ + struct lttng_evaluation_buffer_usage_comm *comm = + (struct lttng_evaluation_buffer_usage_comm *) buf; + struct lttng_evaluation *evaluation; + + evaluation = lttng_evaluation_buffer_usage_create(type, + comm->buffer_use, comm->buffer_capacity); + return evaluation; +} + +LTTNG_HIDDEN +ssize_t lttng_evaluation_buffer_usage_low_create_from_buffer(const char *buf, + struct lttng_evaluation **_evaluation) +{ + ssize_t ret; + struct lttng_evaluation *evaluation = NULL; + + if (!_evaluation) { + ret = -1; + goto error; + } + + evaluation = create_evaluation_from_buffer( + LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW, buf); + if (!evaluation) { + ret = -1; + goto error; + } + + *_evaluation = evaluation; + ret = sizeof(struct lttng_evaluation_buffer_usage_comm); + return ret; +error: + lttng_evaluation_destroy(evaluation); + return ret; +} + +LTTNG_HIDDEN +ssize_t lttng_evaluation_buffer_usage_high_create_from_buffer(const char *buf, + struct lttng_evaluation **_evaluation) +{ + ssize_t ret; + struct lttng_evaluation *evaluation = NULL; + + if (!_evaluation) { + ret = -1; + goto error; + } + + evaluation = create_evaluation_from_buffer( + LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH, buf); + if (!evaluation) { + ret = -1; + goto error; + } + + *_evaluation = evaluation; + ret = sizeof(struct lttng_evaluation_buffer_usage_comm); + return ret; +error: + lttng_evaluation_destroy(evaluation); + return ret; +} + +enum lttng_condition_status +lttng_condition_buffer_usage_get_threshold_ratio( + struct lttng_condition *condition, double *threshold_ratio) +{ + struct lttng_condition_buffer_usage *usage; + enum lttng_condition_status status = LTTNG_CONDITION_STATUS_OK; + + if (!condition || !is_usage_condition(condition) || + !threshold_ratio) { + status = LTTNG_CONDITION_STATUS_INVALID; + goto end; + } + + usage = container_of(condition, struct lttng_condition_buffer_usage, + parent); + if (!usage->threshold_ratio.set) { + status = LTTNG_CONDITION_STATUS_UNSET; + goto end; + } + *threshold_ratio = usage->threshold_ratio.value; +end: + return status; +} + +/* threshold_ratio expressed as [0.0, 1.0]. */ +enum lttng_condition_status +lttng_condition_buffer_usage_set_threshold_ratio( + struct lttng_condition *condition, double threshold_ratio) +{ + struct lttng_condition_buffer_usage *usage; + enum lttng_condition_status status = LTTNG_CONDITION_STATUS_OK; + + if (!condition || !is_usage_condition(condition) || + threshold_ratio < 0.0 || + threshold_ratio > 1.0) { + status = LTTNG_CONDITION_STATUS_INVALID; + goto end; + } + + usage = container_of(condition, struct lttng_condition_buffer_usage, + parent); + usage->threshold_ratio.set = true; + usage->threshold_bytes.set = false; + usage->threshold_ratio.value = threshold_ratio; +end: + return status; +} + +enum lttng_condition_status +lttng_condition_buffer_usage_get_threshold( + struct lttng_condition *condition, uint64_t *threshold_bytes) +{ + struct lttng_condition_buffer_usage *usage; + enum lttng_condition_status status = LTTNG_CONDITION_STATUS_OK; + + if (!condition || !is_usage_condition(condition) || !threshold_bytes) { + status = LTTNG_CONDITION_STATUS_INVALID; + goto end; + } + + usage = container_of(condition, struct lttng_condition_buffer_usage, + parent); + if (!usage->threshold_bytes.set) { + status = LTTNG_CONDITION_STATUS_UNSET; + goto end; + } + *threshold_bytes = usage->threshold_bytes.value; +end: + return status; +} + +enum lttng_condition_status +lttng_condition_buffer_usage_set_threshold( + struct lttng_condition *condition, uint64_t threshold_bytes) +{ + struct lttng_condition_buffer_usage *usage; + enum lttng_condition_status status = LTTNG_CONDITION_STATUS_OK; + + if (!condition || !is_usage_condition(condition)) { + status = LTTNG_CONDITION_STATUS_INVALID; + goto end; + } + + usage = container_of(condition, struct lttng_condition_buffer_usage, + parent); + usage->threshold_ratio.set = false; + usage->threshold_bytes.set = true; + usage->threshold_bytes.value = threshold_bytes; +end: + return status; +} + +enum lttng_condition_status +lttng_condition_buffer_usage_get_session_name( + struct lttng_condition *condition, const char **session_name) +{ + struct lttng_condition_buffer_usage *usage; + enum lttng_condition_status status = LTTNG_CONDITION_STATUS_OK; + + if (!condition || !is_usage_condition(condition) || !session_name) { + status = LTTNG_CONDITION_STATUS_INVALID; + goto end; + } + + usage = container_of(condition, struct lttng_condition_buffer_usage, + parent); + if (!usage->session_name) { + status = LTTNG_CONDITION_STATUS_UNSET; + goto end; + } + *session_name = usage->session_name; +end: + return status; +} + +enum lttng_condition_status +lttng_condition_buffer_usage_set_session_name( + struct lttng_condition *condition, const char *session_name) +{ + char *session_name_copy; + struct lttng_condition_buffer_usage *usage; + enum lttng_condition_status status = LTTNG_CONDITION_STATUS_OK; + + if (!condition || !is_usage_condition(condition) || !session_name || + strlen(session_name) == 0) { + status = LTTNG_CONDITION_STATUS_INVALID; + goto end; + } + + usage = container_of(condition, struct lttng_condition_buffer_usage, + parent); + session_name_copy = strdup(session_name); + if (!session_name_copy) { + status = LTTNG_CONDITION_STATUS_ERROR; + goto end; + } + + if (usage->session_name) { + free(usage->session_name); + } + usage->session_name = session_name_copy; +end: + return status; +} + +enum lttng_condition_status +lttng_condition_buffer_usage_get_channel_name( + struct lttng_condition *condition, const char **channel_name) +{ + struct lttng_condition_buffer_usage *usage; + enum lttng_condition_status status = LTTNG_CONDITION_STATUS_OK; + + if (!condition || !is_usage_condition(condition) || !channel_name) { + status = LTTNG_CONDITION_STATUS_INVALID; + goto end; + } + + usage = container_of(condition, struct lttng_condition_buffer_usage, + parent); + if (!usage->channel_name) { + status = LTTNG_CONDITION_STATUS_UNSET; + goto end; + } + *channel_name = usage->channel_name; +end: + return status; +} + +enum lttng_condition_status +lttng_condition_buffer_usage_set_channel_name( + struct lttng_condition *condition, const char *channel_name) +{ + char *channel_name_copy; + struct lttng_condition_buffer_usage *usage; + enum lttng_condition_status status = LTTNG_CONDITION_STATUS_OK; + + if (!condition || !is_usage_condition(condition) || !channel_name || + strlen(channel_name) == 0) { + status = LTTNG_CONDITION_STATUS_INVALID; + goto end; + } + + usage = container_of(condition, struct lttng_condition_buffer_usage, + parent); + channel_name_copy = strdup(channel_name); + if (!channel_name_copy) { + status = LTTNG_CONDITION_STATUS_ERROR; + goto end; + } + + if (usage->channel_name) { + free(usage->channel_name); + } + usage->channel_name = channel_name_copy; +end: + return status; +} + +enum lttng_condition_status +lttng_condition_buffer_usage_get_domain_type( + struct lttng_condition *condition, enum lttng_domain_type *type) +{ + struct lttng_condition_buffer_usage *usage; + enum lttng_condition_status status = LTTNG_CONDITION_STATUS_OK; + + if (!condition || !is_usage_condition(condition) || !type) { + status = LTTNG_CONDITION_STATUS_INVALID; + goto end; + } + + usage = container_of(condition, struct lttng_condition_buffer_usage, + parent); + if (!usage->domain.set) { + status = LTTNG_CONDITION_STATUS_UNSET; + goto end; + } + *type = usage->domain.type; +end: + return status; +} + +enum lttng_condition_status +lttng_condition_buffer_usage_set_domain_type( + struct lttng_condition *condition, enum lttng_domain_type type) +{ + struct lttng_condition_buffer_usage *usage; + enum lttng_condition_status status = LTTNG_CONDITION_STATUS_OK; + + if (!condition || !is_usage_condition(condition) || + type == LTTNG_DOMAIN_NONE) { + status = LTTNG_CONDITION_STATUS_INVALID; + goto end; + } + + usage = container_of(condition, struct lttng_condition_buffer_usage, + parent); + usage->domain.set = true; + usage->domain.type = type; +end: + return status; +} + +static +ssize_t lttng_evaluation_buffer_usage_serialize( + struct lttng_evaluation *evaluation, char *buf) +{ + ssize_t ret; + struct lttng_evaluation_buffer_usage *usage; + + usage = container_of(evaluation, struct lttng_evaluation_buffer_usage, + parent); + if (buf) { + struct lttng_evaluation_buffer_usage_comm comm = { + .buffer_use = usage->buffer_use, + .buffer_capacity = usage->buffer_capacity, + }; + + memcpy(buf, &comm, sizeof(comm)); + } + + ret = sizeof(struct lttng_evaluation_buffer_usage_comm); + return ret; +} + +static +void lttng_evaluation_buffer_usage_destroy( + struct lttng_evaluation *evaluation) +{ + struct lttng_evaluation_buffer_usage *usage; + + usage = container_of(evaluation, struct lttng_evaluation_buffer_usage, + parent); + free(usage); +} + +LTTNG_HIDDEN +struct lttng_evaluation *lttng_evaluation_buffer_usage_create( + enum lttng_condition_type type, uint64_t use, uint64_t capacity) +{ + struct lttng_evaluation_buffer_usage *usage; + + usage = zmalloc(sizeof(struct lttng_evaluation_buffer_usage)); + if (!usage) { + goto end; + } + + usage->parent.type = type; + usage->buffer_use = use; + usage->buffer_capacity = capacity; + usage->parent.serialize = lttng_evaluation_buffer_usage_serialize; + usage->parent.destroy = lttng_evaluation_buffer_usage_destroy; +end: + return &usage->parent; +} + +/* + * Get the sampled buffer usage which caused the associated condition to + * evaluate to "true". + */ +enum lttng_evaluation_status +lttng_evaluation_buffer_usage_get_usage_ratio( + struct lttng_evaluation *evaluation, double *usage_ratio) +{ + struct lttng_evaluation_buffer_usage *usage; + enum lttng_evaluation_status status = LTTNG_EVALUATION_STATUS_OK; + + if (!evaluation || !is_usage_evaluation(evaluation) || !usage_ratio) { + status = LTTNG_EVALUATION_STATUS_INVALID; + goto end; + } + + usage = container_of(evaluation, struct lttng_evaluation_buffer_usage, + parent); + *usage_ratio = (double) usage->buffer_use / + (double) usage->buffer_capacity; +end: + return status; +} + +enum lttng_evaluation_status +lttng_evaluation_buffer_usage_get_usage(struct lttng_evaluation *evaluation, + uint64_t *usage_bytes) +{ + struct lttng_evaluation_buffer_usage *usage; + enum lttng_evaluation_status status = LTTNG_EVALUATION_STATUS_OK; + + if (!evaluation || !is_usage_evaluation(evaluation) || !usage_bytes) { + status = LTTNG_EVALUATION_STATUS_INVALID; + goto end; + } + + usage = container_of(evaluation, struct lttng_evaluation_buffer_usage, + parent); + *usage_bytes = usage->buffer_use; +end: + return status; +} diff --git a/src/common/condition.c b/src/common/condition.c new file mode 100644 index 000000000..2f7f41364 --- /dev/null +++ b/src/common/condition.c @@ -0,0 +1,159 @@ +/* + * Copyright (C) 2017 - Jérémie Galarneau + * + * This library is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License, version 2.1 only, + * as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License + * for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this library; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#include +#include +#include +#include +#include +#include + +enum lttng_condition_type lttng_condition_get_type( + struct lttng_condition *condition) +{ + return condition ? condition->type : LTTNG_CONDITION_TYPE_UNKNOWN; +} + +void lttng_condition_destroy(struct lttng_condition *condition) +{ + if (!condition) { + return; + } + + assert(condition->destroy); + condition->destroy(condition); +} + +LTTNG_HIDDEN +bool lttng_condition_validate(struct lttng_condition *condition) +{ + bool valid; + + if (!condition) { + valid = false; + goto end; + } + + if (!condition->validate) { + /* Sub-class guarantees that it can never be invalid. */ + valid = true; + goto end; + } + + valid = condition->validate(condition); +end: + return valid; +} + +LTTNG_HIDDEN +ssize_t lttng_condition_serialize(struct lttng_condition *condition, char *buf) +{ + ssize_t ret, condition_size; + struct lttng_condition_comm condition_comm; + + if (!condition) { + ret = -1; + goto end; + } + + condition_comm.condition_type = (int8_t) condition->type; + ret = sizeof(struct lttng_condition_comm); + if (buf) { + memcpy(buf, &condition_comm, ret); + buf += ret; + } + + condition_size = condition->serialize(condition, buf); + if (condition_size < 0) { + ret = condition_size; + goto end; + } + ret += condition_size; +end: + return ret; +} + +LTTNG_HIDDEN +bool lttng_condition_is_equal(struct lttng_condition *a, + struct lttng_condition *b) +{ + bool is_equal = false; + + if (!a || !b) { + goto end; + } + + if (a->type != b->type) { + goto end; + } + + is_equal = a->equal ? a->equal(a, b) : true; +end: + return is_equal; +} + +LTTNG_HIDDEN +ssize_t lttng_condition_create_from_buffer(const char *buf, + struct lttng_condition **condition) +{ + ssize_t ret, condition_size = 0; + struct lttng_condition_comm *condition_comm = + (struct lttng_condition_comm *) buf; + + if (!buf || !condition) { + ret = -1; + goto end; + } + + DBG("Deserializing condition from buffer"); + condition_size += sizeof(*condition_comm); + buf += condition_size; + + switch ((enum lttng_condition_type) condition_comm->condition_type) { + case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW: + ret = lttng_condition_buffer_usage_low_create_from_buffer(buf, + condition); + if (ret < 0) { + goto end; + } + condition_size += ret; + break; + case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH: + ret = lttng_condition_buffer_usage_high_create_from_buffer(buf, + condition); + if (ret < 0) { + goto end; + } + condition_size += ret; + break; + default: + ERR("Attempted to create condition of unknown type (%i)", + (int) condition_comm->condition_type); + ret = -1; + goto end; + } + ret = condition_size; +end: + return ret; +} + +LTTNG_HIDDEN +void lttng_condition_init(struct lttng_condition *condition, + enum lttng_condition_type type) +{ + condition->type = type; +} diff --git a/src/common/consumer/consumer-timer.c b/src/common/consumer/consumer-timer.c index b0a434284..3234e8514 100644 --- a/src/common/consumer/consumer-timer.c +++ b/src/common/consumer/consumer-timer.c @@ -21,6 +21,7 @@ #include #include +#include #include #include #include @@ -61,8 +62,14 @@ static void setmask(sigset_t *mask) if (ret) { PERROR("sigaddset live"); } + ret = sigaddset(mask, LTTNG_CONSUMER_SIG_MONITOR); + if (ret) { + PERROR("sigaddset monitor"); + } } +static int channel_monitor_pipe = -1; + /* * Execute action on a timer switch. * @@ -71,7 +78,7 @@ static void setmask(sigset_t *mask) * deadlocks. */ static void metadata_switch_timer(struct lttng_consumer_local_data *ctx, - int sig, siginfo_t *si, void *uc) + int sig, siginfo_t *si) { int ret; struct lttng_consumer_channel *channel; @@ -304,7 +311,7 @@ end: * Execute action on a live timer */ static void live_timer(struct lttng_consumer_local_data *ctx, - int sig, siginfo_t *si, void *uc) + int sig, siginfo_t *si) { int ret; struct lttng_consumer_channel *channel; @@ -411,44 +418,95 @@ void consumer_timer_signal_thread_qs(unsigned int signr) } /* - * Set the timer for periodical metadata flush. + * Start a timer channel timer which will fire at a given interval + * (timer_interval_us)and fire a given signal (signal). + * + * Returns a negative value on error, 0 if a timer was created, and + * a positive value if no timer was created (not an error). */ -void consumer_timer_switch_start(struct lttng_consumer_channel *channel, - unsigned int switch_timer_interval) +static +int consumer_channel_timer_start(timer_t *timer_id, + struct lttng_consumer_channel *channel, + unsigned int timer_interval_us, int signal) { - int ret; + int ret = 0, delete_ret; struct sigevent sev; struct itimerspec its; assert(channel); assert(channel->key); - if (switch_timer_interval == 0) { - return; + if (timer_interval_us == 0) { + /* No creation needed; not an error. */ + ret = 1; + goto end; } sev.sigev_notify = SIGEV_SIGNAL; - sev.sigev_signo = LTTNG_CONSUMER_SIG_SWITCH; + sev.sigev_signo = signal; sev.sigev_value.sival_ptr = channel; - ret = timer_create(CLOCKID, &sev, &channel->switch_timer); + ret = timer_create(CLOCKID, &sev, timer_id); if (ret == -1) { PERROR("timer_create"); + goto end; } - channel->switch_timer_enabled = 1; - its.it_value.tv_sec = switch_timer_interval / 1000000; - its.it_value.tv_nsec = (switch_timer_interval % 1000000) * 1000; + its.it_value.tv_sec = timer_interval_us / 1000000; + its.it_value.tv_nsec = (timer_interval_us % 1000000) * 1000; its.it_interval.tv_sec = its.it_value.tv_sec; its.it_interval.tv_nsec = its.it_value.tv_nsec; - ret = timer_settime(channel->switch_timer, 0, &its, NULL); + ret = timer_settime(*timer_id, 0, &its, NULL); if (ret == -1) { PERROR("timer_settime"); + goto error_destroy_timer; + } +end: + return ret; +error_destroy_timer: + delete_ret = timer_delete(*timer_id); + if (delete_ret == -1) { + PERROR("timer_delete"); + } + goto end; +} + +static +int consumer_channel_timer_stop(timer_t *timer_id, int signal) +{ + int ret = 0; + + ret = timer_delete(*timer_id); + if (ret == -1) { + PERROR("timer_delete"); + goto end; } + + consumer_timer_signal_thread_qs(signal); + *timer_id = 0; +end: + return ret; +} + +/* + * Set the channel's switch timer. + */ +void consumer_timer_switch_start(struct lttng_consumer_channel *channel, + unsigned int switch_timer_interval_us) +{ + int ret; + + assert(channel); + assert(channel->key); + + ret = consumer_channel_timer_start(&channel->switch_timer, channel, + switch_timer_interval_us, LTTNG_CONSUMER_SIG_SWITCH); + + channel->switch_timer_enabled = !!(ret == 0); } /* - * Stop and delete timer. + * Stop and delete the channel's switch timer. */ void consumer_timer_switch_stop(struct lttng_consumer_channel *channel) { @@ -456,72 +514,91 @@ void consumer_timer_switch_stop(struct lttng_consumer_channel *channel) assert(channel); - ret = timer_delete(channel->switch_timer); + ret = consumer_channel_timer_stop(&channel->switch_timer, + LTTNG_CONSUMER_SIG_SWITCH); if (ret == -1) { - PERROR("timer_delete"); + ERR("Failed to stop switch timer"); } - consumer_timer_signal_thread_qs(LTTNG_CONSUMER_SIG_SWITCH); - - channel->switch_timer = 0; channel->switch_timer_enabled = 0; } /* - * Set the timer for the live mode. + * Set the channel's live timer. */ void consumer_timer_live_start(struct lttng_consumer_channel *channel, - int live_timer_interval) + unsigned int live_timer_interval_us) { int ret; - struct sigevent sev; - struct itimerspec its; assert(channel); assert(channel->key); - if (live_timer_interval <= 0) { - return; - } + ret = consumer_channel_timer_start(&channel->live_timer, channel, + live_timer_interval_us, LTTNG_CONSUMER_SIG_LIVE); - sev.sigev_notify = SIGEV_SIGNAL; - sev.sigev_signo = LTTNG_CONSUMER_SIG_LIVE; - sev.sigev_value.sival_ptr = channel; - ret = timer_create(CLOCKID, &sev, &channel->live_timer); - if (ret == -1) { - PERROR("timer_create"); - } - channel->live_timer_enabled = 1; + channel->live_timer_enabled = !!(ret == 0); +} - its.it_value.tv_sec = live_timer_interval / 1000000; - its.it_value.tv_nsec = (live_timer_interval % 1000000) * 1000; - its.it_interval.tv_sec = its.it_value.tv_sec; - its.it_interval.tv_nsec = its.it_value.tv_nsec; +/* + * Stop and delete the channel's live timer. + */ +void consumer_timer_live_stop(struct lttng_consumer_channel *channel) +{ + int ret; - ret = timer_settime(channel->live_timer, 0, &its, NULL); + assert(channel); + + ret = consumer_channel_timer_stop(&channel->live_timer, + LTTNG_CONSUMER_SIG_LIVE); if (ret == -1) { - PERROR("timer_settime"); + ERR("Failed to stop live timer"); } + + channel->live_timer_enabled = 0; } /* - * Stop and delete timer. + * Set the channel's monitoring timer. + * + * Returns a negative value on error, 0 if a timer was created, and + * a positive value if no timer was created (not an error). */ -void consumer_timer_live_stop(struct lttng_consumer_channel *channel) +int consumer_timer_monitor_start(struct lttng_consumer_channel *channel, + unsigned int monitor_timer_interval_us) { int ret; assert(channel); + assert(channel->key); + assert(!channel->monitor_timer_enabled); - ret = timer_delete(channel->live_timer); + ret = consumer_channel_timer_start(&channel->monitor_timer, channel, + monitor_timer_interval_us, LTTNG_CONSUMER_SIG_MONITOR); + channel->monitor_timer_enabled = !!(ret == 0); + return ret; +} + +/* + * Stop and delete the channel's monitoring timer. + */ +int consumer_timer_monitor_stop(struct lttng_consumer_channel *channel) +{ + int ret; + + assert(channel); + assert(channel->monitor_timer_enabled); + + ret = consumer_channel_timer_stop(&channel->monitor_timer, + LTTNG_CONSUMER_SIG_MONITOR); if (ret == -1) { - PERROR("timer_delete"); + ERR("Failed to stop live timer"); + goto end; } - consumer_timer_signal_thread_qs(LTTNG_CONSUMER_SIG_LIVE); - - channel->live_timer = 0; - channel->live_timer_enabled = 0; + channel->monitor_timer_enabled = 0; +end: + return ret; } /* @@ -544,9 +621,159 @@ int consumer_signal_init(void) return 0; } +static +int sample_ust_positions(struct lttng_consumer_channel *channel, + uint64_t *_highest_use, uint64_t *_lowest_use) +{ + int ret; + struct lttng_ht_iter iter; + struct lttng_consumer_stream *stream; + bool empty_channel = true; + uint64_t high = 0, low = UINT64_MAX; + struct lttng_ht *ht = consumer_data.stream_per_chan_id_ht; + + rcu_read_lock(); + + cds_lfht_for_each_entry_duplicate(ht->ht, + ht->hash_fct(&channel->key, lttng_ht_seed), + ht->match_fct, &channel->key, + &iter.iter, stream, node_channel_id.node) { + unsigned long produced, consumed, usage; + + empty_channel = false; + + pthread_mutex_lock(&stream->lock); + if (cds_lfht_is_node_deleted(&stream->node.node)) { + goto next; + } + + ret = ustctl_snapshot_sample_positions(stream->ustream); + if (ret) { + ERR("Failed to take buffer position snapshot in monitor timer (ret = %d)", ret); + pthread_mutex_unlock(&stream->lock); + goto end; + } + ret = ustctl_snapshot_get_consumed(stream->ustream, + &consumed); + if (ret) { + ERR("Failed to get buffer consumed position in monitor timer"); + pthread_mutex_unlock(&stream->lock); + goto end; + } + ret = ustctl_snapshot_get_produced(stream->ustream, + &produced); + if (ret) { + ERR("Failed to get buffer produced position in monitor timer"); + pthread_mutex_unlock(&stream->lock); + goto end; + } + + usage = produced - consumed; + high = (usage > high) ? usage : high; + low = (usage < low) ? usage : low; + next: + pthread_mutex_unlock(&stream->lock); + } + + *_highest_use = high; + *_lowest_use = low; +end: + rcu_read_unlock(); + if (empty_channel) { + ret = -1; + } + return ret; +} + +/* + * Execute action on a monitor timer. + */ +static +void monitor_timer(struct lttng_consumer_local_data *ctx, + struct lttng_consumer_channel *channel) +{ + int ret; + int channel_monitor_pipe = + consumer_timer_thread_get_channel_monitor_pipe(); + struct lttcomm_consumer_channel_monitor_msg msg = { + .key = channel->key, + }; + + assert(channel); + pthread_mutex_lock(&consumer_data.lock); + + if (channel_monitor_pipe < 0) { + goto end; + } + + switch (consumer_data.type) { + case LTTNG_CONSUMER_KERNEL: + /* TODO */ + ret = -1; + break; + case LTTNG_CONSUMER32_UST: + case LTTNG_CONSUMER64_UST: + { + ret = sample_ust_positions(channel, &msg.highest, &msg.lowest); + break; + } + default: + abort(); + } + + if (ret) { + goto end; + } + + /* + * Writes performed here are assumed to be atomic which is only + * guaranteed for sizes < than PIPE_BUF. + */ + assert(sizeof(msg) <= PIPE_BUF); + + do { + ret = write(channel_monitor_pipe, &msg, sizeof(msg)); + } while (ret == -1 && errno == EINTR); + if (ret == -1) { + if (errno == EAGAIN) { + /* Not an error, the sample is merely dropped. */ + DBG("Channel monitor pipe is full; dropping sample for channel key = %"PRIu64, + channel->key); + } else { + PERROR("write to the channel monitor pipe"); + } + } else { + DBG("Sent channel monitoring sample for channel key %" PRIu64 + ", (highest = %" PRIu64 ", lowest = %"PRIu64")", + channel->key, msg.highest, msg.lowest); + } +end: + pthread_mutex_unlock(&consumer_data.lock); +} + +int consumer_timer_thread_get_channel_monitor_pipe(void) +{ + return uatomic_read(&channel_monitor_pipe); +} + +int consumer_timer_thread_set_channel_monitor_pipe(int fd) +{ + int ret; + + ret = uatomic_cmpxchg(&channel_monitor_pipe, -1, fd); + if (ret != -1) { + ret = -1; + goto end; + } + ret = 0; +end: + return ret; +} + /* * This thread is the sighandler for signals LTTNG_CONSUMER_SIG_SWITCH, - * LTTNG_CONSUMER_SIG_TEARDOWN and LTTNG_CONSUMER_SIG_LIVE. + * LTTNG_CONSUMER_SIG_TEARDOWN, LTTNG_CONSUMER_SIG_LIVE, and + * LTTNG_CONSUMER_SIG_MONITOR. */ void *consumer_timer_thread(void *data) { @@ -575,20 +802,31 @@ void *consumer_timer_thread(void *data) health_poll_entry(); signr = sigwaitinfo(&mask, &info); health_poll_exit(); + + /* + * NOTE: cascading conditions are used instead of a switch case + * since the use of SIGRTMIN in the definition of the signals' + * values prevents the reduction to an integer constant. + */ if (signr == -1) { if (errno != EINTR) { PERROR("sigwaitinfo"); } continue; } else if (signr == LTTNG_CONSUMER_SIG_SWITCH) { - metadata_switch_timer(ctx, info.si_signo, &info, NULL); + metadata_switch_timer(ctx, info.si_signo, &info); } else if (signr == LTTNG_CONSUMER_SIG_TEARDOWN) { cmm_smp_mb(); CMM_STORE_SHARED(timer_signal.qs_done, 1); cmm_smp_mb(); DBG("Signal timer metadata thread teardown"); } else if (signr == LTTNG_CONSUMER_SIG_LIVE) { - live_timer(ctx, info.si_signo, &info, NULL); + live_timer(ctx, info.si_signo, &info); + } else if (signr == LTTNG_CONSUMER_SIG_MONITOR) { + struct lttng_consumer_channel *channel; + + channel = info.si_value.sival_ptr; + monitor_timer(ctx, channel); } else { ERR("Unexpected signal %d\n", info.si_signo); } diff --git a/src/common/consumer/consumer-timer.h b/src/common/consumer/consumer-timer.h index 22e74574c..851a172aa 100644 --- a/src/common/consumer/consumer-timer.h +++ b/src/common/consumer/consumer-timer.h @@ -27,6 +27,7 @@ #define LTTNG_CONSUMER_SIG_SWITCH SIGRTMIN + 10 #define LTTNG_CONSUMER_SIG_TEARDOWN SIGRTMIN + 11 #define LTTNG_CONSUMER_SIG_LIVE SIGRTMIN + 12 +#define LTTNG_CONSUMER_SIG_MONITOR SIGRTMIN + 13 #define CLOCKID CLOCK_MONOTONIC @@ -44,15 +45,21 @@ struct timer_signal_data { }; void consumer_timer_switch_start(struct lttng_consumer_channel *channel, - unsigned int switch_timer_interval); + unsigned int switch_timer_interval_us); void consumer_timer_switch_stop(struct lttng_consumer_channel *channel); void consumer_timer_live_start(struct lttng_consumer_channel *channel, - int live_timer_interval); + unsigned int live_timer_interval_us); void consumer_timer_live_stop(struct lttng_consumer_channel *channel); +int consumer_timer_monitor_start(struct lttng_consumer_channel *channel, + unsigned int monitor_timer_interval_us); +int consumer_timer_monitor_stop(struct lttng_consumer_channel *channel); void *consumer_timer_thread(void *data); int consumer_signal_init(void); int consumer_flush_kernel_index(struct lttng_consumer_stream *stream); int consumer_flush_ust_index(struct lttng_consumer_stream *stream); +int consumer_timer_thread_get_channel_monitor_pipe(void); +int consumer_timer_thread_set_channel_monitor_pipe(int fd); + #endif /* CONSUMER_TIMER_H */ diff --git a/src/common/consumer/consumer.c b/src/common/consumer/consumer.c index e379171ae..3b857dec3 100644 --- a/src/common/consumer/consumer.c +++ b/src/common/consumer/consumer.c @@ -368,6 +368,9 @@ void consumer_del_channel(struct lttng_consumer_channel *channel) if (channel->live_timer_enabled == 1) { consumer_timer_live_stop(channel); } + if (channel->monitor_timer_enabled == 1) { + consumer_timer_monitor_stop(channel); + } switch (consumer_data.type) { case LTTNG_CONSUMER_KERNEL: @@ -1348,6 +1351,8 @@ struct lttng_consumer_local_data *lttng_consumer_create( goto error_metadata_pipe; } + ctx->channel_monitor_pipe = -1; + return ctx; error_metadata_pipe: diff --git a/src/common/consumer/consumer.h b/src/common/consumer/consumer.h index 37adecbfe..883fd3532 100644 --- a/src/common/consumer/consumer.h +++ b/src/common/consumer/consumer.h @@ -61,6 +61,7 @@ enum lttng_consumer_command { LTTNG_CONSUMER_DISCARDED_EVENTS, LTTNG_CONSUMER_LOST_PACKETS, LTTNG_CONSUMER_CLEAR_QUIESCENT_CHANNEL, + LTTNG_CONSUMER_SET_CHANNEL_MONITOR_PIPE, }; /* State of each fd in consumer */ @@ -160,6 +161,7 @@ struct lttng_consumer_channel { /* Metadata cache is metadata channel */ struct consumer_metadata_cache *metadata_cache; + /* For UST metadata periodical flush */ int switch_timer_enabled; timer_t switch_timer; @@ -170,6 +172,10 @@ struct lttng_consumer_channel { timer_t live_timer; int live_timer_error; + /* For channel monitoring timer. */ + int monitor_timer_enabled; + timer_t monitor_timer; + /* On-disk circular buffer */ uint64_t tracefile_size; uint64_t tracefile_count; @@ -539,6 +545,11 @@ struct lttng_consumer_local_data { int consumer_should_quit[2]; /* Metadata poll thread pipe. Transfer metadata stream to it */ struct lttng_pipe *consumer_metadata_pipe; + /* + * Pipe used by the channel monitoring timers to provide state samples + * to the session daemon (write-only). + */ + int channel_monitor_pipe; }; /* diff --git a/src/common/defaults.h b/src/common/defaults.h index 37d222a78..4b27fb320 100644 --- a/src/common/defaults.h +++ b/src/common/defaults.h @@ -106,10 +106,12 @@ #define DEFAULT_LTTNG_EXTRA_KMOD_PROBES "LTTNG_EXTRA_KMOD_PROBES" /* Default unix socket path */ -#define DEFAULT_GLOBAL_CLIENT_UNIX_SOCK DEFAULT_LTTNG_RUNDIR "/client-lttng-sessiond" -#define DEFAULT_HOME_CLIENT_UNIX_SOCK DEFAULT_LTTNG_HOME_RUNDIR "/client-lttng-sessiond" -#define DEFAULT_GLOBAL_HEALTH_UNIX_SOCK DEFAULT_LTTNG_RUNDIR "/sessiond-health" -#define DEFAULT_HOME_HEALTH_UNIX_SOCK DEFAULT_LTTNG_HOME_RUNDIR "/sessiond-health" +#define DEFAULT_GLOBAL_CLIENT_UNIX_SOCK DEFAULT_LTTNG_RUNDIR "/client-lttng-sessiond" +#define DEFAULT_HOME_CLIENT_UNIX_SOCK DEFAULT_LTTNG_HOME_RUNDIR "/client-lttng-sessiond" +#define DEFAULT_GLOBAL_HEALTH_UNIX_SOCK DEFAULT_LTTNG_RUNDIR "/sessiond-health" +#define DEFAULT_HOME_HEALTH_UNIX_SOCK DEFAULT_LTTNG_HOME_RUNDIR "/sessiond-health" +#define DEFAULT_GLOBAL_NOTIFICATION_CHANNEL_UNIX_SOCK DEFAULT_LTTNG_RUNDIR "/sessiond-notification" +#define DEFAULT_HOME_NOTIFICATION_CHANNEL_UNIX_SOCK DEFAULT_LTTNG_HOME_RUNDIR "/sessiond-notification" /* Default consumer health unix socket path */ #define DEFAULT_GLOBAL_USTCONSUMER32_HEALTH_UNIX_SOCK DEFAULT_LTTNG_RUNDIR "/ustconsumerd32/health" diff --git a/src/common/dynamic-buffer.c b/src/common/dynamic-buffer.c new file mode 100644 index 000000000..d617b118e --- /dev/null +++ b/src/common/dynamic-buffer.c @@ -0,0 +1,166 @@ +/* + * Copyright (C) 2017 - Jérémie Galarneau + * + * This library is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License, version 2.1 only, + * as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License + * for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this library; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#include +#include +#include +#include + +static +size_t round_to_power_of_2(size_t val) +{ + int order; + size_t rounded; + + order = utils_get_count_order_u64(val); + assert(order >= 0); + rounded = (1ULL << order); + assert(rounded >= val); + + return rounded; +} + +void lttng_dynamic_buffer_init(struct lttng_dynamic_buffer *buffer) +{ + assert(buffer); + memset(buffer, 0, sizeof(*buffer)); +} + +int lttng_dynamic_buffer_append(struct lttng_dynamic_buffer *buffer, + const void *buf, size_t len) +{ + int ret = 0; + + if (!buffer || (!buf && len)) { + ret = -1; + goto end; + } + + if (len == 0) { + /* Not an error, no-op. */ + goto end; + } + + if ((buffer->capacity - buffer->size) < len) { + ret = lttng_dynamic_buffer_set_capacity(buffer, + buffer->capacity + + (len - (buffer->capacity - buffer->size))); + if (ret) { + goto end; + } + } + + memcpy(buffer->data + buffer->size, buf, len); + buffer->size += len; +end: + return ret; +} + +int lttng_dynamic_buffer_set_size(struct lttng_dynamic_buffer *buffer, + size_t new_size) +{ + int ret = 0; + + if (!buffer) { + goto end; + } + + if (new_size == buffer->size) { + goto end; + } + + if (new_size > buffer->capacity) { + ret = lttng_dynamic_buffer_set_capacity(buffer, new_size); + if (ret) { + goto end; + } + } else if (new_size > buffer->size) { + memset(buffer->data + buffer->size, 0, new_size - buffer->size); + buffer->size = new_size; + } else { + /* + * Shrinking size. There is no need to zero-out the newly + * released memory as it will either be: + * - overwritten by lttng_dynamic_buffer_append, + * - expanded later, which will zero-out the memory + * + * Users of external APIs are encouraged to set the buffer's + * size _before_ making such calls. + */ + } + buffer->size = new_size; +end: + return ret; +} + +int lttng_dynamic_buffer_set_capacity(struct lttng_dynamic_buffer *buffer, + size_t new_capacity) +{ + int ret = 0; + size_t rounded_capacity = round_to_power_of_2(new_capacity); + + if (!buffer || new_capacity < buffer->size) { + ret = -1; + goto end; + } + + if (rounded_capacity == buffer->capacity) { + goto end; + } + + if (!buffer->data) { + buffer->data = zmalloc(rounded_capacity); + if (!buffer->data) { + ret = -1; + goto end; + } + } else { + void *new_buf; + + new_buf = realloc(buffer->data, rounded_capacity); + if (new_buf) { + if (rounded_capacity > buffer->capacity) { + memset(new_buf + buffer->capacity, 0, + rounded_capacity - buffer->capacity); + } + } else { + /* Realloc failed, try to acquire a new block. */ + new_buf = zmalloc(rounded_capacity); + if (!new_buf) { + ret = -1; + goto end; + } + memcpy(new_buf, buffer->data, buffer->size); + free(buffer->data); + } + buffer->data = new_buf; + } + buffer->capacity = rounded_capacity; +end: + return ret; +} + +/* Release any memory used by the dynamic buffer. */ +void lttng_dynamic_buffer_reset(struct lttng_dynamic_buffer *buffer) +{ + if (!buffer) { + return; + } + buffer->size = 0; + buffer->capacity = 0; + free(buffer->data); +} diff --git a/src/common/dynamic-buffer.h b/src/common/dynamic-buffer.h new file mode 100644 index 000000000..82efd59a4 --- /dev/null +++ b/src/common/dynamic-buffer.h @@ -0,0 +1,59 @@ +/* + * Copyright (C) 2017 - Jérémie Galarneau + * + * This library is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License, version 2.1 only, + * as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License + * for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this library; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#ifndef LTTNG_DYNAMIC_BUFFER_H +#define LTTNG_DYNAMIC_BUFFER_H + +#include +#include + +struct lttng_dynamic_buffer { + char *data; + size_t size; + size_t capacity; +}; + +void lttng_dynamic_buffer_init(struct lttng_dynamic_buffer *buffer); + +int lttng_dynamic_buffer_append(struct lttng_dynamic_buffer *buffer, + const void *buf, size_t len); + +/* + * Set the buffer's size to new_size. The capacity of the buffer will + * be expanded (if necessary) to accomodate new_size. Areas acquired by + * an enlarging new_size _will be zeroed_. + * + * Be careful to expand the buffer's size _before_ calling out external + * APIs (e.g. read(3)) which may populate the buffer as setting the size + * _after_ will zero-out the result of the operation. + */ +int lttng_dynamic_buffer_set_size(struct lttng_dynamic_buffer *buffer, + size_t new_size); + +/* + * Set the buffer's capacity to accomodate the new_capacity, allocating memory + * as necessary. The buffer's content is preserved. + * + * If the current size > new_capacity, the operation will fail. + */ +int lttng_dynamic_buffer_set_capacity(struct lttng_dynamic_buffer *buffer, + size_t new_capacity); + +/* Release any memory used by the dynamic buffer. */ +void lttng_dynamic_buffer_reset(struct lttng_dynamic_buffer *buffer); + +#endif /* LTTNG_DYNAMIC_BUFFER_H */ diff --git a/src/common/endpoint.c b/src/common/endpoint.c new file mode 100644 index 000000000..89066de55 --- /dev/null +++ b/src/common/endpoint.c @@ -0,0 +1,26 @@ +/* + * Copyright (C) 2017 - Jérémie Galarneau + * + * This library is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License, version 2.1 only, + * as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License + * for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this library; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#include + +static +struct lttng_endpoint lttng_session_daemon_notification_endpoint_instance = { + .type = LTTNG_ENDPOINT_TYPE_DEFAULT_SESSIOND_NOTIFICATION +}; + +struct lttng_endpoint *lttng_session_daemon_notification_endpoint = + <tng_session_daemon_notification_endpoint_instance; diff --git a/src/common/error.c b/src/common/error.c index 938932cdc..2215886db 100644 --- a/src/common/error.c +++ b/src/common/error.c @@ -186,6 +186,10 @@ static const char *error_string_array[] = { [ ERROR_INDEX(LTTNG_ERR_REGEN_STATEDUMP_FAIL) ] = "Failed to regenerate the state dump", [ ERROR_INDEX(LTTNG_ERR_REGEN_STATEDUMP_NOMEM) ] = "Failed to regenerate the state dump, not enough memory", [ ERROR_INDEX(LTTNG_ERR_NOT_SNAPSHOT_SESSION) ] = "Snapshot command can't be applied to a non-snapshot session", + [ ERROR_INDEX(LTTNG_ERR_INVALID_TRIGGER) ] = "Invalid trigger", + [ ERROR_INDEX(LTTNG_ERR_TRIGGER_EXISTS) ] = "Trigger already registered", + [ ERROR_INDEX(LTTNG_ERR_TRIGGER_NOT_FOUND) ] = "Trigger not found", + [ ERROR_INDEX(LTTNG_ERR_COMMAND_CANCELLED) ] = "Command cancelled", /* Last element */ [ ERROR_INDEX(LTTNG_ERR_NR) ] = "Unknown error code" diff --git a/src/common/evaluation.c b/src/common/evaluation.c new file mode 100644 index 000000000..49cba5dd1 --- /dev/null +++ b/src/common/evaluation.c @@ -0,0 +1,110 @@ +/* + * Copyright (C) 2017 - Jérémie Galarneau + * + * This library is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License, version 2.1 only, + * as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License + * for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this library; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#include +#include +#include +#include +#include +#include + +LTTNG_HIDDEN +ssize_t lttng_evaluation_serialize(struct lttng_evaluation *evaluation, + char *buf) +{ + ssize_t ret, offset = 0; + struct lttng_evaluation_comm evaluation_comm; + + evaluation_comm.type = (int8_t) evaluation->type; + if (buf) { + memcpy(buf, &evaluation_comm, sizeof(evaluation_comm)); + } + offset += sizeof(evaluation_comm); + + if (evaluation->serialize) { + ret = evaluation->serialize(evaluation, + buf ? (buf + offset) : NULL); + if (ret < 0) { + goto end; + } + offset += ret; + } + + ret = offset; +end: + return ret; +} + +LTTNG_HIDDEN +ssize_t lttng_evaluation_create_from_buffer(const char *buf, + struct lttng_evaluation **evaluation) +{ + ssize_t ret, evaluation_size = 0; + struct lttng_evaluation_comm *evaluation_comm = + (struct lttng_evaluation_comm *) buf; + + if (!buf || !evaluation) { + ret = -1; + goto end; + } + + evaluation_size += sizeof(*evaluation_comm); + buf += evaluation_size; + + switch ((enum lttng_condition_type) evaluation_comm->type) { + case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW: + ret = lttng_evaluation_buffer_usage_low_create_from_buffer(buf, + evaluation); + if (ret < 0) { + goto end; + } + evaluation_size += ret; + break; + case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH: + ret = lttng_evaluation_buffer_usage_high_create_from_buffer(buf, + evaluation); + if (ret < 0) { + goto end; + } + evaluation_size += ret; + break; + default: + ERR("Attempted to create evaluation of unknown type (%i)", + (int) evaluation_comm->type); + ret = -1; + goto end; + } + ret = evaluation_size; +end: + return ret; +} + +enum lttng_condition_type lttng_evaluation_get_type( + struct lttng_evaluation *evaluation) +{ + return evaluation ? evaluation->type : LTTNG_CONDITION_TYPE_UNKNOWN; +} + +void lttng_evaluation_destroy(struct lttng_evaluation *evaluation) +{ + if (!evaluation) { + return; + } + + assert(evaluation->destroy); + evaluation->destroy(evaluation); +} diff --git a/src/common/macros.h b/src/common/macros.h index 90849ed30..7eaf27cdf 100644 --- a/src/common/macros.h +++ b/src/common/macros.h @@ -20,6 +20,7 @@ #define _MACROS_H #include +#include #include #include @@ -58,6 +59,14 @@ void *zmalloc(size_t len) #define ARRAY_SIZE(array) (sizeof(array) / (sizeof((array)[0]))) #endif +#ifndef container_of +#define container_of(ptr, type, member) \ + ({ \ + const typeof(((type *)NULL)->member) * __ptr = (ptr); \ + (type *)((char *)__ptr - offsetof(type, member)); \ + }) +#endif + #ifndef max #define max(a, b) ((a) > (b) ? (a) : (b)) #endif diff --git a/src/common/notification.c b/src/common/notification.c new file mode 100644 index 000000000..9970974f9 --- /dev/null +++ b/src/common/notification.c @@ -0,0 +1,168 @@ +/* + * Copyright (C) 2017 - Jérémie Galarneau + * + * This library is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License, version 2.1 only, + * as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License + * for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this library; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#include +#include +#include +#include +#include +#include + +LTTNG_HIDDEN +struct lttng_notification *lttng_notification_create( + struct lttng_condition *condition, + struct lttng_evaluation *evaluation) +{ + struct lttng_notification *notification = NULL; + + if (!condition || !evaluation) { + goto end; + } + + notification = zmalloc(sizeof(struct lttng_notification)); + if (!notification) { + goto end; + } + + notification->condition = condition; + notification->evaluation = evaluation; + notification->owns_elements = false; +end: + return notification; +} + +LTTNG_HIDDEN +ssize_t lttng_notification_serialize(struct lttng_notification *notification, + char *buf) +{ + ssize_t ret, condition_size, evaluation_size, offset = 0; + struct lttng_notification_comm notification_comm; + + if (!notification) { + ret = -1; + goto end; + } + + offset += sizeof(notification_comm); + condition_size = lttng_condition_serialize(notification->condition, + buf ? (buf + offset) : NULL); + if (condition_size < 0) { + ret = condition_size; + goto end; + } + offset += condition_size; + + evaluation_size = lttng_evaluation_serialize(notification->evaluation, + buf ? (buf + offset) : NULL); + if (evaluation_size < 0) { + ret = evaluation_size; + goto end; + } + offset += evaluation_size; + + if (buf) { + notification_comm.length = + (uint32_t) (condition_size + evaluation_size); + memcpy(buf, ¬ification_comm, sizeof(notification_comm)); + } + ret = offset; +end: + return ret; + +} + +LTTNG_HIDDEN +ssize_t lttng_notification_create_from_buffer(const char *buf, + struct lttng_notification **notification) +{ + ssize_t ret, offset = 0, condition_size, evaluation_size; + struct lttng_notification_comm *notification_comm; + struct lttng_condition *condition; + struct lttng_evaluation *evaluation; + + if (!buf || !notification) { + ret = -1; + goto end; + } + + notification_comm = (struct lttng_notification_comm *) buf; + offset += sizeof(*notification_comm); + + /* struct lttng_condition */ + condition_size = lttng_condition_create_from_buffer(buf + offset, + &condition); + if (condition_size < 0) { + ret = condition_size; + goto end; + } + offset += condition_size; + + /* struct lttng_evaluation */ + evaluation_size = lttng_evaluation_create_from_buffer(buf + offset, + &evaluation); + if (evaluation_size < 0) { + ret = evaluation_size; + goto end; + } + offset += evaluation_size; + + /* Unexpected size of inner-elements; the buffer is corrupted. */ + if ((ssize_t) notification_comm->length != + condition_size + evaluation_size) { + ret = -1; + goto error; + } + + *notification = lttng_notification_create(condition, evaluation); + if (!*notification) { + ret = -1; + goto error; + } + ret = offset; + (*notification)->owns_elements = true; +end: + return ret; +error: + lttng_condition_destroy(condition); + lttng_evaluation_destroy(evaluation); + return ret; +} + +void lttng_notification_destroy(struct lttng_notification *notification) +{ + if (!notification) { + return; + } + + if (notification->owns_elements) { + lttng_condition_destroy(notification->condition); + lttng_evaluation_destroy(notification->evaluation); + } + free(notification); +} + +struct lttng_condition *lttng_notification_get_condition( + struct lttng_notification *notification) +{ + return notification ? notification->condition : NULL; +} + +struct lttng_evaluation *lttng_notification_get_evaluation( + struct lttng_notification *notification) +{ + return notification ? notification->evaluation : NULL; +} diff --git a/src/common/notify.c b/src/common/notify.c new file mode 100644 index 000000000..956c0ecff --- /dev/null +++ b/src/common/notify.c @@ -0,0 +1,49 @@ +/* + * Copyright (C) 2017 - Jérémie Galarneau + * + * This library is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License, version 2.1 only, + * as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License + * for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this library; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#include +#include +#include +#include + +static +void lttng_action_notify_destroy(struct lttng_action *action) +{ + free(action); +} + +static +ssize_t lttng_action_notify_serialize(struct lttng_action *action, char *buf) +{ + return 0; +} + +struct lttng_action *lttng_action_notify_create(void) +{ + struct lttng_action_notify *notify; + + notify = zmalloc(sizeof(struct lttng_action_notify)); + if (!notify) { + goto end; + } + + notify->parent.type = LTTNG_ACTION_TYPE_NOTIFY; + notify->parent.serialize = lttng_action_notify_serialize; + notify->parent.destroy = lttng_action_notify_destroy; +end: + return ¬ify->parent; +} diff --git a/src/common/pipe.c b/src/common/pipe.c index 00211238e..ffdecc0d5 100644 --- a/src/common/pipe.c +++ b/src/common/pipe.c @@ -311,3 +311,69 @@ error: unlock_write_side(pipe); return ret; } + +/* + * Return and release the read end of the pipe. + * + * This call transfers the ownership of the read fd of the underlying pipe + * to the caller if it is still open. + * + * Returns the fd of the read end of the pipe, or -1 if it was already closed or + * released. + */ +LTTNG_HIDDEN +int lttng_pipe_release_readfd(struct lttng_pipe *pipe) +{ + int ret; + + if (!pipe) { + ret = -1; + goto end; + } + + lock_read_side(pipe); + if (!lttng_pipe_is_read_open(pipe)) { + ret = -1; + goto end_unlock; + } + ret = pipe->fd[0]; + pipe->fd[0] = -1; + pipe->r_state = LTTNG_PIPE_STATE_CLOSED; +end_unlock: + unlock_read_side(pipe); +end: + return ret; +} + +/* + * Return and release the write end of the pipe. + * + * This call transfers the ownership of the write fd of the underlying pipe + * to the caller if it is still open. + * + * Returns the fd of the write end of the pipe, or -1 if it was alwritey closed + * or released. + */ +LTTNG_HIDDEN +int lttng_pipe_release_writefd(struct lttng_pipe *pipe) +{ + int ret; + + if (!pipe) { + ret = -1; + goto end; + } + + lock_write_side(pipe); + if (!lttng_pipe_is_write_open(pipe)) { + ret = -1; + goto end_unlock; + } + ret = pipe->fd[1]; + pipe->fd[1] = -1; + pipe->w_state = LTTNG_PIPE_STATE_CLOSED; +end_unlock: + unlock_write_side(pipe); +end: + return ret; +} diff --git a/src/common/pipe.h b/src/common/pipe.h index 0bc2db325..b9e0904a2 100644 --- a/src/common/pipe.h +++ b/src/common/pipe.h @@ -90,5 +90,11 @@ ssize_t lttng_pipe_read(struct lttng_pipe *pipe, void *buf, size_t count); LTTNG_HIDDEN ssize_t lttng_pipe_write(struct lttng_pipe *pipe, const void *buf, size_t count); +/* Returns and releases the read end of the pipe. */ +LTTNG_HIDDEN +int lttng_pipe_release_readfd(struct lttng_pipe *pipe); +/* Returns and releases the write end of the pipe. */ +LTTNG_HIDDEN +int lttng_pipe_release_writefd(struct lttng_pipe *pipe); #endif /* LTTNG_PIPE_H */ diff --git a/src/common/sessiond-comm/sessiond-comm.h b/src/common/sessiond-comm/sessiond-comm.h index 0c2667031..3e85b9519 100644 --- a/src/common/sessiond-comm/sessiond-comm.h +++ b/src/common/sessiond-comm/sessiond-comm.h @@ -29,6 +29,7 @@ #include #include #include +#include #include #include #include @@ -96,6 +97,8 @@ enum lttcomm_sessiond_command { LTTNG_SET_SESSION_SHM_PATH = 40, LTTNG_REGENERATE_METADATA = 41, LTTNG_REGENERATE_STATEDUMP = 42, + LTTNG_REGISTER_TRIGGER = 43, + LTTNG_UNREGISTER_TRIGGER = 44, }; enum lttcomm_relayd_command { @@ -146,6 +149,7 @@ enum lttcomm_return_code { LTTCOMM_CONSUMERD_RELAYD_FAIL, /* Error on remote relayd */ LTTCOMM_CONSUMERD_CHANNEL_FAIL, /* Channel creation failed. */ LTTCOMM_CONSUMERD_CHAN_NOT_FOUND, /* Channel not found. */ + LTTCOMM_CONSUMERD_ALREADY_SET, /* Resource already set. */ /* MUST be last element */ LTTCOMM_NR, /* Last element */ @@ -311,6 +315,9 @@ struct lttcomm_session_msg { struct { uint32_t pid; } LTTNG_PACKED pid_tracker; + struct { + uint32_t length; + } LTTNG_PACKED trigger; } u; } LTTNG_PACKED; @@ -454,6 +461,7 @@ struct lttcomm_consumer_msg { uint32_t switch_timer_interval; /* usec */ uint32_t read_timer_interval; /* usec */ unsigned int live_timer_interval; /* usec */ + uint32_t monitor_timer_interval; /* usec */ int32_t output; /* splice, mmap */ int32_t type; /* metadata or per_cpu */ uint64_t session_id; /* Tracing session id */ @@ -531,6 +539,19 @@ struct lttcomm_consumer_msg { } u; } LTTNG_PACKED; +/* + * Channel monitoring message returned to the session daemon on every + * monitor timer expiration. + */ +struct lttcomm_consumer_channel_monitor_msg { + /* Key of the sampled channel. */ + uint64_t key; + /* + * Lowest and highest usage (bytes) at the moment the sample was taken. + */ + uint64_t lowest, highest; +} LTTNG_PACKED; + /* * Status message returned to the sessiond after a received command. */ diff --git a/src/common/trigger.c b/src/common/trigger.c new file mode 100644 index 000000000..59c33669c --- /dev/null +++ b/src/common/trigger.c @@ -0,0 +1,179 @@ +/* + * Copyright (C) 2017 - Jérémie Galarneau + * + * This library is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License, version 2.1 only, + * as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License + * for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this library; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#include +#include +#include +#include +#include + +LTTNG_HIDDEN +bool lttng_trigger_validate(struct lttng_trigger *trigger) +{ + bool valid; + + if (!trigger) { + valid = false; + goto end; + } + + valid = lttng_condition_validate(trigger->condition) && + lttng_action_validate(trigger->action); +end: + return valid; +} + +struct lttng_trigger *lttng_trigger_create( + struct lttng_condition *condition, + struct lttng_action *action) +{ + struct lttng_trigger *trigger = NULL; + + if (!condition || !action) { + goto end; + } + + trigger = zmalloc(sizeof(struct lttng_trigger)); + if (!trigger) { + goto end; + } + + trigger->condition = condition; + trigger->action = action; +end: + return trigger; +} + +struct lttng_condition *lttng_trigger_get_condition( + struct lttng_trigger *trigger) +{ + return trigger ? trigger->condition : NULL; +} + +extern struct lttng_action *lttng_trigger_get_action( + struct lttng_trigger *trigger) +{ + return trigger ? trigger->action : NULL; +} + +void lttng_trigger_destroy(struct lttng_trigger *trigger) +{ + if (!trigger) { + return; + } + + lttng_condition_destroy(trigger->condition); + lttng_action_destroy(trigger->action); + free(trigger); +} + +LTTNG_HIDDEN +ssize_t lttng_trigger_create_from_buffer(const char *buf, + struct lttng_trigger **trigger) +{ + ssize_t ret, offset = 0, condition_size, action_size; + struct lttng_condition *condition = NULL; + struct lttng_action *action = NULL; + struct lttng_trigger_comm *trigger_comm; + + if (!buf || !trigger) { + ret = -1; + goto end; + } + + /* lttng_trigger_comm header */ + trigger_comm = (struct lttng_trigger_comm *) buf; + offset += sizeof(*trigger_comm); + + /* struct lttng_condition */ + condition_size = lttng_condition_create_from_buffer(buf + offset, + &condition); + if (condition_size < 0) { + ret = condition_size; + goto end; + } + offset += condition_size; + + /* struct lttng_action */ + action_size = lttng_action_create_from_buffer(buf + offset, &action); + if (action_size < 0) { + ret = action_size; + goto end; + } + offset += action_size; + + /* Unexpected size of inner-elements; the buffer is corrupted. */ + if ((ssize_t) trigger_comm->length != condition_size + action_size) { + ret = -1; + goto error; + } + + *trigger = lttng_trigger_create(condition, action); + if (!*trigger) { + ret = -1; + goto error; + } + ret = offset; +end: + return ret; +error: + lttng_condition_destroy(condition); + lttng_action_destroy(action); + return ret; +} + +/* + * Returns the size of a trigger (header + condition + action). + * Both elements are stored contiguously, see their "*_comm" structure + * for the detailed format. + */ +LTTNG_HIDDEN +ssize_t lttng_trigger_serialize(struct lttng_trigger *trigger, char *buf) +{ + struct lttng_trigger_comm trigger_comm; + ssize_t action_size, condition_size, offset = 0, ret; + + if (!trigger) { + ret = -1; + goto end; + } + + offset += sizeof(trigger_comm); + condition_size = lttng_condition_serialize(trigger->condition, + buf ? (buf + offset) : NULL); + if (condition_size < 0) { + ret = -1; + goto end; + } + offset += condition_size; + + action_size = lttng_action_serialize(trigger->action, + buf ? (buf + offset) : NULL); + if (action_size < 0) { + ret = -1; + goto end; + } + offset += action_size; + + if (buf) { + trigger_comm.length = (uint32_t) (condition_size + action_size); + memcpy(buf, &trigger_comm, sizeof(trigger_comm)); + } + ret = offset; +end: + return ret; +} diff --git a/src/common/unix.c b/src/common/unix.c index 11c30781b..1d4ee9b36 100644 --- a/src/common/unix.c +++ b/src/common/unix.c @@ -226,7 +226,7 @@ ssize_t lttcomm_send_unix_sock(int sock, const void *buf, size_t len) { struct msghdr msg; struct iovec iov[1]; - ssize_t ret = -1; + ssize_t ret; memset(&msg, 0, sizeof(msg)); @@ -235,17 +235,28 @@ ssize_t lttcomm_send_unix_sock(int sock, const void *buf, size_t len) msg.msg_iov = iov; msg.msg_iovlen = 1; - ret = sendmsg(sock, &msg, 0); - if (ret < 0) { - /* - * Only warn about EPIPE when quiet mode is deactivated. - * We consider EPIPE as expected. - */ - if (errno != EPIPE || !lttng_opt_quiet) { - PERROR("sendmsg"); + while (iov[0].iov_len) { + ret = sendmsg(sock, &msg, 0); + if (ret < 0) { + if (errno == EINTR) { + continue; + } else { + /* + * Only warn about EPIPE when quiet mode is + * deactivated. + * We consider EPIPE as expected. + */ + if (errno != EPIPE || !lttng_opt_quiet) { + PERROR("sendmsg"); + } + goto end; + } } + iov[0].iov_len -= ret; + iov[0].iov_base += ret; } - + ret = len; +end: return ret; } diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c index 97f0497cb..af9ed5acd 100644 --- a/src/common/ust-consumer/ust-consumer.c +++ b/src/common/ust-consumer/ust-consumer.c @@ -1501,8 +1501,17 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, consumer_timer_switch_start(channel, attr.switch_timer_interval); attr.switch_timer_interval = 0; } else { + int monitor_start_ret; + consumer_timer_live_start(channel, msg.u.ask_channel.live_timer_interval); + monitor_start_ret = consumer_timer_monitor_start( + channel, + msg.u.ask_channel.monitor_timer_interval); + if (monitor_start_ret < 0) { + ERR("Starting channel monitoring timer failed"); + goto end_channel_error; + } } health_code_update(); @@ -1525,6 +1534,9 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, if (channel->live_timer_enabled == 1) { consumer_timer_live_stop(channel); } + if (channel->monitor_timer_enabled == 1) { + consumer_timer_monitor_stop(channel); + } goto end_channel_error; } @@ -1857,6 +1869,51 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, break; } + case LTTNG_CONSUMER_SET_CHANNEL_MONITOR_PIPE: + { + int channel_monitor_pipe; + + ret_code = LTTCOMM_CONSUMERD_SUCCESS; + /* Successfully received the command's type. */ + ret = consumer_send_status_msg(sock, ret_code); + if (ret < 0) { + goto error_fatal; + } + + ret = lttcomm_recv_fds_unix_sock(sock, &channel_monitor_pipe, + 1); + if (ret != sizeof(channel_monitor_pipe)) { + ERR("Failed to receive channel monitor pipe"); + goto error_fatal; + } + + DBG("Received channel monitor pipe (%d)", channel_monitor_pipe); + ret = consumer_timer_thread_set_channel_monitor_pipe( + channel_monitor_pipe); + if (!ret) { + int flags; + + ret_code = LTTCOMM_CONSUMERD_SUCCESS; + /* Set the pipe as non-blocking. */ + ret = fcntl(channel_monitor_pipe, F_GETFL, 0); + if (ret == -1) { + PERROR("fcntl get flags of the channel monitoring pipe"); + goto error_fatal; + } + flags = ret; + + ret = fcntl(channel_monitor_pipe, F_SETFL, + flags | O_NONBLOCK); + if (ret == -1) { + PERROR("fcntl set O_NONBLOCK flag of the channel monitoring pipe"); + goto error_fatal; + } + DBG("Channel monitor pipe set as non-blocking"); + } else { + ret_code = LTTCOMM_CONSUMERD_ALREADY_SET; + } + goto end_msg_sessiond; + } default: break; } diff --git a/src/common/utils.c b/src/common/utils.c index 16d2f817a..4d49728c0 100644 --- a/src/common/utils.c +++ b/src/common/utils.c @@ -1017,6 +1017,59 @@ static inline unsigned int fls_u32(uint32_t x) #define HAS_FLS_U32 #endif +#if defined(__x86_64) +static inline +unsigned int fls_u64(uint64_t x) +{ + long r; + + asm("bsrq %1,%0\n\t" + "jnz 1f\n\t" + "movq $-1,%0\n\t" + "1:\n\t" + : "=r" (r) : "rm" (x)); + return r + 1; +} +#define HAS_FLS_U64 +#endif + +#ifndef HAS_FLS_U64 +static __attribute__((unused)) +unsigned int fls_u64(uint64_t x) +{ + unsigned int r = 64; + + if (!x) + return 0; + + if (!(x & 0xFFFFFFFF00000000ULL)) { + x <<= 32; + r -= 32; + } + if (!(x & 0xFFFF000000000000ULL)) { + x <<= 16; + r -= 16; + } + if (!(x & 0xFF00000000000000ULL)) { + x <<= 8; + r -= 8; + } + if (!(x & 0xF000000000000000ULL)) { + x <<= 4; + r -= 4; + } + if (!(x & 0xC000000000000000ULL)) { + x <<= 2; + r -= 2; + } + if (!(x & 0x8000000000000000ULL)) { + x <<= 1; + r -= 1; + } + return r; +} +#endif + #ifndef HAS_FLS_U32 static __attribute__((unused)) unsigned int fls_u32(uint32_t x) { @@ -1063,6 +1116,20 @@ int utils_get_count_order_u32(uint32_t x) return fls_u32(x - 1); } +/* + * Return the minimum order for which x <= (1UL << order). + * Return -1 if x is 0. + */ +LTTNG_HIDDEN +int utils_get_count_order_u64(uint64_t x) +{ + if (!x) { + return -1; + } + + return fls_u64(x - 1); +} + /** * Obtain the value of LTTNG_HOME environment variable, if exists. * Otherwise returns the value of HOME. diff --git a/src/common/utils.h b/src/common/utils.h index 7285f5c30..0daf5b98d 100644 --- a/src/common/utils.h +++ b/src/common/utils.h @@ -48,6 +48,7 @@ int utils_rotate_stream_file(char *path_name, char *file_name, uint64_t size, int *stream_fd); int utils_parse_size_suffix(char const * const str, uint64_t * const size); int utils_get_count_order_u32(uint32_t x); +int utils_get_count_order_u64(uint64_t x); char *utils_get_home_dir(void); char *utils_get_user_home_dir(uid_t uid); char *utils_get_kmod_probes_list(void); diff --git a/src/lib/lttng-ctl/Makefile.am b/src/lib/lttng-ctl/Makefile.am index 6b6a6eed1..b5156caf9 100644 --- a/src/lib/lttng-ctl/Makefile.am +++ b/src/lib/lttng-ctl/Makefile.am @@ -5,7 +5,8 @@ SUBDIRS = filter lib_LTLIBRARIES = liblttng-ctl.la liblttng_ctl_la_SOURCES = lttng-ctl.c snapshot.c lttng-ctl-helper.h \ - lttng-ctl-health.c save.c load.c deprecated-symbols.c + lttng-ctl-health.c save.c load.c deprecated-symbols.c \ + channel.c liblttng_ctl_la_LDFLAGS = \ $(LT_NO_UNDEFINED) diff --git a/src/lib/lttng-ctl/channel.c b/src/lib/lttng-ctl/channel.c new file mode 100644 index 000000000..c42829951 --- /dev/null +++ b/src/lib/lttng-ctl/channel.c @@ -0,0 +1,285 @@ +/* + * Copyright (C) 2017 - Jérémie Galarneau + * + * This library is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License, version 2.1 only, + * as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License + * for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this library; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "lttng-ctl-helper.h" + +struct lttng_notification_channel *lttng_notification_channel_create( + struct lttng_endpoint *endpoint) +{ + int fd, ret; + bool is_in_tracing_group = false, is_root = false; + char *sock_path = NULL; + struct lttng_notification_channel *channel = NULL; + + if (!endpoint || + endpoint != lttng_session_daemon_notification_endpoint) { + goto end; + } + + sock_path = zmalloc(LTTNG_PATH_MAX); + if (!sock_path) { + goto end; + } + + channel = zmalloc(sizeof(struct lttng_notification_channel)); + if (!channel) { + goto end; + } + channel->socket = -1; + + is_root = (getuid() == 0); + if (!is_root) { + is_in_tracing_group = lttng_check_tracing_group(); + } + + if (is_root || is_in_tracing_group) { + lttng_ctl_copy_string(sock_path, + DEFAULT_GLOBAL_NOTIFICATION_CHANNEL_UNIX_SOCK, + LTTNG_PATH_MAX); + ret = lttcomm_connect_unix_sock(sock_path); + if (ret >= 0) { + fd = ret; + goto set_fd; + } + } + + /* Fallback to local session daemon. */ + ret = snprintf(sock_path, LTTNG_PATH_MAX, + DEFAULT_HOME_NOTIFICATION_CHANNEL_UNIX_SOCK, + utils_get_home_dir()); + if (ret < 0 || ret >= LTTNG_PATH_MAX) { + goto error; + } + + ret = lttcomm_connect_unix_sock(sock_path); + if (ret < 0) { + goto error; + } + fd = ret; + +set_fd: + channel->socket = fd; + + /* FIXME send creds */ +end: + free(sock_path); + return channel; +error: + lttng_notification_channel_destroy(channel); + channel = NULL; + goto end; +} + +enum lttng_notification_channel_status +lttng_notification_channel_get_next_notification( + struct lttng_notification_channel *channel, + struct lttng_notification **_notification) +{ + ssize_t ret; + struct lttng_notification_comm comm; + struct lttng_notification *notification = NULL; + struct lttng_dynamic_buffer reception_buffer; + struct lttng_notification_channel_message msg; + enum lttng_notification_channel_status status = + LTTNG_NOTIFICATION_CHANNEL_STATUS_OK; + + lttng_dynamic_buffer_init(&reception_buffer); + + if (!channel || !_notification) { + status = LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID; + goto end; + } + + ret = lttcomm_recv_unix_sock(channel->socket, &msg, sizeof(msg)); + if (ret <= 0) { + status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR; + goto end; + } + if (msg.type != LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION) { + status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR; + goto end; + } + + ret = lttcomm_recv_unix_sock(channel->socket, &comm, sizeof(comm)); + if (ret < sizeof(comm)) { + status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR; + goto end; + } + + ret = lttng_dynamic_buffer_set_size(&reception_buffer, + comm.length + sizeof(comm)); + if (ret) { + status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR; + goto end; + } + + memcpy(reception_buffer.data, &comm, sizeof(comm)); + ret = lttcomm_recv_unix_sock(channel->socket, + reception_buffer.data + sizeof(comm), + comm.length); + if (ret < (ssize_t) comm.length) { + status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR; + goto end; + } + + ret = lttng_notification_create_from_buffer(reception_buffer.data, + ¬ification); + if (ret != (sizeof(comm) + (ssize_t) comm.length)) { + status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR; + goto error; + } + *_notification = notification; +end: + lttng_dynamic_buffer_reset(&reception_buffer); + return status; +error: + lttng_notification_destroy(notification); + goto end; +} + +static +enum lttng_notification_channel_status send_command( + struct lttng_notification_channel *channel, + enum lttng_notification_channel_message_type type, + struct lttng_condition *condition) +{ + int socket; + ssize_t command_size, ret; + size_t received = 0; + enum lttng_notification_channel_status status = + LTTNG_NOTIFICATION_CHANNEL_STATUS_OK; + char *command_buffer = NULL; + struct lttng_notification_channel_message cmd_message = { + .type = type, + }; + struct lttng_notification_channel_message reply_message; + struct lttng_notification_channel_command_reply reply; + + if (!channel) { + status = LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID; + goto end; + } + + socket = channel->socket; + if (!lttng_condition_validate(condition)) { + status = LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID; + goto end; + } + + ret = lttng_condition_serialize(condition, NULL); + if (ret < 0) { + status = LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID; + goto end; + } + assert(ret < UINT32_MAX); + cmd_message.size = (uint32_t) ret; + command_size = ret + sizeof( + struct lttng_notification_channel_message); + command_buffer = zmalloc(command_size); + if (!command_buffer) { + goto end; + } + + memcpy(command_buffer, &cmd_message, sizeof(cmd_message)); + ret = lttng_condition_serialize(condition, + command_buffer + sizeof(cmd_message)); + if (ret < 0) { + goto end; + } + + ret = lttcomm_send_unix_sock(socket, command_buffer, command_size); + if (ret < 0) { + status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR; + goto end; + } + + /* Receive command reply header. */ + do { + ret = lttcomm_recv_unix_sock(socket, + ((char *) &reply_message) + received, + sizeof(reply_message) - received); + if (ret <= 0) { + status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR; + goto end; + } + received += ret; + } while (received < sizeof(reply_message)); + if (reply_message.type != + LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_COMMAND_REPLY || + reply_message.size != sizeof(reply)) { + status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR; + goto end; + } + + /* Receive command reply payload. */ + received = 0; + do { + ret = lttcomm_recv_unix_sock(socket, + ((char *) &reply) + received, + sizeof(reply) - received); + if (ret <= 0) { + status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR; + goto end; + } + received += ret; + } while (received < sizeof(reply)); + status = (enum lttng_notification_channel_status) reply.status; +end: + free(command_buffer); + return status; +} + +enum lttng_notification_channel_status lttng_notification_channel_subscribe( + struct lttng_notification_channel *channel, + struct lttng_condition *condition) +{ + return send_command(channel, + LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE, + condition); +} + +enum lttng_notification_channel_status lttng_notification_channel_unsubscribe( + struct lttng_notification_channel *channel, + struct lttng_condition *condition) +{ + return send_command(channel, + LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE, + condition); +} + +void lttng_notification_channel_destroy( + struct lttng_notification_channel *channel) +{ + if (!channel) { + return; + } + + if (channel->socket >= 0) { + (void) lttcomm_close_unix_sock(channel->socket); + } + free(channel); +} + diff --git a/src/lib/lttng-ctl/lttng-ctl.c b/src/lib/lttng-ctl/lttng-ctl.c index f0b211c7b..e44986ee7 100644 --- a/src/lib/lttng-ctl/lttng-ctl.c +++ b/src/lib/lttng-ctl/lttng-ctl.c @@ -36,6 +36,8 @@ #include #include #include +#include +#include #include "filter/filter-ast.h" #include "filter/filter-parser.h" @@ -2435,6 +2437,94 @@ end: return ret; } +int lttng_register_trigger(struct lttng_trigger *trigger) +{ + int ret; + struct lttcomm_session_msg lsm; + char *trigger_buf = NULL; + ssize_t trigger_size; + + if (!trigger) { + ret = -LTTNG_ERR_INVALID; + goto end; + } + + if (!lttng_trigger_validate(trigger)) { + ret = -LTTNG_ERR_INVALID; + goto end; + } + + trigger_size = lttng_trigger_serialize(trigger, NULL); + if (trigger_size < 0) { + ret = -LTTNG_ERR_UNK; + goto end; + } + + trigger_buf = zmalloc(trigger_size); + if (!trigger_buf) { + ret = -LTTNG_ERR_NOMEM; + goto end; + } + + memset(&lsm, 0, sizeof(lsm)); + lsm.cmd_type = LTTNG_REGISTER_TRIGGER; + if (lttng_trigger_serialize(trigger, trigger_buf) < 0) { + ret = -LTTNG_ERR_UNK; + goto end; + } + + lsm.u.trigger.length = (uint32_t) trigger_size; + ret = lttng_ctl_ask_sessiond_varlen_no_cmd_header(&lsm, trigger_buf, + trigger_size, NULL); +end: + free(trigger_buf); + return ret; +} + +int lttng_unregister_trigger(struct lttng_trigger *trigger) +{ + int ret; + struct lttcomm_session_msg lsm; + char *trigger_buf = NULL; + ssize_t trigger_size; + + if (!trigger) { + ret = -LTTNG_ERR_INVALID; + goto end; + } + + if (!lttng_trigger_validate(trigger)) { + ret = -LTTNG_ERR_INVALID; + goto end; + } + + trigger_size = lttng_trigger_serialize(trigger, NULL); + if (trigger_size < 0) { + ret = -LTTNG_ERR_UNK; + goto end; + } + + trigger_buf = zmalloc(trigger_size); + if (!trigger_buf) { + ret = -LTTNG_ERR_NOMEM; + goto end; + } + + memset(&lsm, 0, sizeof(lsm)); + lsm.cmd_type = LTTNG_UNREGISTER_TRIGGER; + if (lttng_trigger_serialize(trigger, trigger_buf) < 0) { + ret = -LTTNG_ERR_UNK; + goto end; + } + + lsm.u.trigger.length = (uint32_t) trigger_size; + ret = lttng_ctl_ask_sessiond_varlen_no_cmd_header(&lsm, trigger_buf, + trigger_size, NULL); +end: + free(trigger_buf); + return ret; +} + /* * lib constructor. */ diff --git a/tests/unit/Makefile.am b/tests/unit/Makefile.am index f9dd53bef..ddae048ca 100644 --- a/tests/unit/Makefile.am +++ b/tests/unit/Makefile.am @@ -73,6 +73,7 @@ UST_DATA_TRACE=$(top_builddir)/src/bin/lttng-sessiond/trace-ust.$(OBJEXT) \ $(top_builddir)/src/bin/lttng-sessiond/session.$(OBJEXT) \ $(top_builddir)/src/bin/lttng-sessiond/snapshot.$(OBJEXT) \ $(top_builddir)/src/bin/lttng-sessiond/agent.$(OBJEXT) \ + $(top_builddir)/src/bin/lttng-sessiond/notification-thread-commands.$(OBJEXT) \ $(top_builddir)/src/common/libcommon.la \ $(top_builddir)/src/common/health/libhealth.la \ $(top_builddir)/src/common/sessiond-comm/libsessiond-comm.la diff --git a/tests/unit/test_ust_data.c b/tests/unit/test_ust_data.c index 7996b8e60..86c1e23bf 100644 --- a/tests/unit/test_ust_data.c +++ b/tests/unit/test_ust_data.c @@ -30,6 +30,7 @@ #include #include #include +#include #include @@ -49,8 +50,9 @@ int lttng_opt_mi; int ust_consumerd32_fd; int ust_consumerd64_fd; -/* Global variable required by sessiond objects being linked-in */ +/* Global variables required by sessiond objects being linked-in */ struct lttng_ht *agent_apps_ht_by_sock; +struct notification_thread_handle *notification_thread_handle; static const char alphanum[] = "0123456789" -- 2.34.1