From 967e3668fc4d224563281a809b1483a397dd9534 Mon Sep 17 00:00:00 2001 From: Mathieu Desnoyers Date: Wed, 18 Jan 2017 19:23:26 -0500 Subject: [PATCH 01/16] Fix: thread_dispatch_ust_registration needs to be a RCU thread MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit It uses a read-side lock. Signed-off-by: Mathieu Desnoyers Signed-off-by: Jérémie Galarneau --- src/bin/lttng-sessiond/main.c | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/bin/lttng-sessiond/main.c b/src/bin/lttng-sessiond/main.c index 40670ddd5..172f8c270 100644 --- a/src/bin/lttng-sessiond/main.c +++ b/src/bin/lttng-sessiond/main.c @@ -1855,6 +1855,8 @@ static void *thread_dispatch_ust_registration(void *data) .count = 0, }; + rcu_register_thread(); + health_register(health_sessiond, HEALTH_SESSIOND_TYPE_APP_REG_DISPATCH); if (testpoint(sessiond_thread_app_reg_dispatch)) { @@ -2088,6 +2090,7 @@ error_testpoint: ERR("Health error occurred in %s", __func__); } health_unregister(health_sessiond); + rcu_unregister_thread(); return NULL; } -- 2.34.1 From fe19a07a543415355ad6ab04e6fe72f167a48262 Mon Sep 17 00:00:00 2001 From: Mathieu Desnoyers Date: Wed, 18 Jan 2017 19:23:27 -0500 Subject: [PATCH 02/16] Fix: consumerd main: needs to be a registered RCU thread MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit main->lttng_consumer_destroy->destroy_data_stream_ht requires a RCU read-side lock. 
Signed-off-by: Mathieu Desnoyers Signed-off-by: Jérémie Galarneau --- src/bin/lttng-consumerd/lttng-consumerd.c | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/bin/lttng-consumerd/lttng-consumerd.c b/src/bin/lttng-consumerd/lttng-consumerd.c index 7f78c4ec2..2f8eed108 100644 --- a/src/bin/lttng-consumerd/lttng-consumerd.c +++ b/src/bin/lttng-consumerd/lttng-consumerd.c @@ -308,6 +308,8 @@ int main(int argc, char **argv) void *status; struct lttng_consumer_local_data *tmp_ctx; + rcu_register_thread(); + if (set_signal_handler()) { retval = -1; goto exit_set_signal_handler; @@ -643,6 +645,8 @@ exit_health_consumerd_cleanup: exit_options: exit_set_signal_handler: + rcu_unregister_thread(); + if (!retval) { exit(EXIT_SUCCESS); } else { -- 2.34.1 From 6986ab9b81183b7759e78eb60adaaa8188b8899b Mon Sep 17 00:00:00 2001 From: Mathieu Desnoyers Date: Wed, 11 Jan 2017 15:49:48 -0500 Subject: [PATCH 03/16] Fix: sessiond: only send streams to consumer once MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Session daemon should not send streams to consumer daemon repeatedly when CPU hotplug is performed while doing kernel tracing. This causes the consumer daemon to have multiple file descriptors on the same stream, and thus try to perform operations like reading a sub-buffer and checking for data pending concurrently. This triggers safety-net warnings in the kernel tracer. 
Signed-off-by: Mathieu Desnoyers Signed-off-by: Jérémie Galarneau --- src/bin/lttng-sessiond/kernel-consumer.c | 3 ++- src/bin/lttng-sessiond/trace-kernel.h | 1 + 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/src/bin/lttng-sessiond/kernel-consumer.c b/src/bin/lttng-sessiond/kernel-consumer.c index 7582d80f0..2241acbca 100644 --- a/src/bin/lttng-sessiond/kernel-consumer.c +++ b/src/bin/lttng-sessiond/kernel-consumer.c @@ -338,7 +338,7 @@ int kernel_consumer_send_channel_stream(struct consumer_socket *sock, /* Send streams */ cds_list_for_each_entry(stream, &channel->stream_list.head, list) { - if (!stream->fd) { + if (!stream->fd || stream->sent_to_consumer) { continue; } @@ -348,6 +348,7 @@ int kernel_consumer_send_channel_stream(struct consumer_socket *sock, if (ret < 0) { goto error; } + stream->sent_to_consumer = true; } error: diff --git a/src/bin/lttng-sessiond/trace-kernel.h b/src/bin/lttng-sessiond/trace-kernel.h index b9bcbfa77..2092469ad 100644 --- a/src/bin/lttng-sessiond/trace-kernel.h +++ b/src/bin/lttng-sessiond/trace-kernel.h @@ -84,6 +84,7 @@ struct ltt_kernel_stream { int fd; int state; int cpu; + bool sent_to_consumer; /* Format is %s_%d respectively channel name and CPU number. */ char name[DEFAULT_STREAM_NAME_LEN]; uint64_t tracefile_size; -- 2.34.1 From 7b87473dc4e12b6ac5e10144eff786201d12d2f2 Mon Sep 17 00:00:00 2001 From: Mathieu Desnoyers Date: Wed, 11 Jan 2017 15:49:49 -0500 Subject: [PATCH 04/16] Fix: consumerd: add missing put_subbuf for ust and kernel errors MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit While reading a sub-buffer, error handling needs to put the sub-buffer, else all future attempts to use the stream will trigger warnings. This affects recent features added to UST and kernel tracing. 
Signed-off-by: Mathieu Desnoyers Signed-off-by: Jérémie Galarneau --- src/common/kernel-consumer/kernel-consumer.c | 22 ++++++++++++++++++++ src/common/ust-consumer/ust-consumer.c | 4 ++++ 2 files changed, 26 insertions(+) diff --git a/src/common/kernel-consumer/kernel-consumer.c b/src/common/kernel-consumer/kernel-consumer.c index bb5ba3b13..d5418dcda 100644 --- a/src/common/kernel-consumer/kernel-consumer.c +++ b/src/common/kernel-consumer/kernel-consumer.c @@ -1315,12 +1315,34 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, } ret = update_stream_stats(stream); if (ret < 0) { + err = kernctl_put_subbuf(infd); + if (err != 0) { + if (err == -EFAULT) { + PERROR("Error in unreserving sub buffer\n"); + } else if (err == -EIO) { + /* Should never happen with newer LTTng versions */ + PERROR("Reader has been pushed by the writer, last sub-buffer corrupted."); + } + ret = err; + goto end; + } goto end; } } else { write_index = 0; ret = metadata_stream_check_version(infd, stream); if (ret < 0) { + err = kernctl_put_subbuf(infd); + if (err != 0) { + if (err == -EFAULT) { + PERROR("Error in unreserving sub buffer\n"); + } else if (err == -EIO) { + /* Should never happen with newer LTTng versions */ + PERROR("Reader has been pushed by the writer, last sub-buffer corrupted."); + } + ret = err; + goto end; + } goto end; } } diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c index 7bbfa60a2..97f0497cb 100644 --- a/src/common/ust-consumer/ust-consumer.c +++ b/src/common/ust-consumer/ust-consumer.c @@ -2496,6 +2496,8 @@ retry: index.offset = htobe64(stream->out_fd_offset); ret = get_index_values(&index, ustream); if (ret < 0) { + err = ustctl_put_subbuf(ustream); + assert(err == 0); goto end; } @@ -2503,6 +2505,8 @@ retry: ret = update_stream_stats(stream); if (ret < 0) { PERROR("kernctl_get_events_discarded"); + err = ustctl_put_subbuf(ustream); + assert(err == 0); goto end; } } else { -- 2.34.1 From 
1f3c3a24e3a07f6fd032ad50bac92f1f58d33a12 Mon Sep 17 00:00:00 2001 From: Jonathan Rajotte Date: Fri, 13 Jan 2017 17:04:42 -0500 Subject: [PATCH 05/16] Man: move [SESSION] before options MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit The previous synopsis for the live mode can confuse users, since it can lead to an error when trying one of the simplest create commands for a live session that the synopsis proposes: lttng create --live test. Other synopses are modified for symmetry. Fixes #1081 Signed-off-by: Jonathan Rajotte Signed-off-by: Jérémie Galarneau --- doc/man/lttng-create.1.txt | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/doc/man/lttng-create.1.txt b/doc/man/lttng-create.1.txt index 8c61b0926..152e3f393 100644 --- a/doc/man/lttng-create.1.txt +++ b/doc/man/lttng-create.1.txt @@ -12,27 +12,25 @@ SYNOPSIS Local mode: [verse] -*lttng* ['linkgenoptions:(GENERAL OPTIONS)'] *create* [option:--shm-path='PATH'] - [option:--no-output | option:--output='PATH' | option:--set-url=file://'PATH'] ['SESSION'] +*lttng* ['linkgenoptions:(GENERAL OPTIONS)'] *create* ['SESSION'] [option:--shm-path='PATH'] + [option:--no-output | option:--output='PATH' | option:--set-url=file://'PATH'] Network streaming mode: [verse] -*lttng* ['linkgenoptions:(GENERAL OPTIONS)'] *create* [option:--shm-path='PATH'] - (option:--set-url='URL' | option:--ctrl-url='URL' option:--data-url='URL') ['SESSION'] - +*lttng* ['linkgenoptions:(GENERAL OPTIONS)'] *create* ['SESSION'] [option:--shm-path='PATH'] + (option:--set-url='URL' | option:--ctrl-url='URL' option:--data-url='URL') Snapshot mode: [verse] -*lttng* ['linkgenoptions:(GENERAL OPTIONS)'] *create* option:--snapshot [option:--shm-path='PATH'] - [option:--set-url='URL' | option:--ctrl-url='URL' option:--data-url='URL'] ['SESSION'] +*lttng* ['linkgenoptions:(GENERAL OPTIONS)'] *create* ['SESSION'] option:--snapshot + [option:--shm-path='PATH'] 
[option:--set-url='URL' | option:--ctrl-url='URL' option:--data-url='URL'] Live mode: [verse] -*lttng* ['linkgenoptions:(GENERAL OPTIONS)'] *create* option:--live[='DELAYUS'] [option:--shm-path='PATH'] - [option:--set-url='URL' | option:--ctrl-url='URL' option:--data-url='URL'] ['SESSION'] - +*lttng* ['linkgenoptions:(GENERAL OPTIONS)'] *create* ['SESSION'] option:--live[='DELAYUS'] + [option:--shm-path='PATH'] [option:--set-url='URL' | option:--ctrl-url='URL' option:--data-url='URL'] DESCRIPTION ----------- -- 2.34.1 From 84a7eb731975042c535645dd747a51825b302f93 Mon Sep 17 00:00:00 2001 From: Jonathan Rajotte Date: Tue, 17 Jan 2017 10:02:08 -0500 Subject: [PATCH 06/16] Fix: test_kernel_data dereference of null pointer MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Skip tests when tested struct is null. Signed-off-by: Jonathan Rajotte Signed-off-by: Jérémie Galarneau --- tests/unit/test_kernel_data.c | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/tests/unit/test_kernel_data.c b/tests/unit/test_kernel_data.c index 5bca684a5..6a35ea54a 100644 --- a/tests/unit/test_kernel_data.c +++ b/tests/unit/test_kernel_data.c @@ -72,6 +72,10 @@ static void test_create_one_kernel_session(void) kern = trace_kernel_create_session(); ok(kern != NULL, "Create kernel session"); + if (!kern) { + skip(1, "Kernel session is null"); + return; + } ok(kern->fd == -1 && kern->metadata_stream_fd == -1 && kern->consumer_fds_sent == 0 && @@ -117,6 +121,11 @@ static void test_create_kernel_channel(void) chan = trace_kernel_create_channel(&attr); ok(chan != NULL, "Create kernel channel"); + if (!chan) { + skip(1, "Channel is null"); + return; + } + ok(chan->fd == -1 && chan->enabled == 1 && chan->stream_count == 0 && @@ -143,6 +152,11 @@ static void test_create_kernel_event(void) event = trace_kernel_create_event(&ev, NULL, NULL); ok(event != NULL, "Create kernel event"); + if (!event) { + skip(1, "Event is null"); + return; + } + 
ok(event->fd == -1 && event->enabled == 1 && event->event->instrumentation == LTTNG_KERNEL_TRACEPOINT && @@ -161,6 +175,11 @@ static void test_create_kernel_stream(void) stream = trace_kernel_create_stream("stream1", 0); ok(stream != NULL, "Create kernel stream"); + if (!stream) { + skip(1, "Stream is null"); + return; + } + ok(stream->fd == -1 && stream->state == 0, "Validate kernel stream"); -- 2.34.1 From 157b2d445425611ab8ada02e34204cd9134c2acc Mon Sep 17 00:00:00 2001 From: Jonathan Rajotte Date: Tue, 17 Jan 2017 10:07:31 -0500 Subject: [PATCH 07/16] x is never reused, no need to shift it MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Also remove noise in scanbuild report. Signed-off-by: Jonathan Rajotte Signed-off-by: Jérémie Galarneau --- src/bin/lttng-sessiond/ust-metadata.c | 1 - src/common/compat/string.h | 1 - 2 files changed, 2 deletions(-) diff --git a/src/bin/lttng-sessiond/ust-metadata.c b/src/bin/lttng-sessiond/ust-metadata.c index c1a80c746..fb1f11521 100644 --- a/src/bin/lttng-sessiond/ust-metadata.c +++ b/src/bin/lttng-sessiond/ust-metadata.c @@ -74,7 +74,6 @@ int fls(unsigned int x) r -= 2; } if (!(x & 0x80000000U)) { - x <<= 1; r -= 1; } return r; diff --git a/src/common/compat/string.h b/src/common/compat/string.h index 7c426ef3d..db3db8052 100644 --- a/src/common/compat/string.h +++ b/src/common/compat/string.h @@ -117,7 +117,6 @@ static inline int lttng_fls(int val) r -= 2; } if (!(x & 0x80000000U)) { - x <<= 1; r -= 1; } return r; -- 2.34.1 From 67b2f51c7e8d8e873be92cb45a534855cecaf7de Mon Sep 17 00:00:00 2001 From: Jonathan Rajotte Date: Tue, 17 Jan 2017 10:08:22 -0500 Subject: [PATCH 08/16] Fix: test_ust_data dereference of null pointer MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Skip test on NULL value to prevent null dereference. 
Signed-off-by: Jonathan Rajotte Signed-off-by: Jérémie Galarneau --- tests/unit/test_ust_data.c | 32 ++++++++++++++++++++++++++------ 1 file changed, 26 insertions(+), 6 deletions(-) diff --git a/tests/unit/test_ust_data.c b/tests/unit/test_ust_data.c index 82d15b66d..cf795cfa0 100644 --- a/tests/unit/test_ust_data.c +++ b/tests/unit/test_ust_data.c @@ -84,6 +84,11 @@ static void test_create_one_ust_session(void) usess = trace_ust_create_session(42); ok(usess != NULL, "Create UST session"); + if (!usess) { + skip(1, "UST session is null"); + return; + } + ok(usess->id == 42 && usess->active == 0 && usess->domain_global.channels != NULL && @@ -106,6 +111,11 @@ static void test_create_ust_channel(void) uchan = trace_ust_create_channel(&attr, LTTNG_DOMAIN_UST); ok(uchan != NULL, "Create UST channel"); + if (!usess) { + skip(1, "UST session is null"); + return; + } + ok(uchan->enabled == 0 && strncmp(uchan->name, "channel0", 8) == 0 && uchan->name[LTTNG_UST_SYM_NAME_LEN - 1] == '\0' && @@ -133,6 +143,11 @@ static void test_create_ust_event(void) ok(event != NULL, "Create UST event"); + if (!event) { + skip(1, "UST event is null"); + return; + } + ok(event->enabled == 0 && event->attr.instrumentation == LTTNG_UST_TRACEPOINT && strcmp(event->attr.name, ev.name) == 0 && @@ -167,8 +182,8 @@ static void test_create_ust_event_exclusion(void) LTTNG_SYMBOL_NAME_LEN * exclusion_count); ok(exclusion != NULL, "Create UST exclusion"); if (!exclusion) { - PERROR("zmalloc"); - abort(); + skip(4, "zmalloc failed"); + goto end; } exclusion->count = exclusion_count; @@ -187,8 +202,8 @@ static void test_create_ust_event_exclusion(void) LTTNG_SYMBOL_NAME_LEN * exclusion_count); ok(exclusion != NULL, "Create UST exclusion"); if (!exclusion) { - PERROR("zmalloc"); - abort(); + skip(2, "zmalloc failed"); + goto end; } exclusion->count = exclusion_count; @@ -198,10 +213,13 @@ static void test_create_ust_event_exclusion(void) get_random_string(), LTTNG_SYMBOL_NAME_LEN); event = 
trace_ust_create_event(&ev, NULL, NULL, exclusion, false); - assert(event != NULL); - ok(event != NULL, "Create UST event with different exclusion names"); + if (!event) { + skip(1, "UST event with exclusion is null"); + goto end; + } + ok(event->enabled == 0 && event->attr.instrumentation == LTTNG_UST_TRACEPOINT && strcmp(event->attr.name, ev.name) == 0 && @@ -213,6 +231,8 @@ static void test_create_ust_event_exclusion(void) "Validate UST event and exclusion"); trace_ust_destroy_event(event); +end: + return; } -- 2.34.1 From 1e19c0f692117a7e1d2f00b2c434d7bd68485575 Mon Sep 17 00:00:00 2001 From: Jonathan Rajotte Date: Tue, 17 Jan 2017 10:08:47 -0500 Subject: [PATCH 09/16] Fix: null dereference on error path for create_ctx_type MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit When zmalloc of type->opt fails, destroy_ctx_type would result in a null dereference. Signed-off-by: Jonathan Rajotte Signed-off-by: Jérémie Galarneau --- src/bin/lttng/commands/add_context.c | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/bin/lttng/commands/add_context.c b/src/bin/lttng/commands/add_context.c index 5fc65bf55..df722bb24 100644 --- a/src/bin/lttng/commands/add_context.c +++ b/src/bin/lttng/commands/add_context.c @@ -662,7 +662,9 @@ void destroy_ctx_type(struct ctx_type *type) if (!type) { return; } - free(type->opt->symbol); + if (type->opt) { + free(type->opt->symbol); + } free(type->opt); free(type); } -- 2.34.1 From c607fe035ea8821a8f4c6ab6f3467ec726e36a63 Mon Sep 17 00:00:00 2001 From: Michael Jeanson Date: Thu, 26 Jan 2017 14:36:45 -0500 Subject: [PATCH 10/16] Fix: Lazily initialize max poll set size in poll compat MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit This was applied to the epoll implementation in commit 22dad56815ce0201c5ae7d5ef5d79cc0c6a42c5e Signed-off-by: Michael Jeanson Signed-off-by: Jérémie Galarneau --- src/common/compat/compat-poll.c | 5 +++-- 1 
file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/common/compat/compat-poll.c b/src/common/compat/compat-poll.c index cc280c764..7d4d0e133 100644 --- a/src/common/compat/compat-poll.c +++ b/src/common/compat/compat-poll.c @@ -111,8 +111,9 @@ int compat_poll_create(struct lttng_poll_event *events, int size) } if (!poll_max_size) { - ERR("poll_max_size not initialized yet"); - goto error; + if (lttng_poll_set_max_size()) { + goto error; + } } /* Don't bust the limit here */ -- 2.34.1 From 8273250bde9f8fa0cc2f6d8ea86aaf4b9fe83fc1 Mon Sep 17 00:00:00 2001 From: Jonathan Rajotte Date: Thu, 26 Jan 2017 14:53:03 -0500 Subject: [PATCH 11/16] Fix: tests: register thread for RCU operations. MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jonathan Rajotte Signed-off-by: Jérémie Galarneau --- tests/unit/test_session.c | 4 ++++ tests/unit/test_ust_data.c | 5 +++++ 2 files changed, 9 insertions(+) diff --git a/tests/unit/test_session.c b/tests/unit/test_session.c index f2343c991..03e6b9161 100644 --- a/tests/unit/test_session.c +++ b/tests/unit/test_session.c @@ -24,6 +24,7 @@ #include #include #include +#include #include @@ -302,6 +303,8 @@ int main(int argc, char **argv) diag("Sessions unit tests"); + rcu_register_thread(); + test_session_list(); test_create_one_session(); @@ -318,6 +321,7 @@ int main(int argc, char **argv) test_large_session_number(); + rcu_unregister_thread(); assert(!fini_ht_cleanup_thread(&ht_cleanup_thread)); return exit_status(); diff --git a/tests/unit/test_ust_data.c b/tests/unit/test_ust_data.c index cf795cfa0..7996b8e60 100644 --- a/tests/unit/test_ust_data.c +++ b/tests/unit/test_ust_data.c @@ -23,6 +23,7 @@ #include #include #include +#include #include #include @@ -257,11 +258,15 @@ int main(int argc, char **argv) diag("UST data structures unit test"); + rcu_register_thread(); + test_create_one_ust_session(); test_create_ust_channel(); test_create_ust_event(); 
test_create_ust_context(); test_create_ust_event_exclusion(); + rcu_unregister_thread(); + return exit_status(); } -- 2.34.1 From e55c055d9742a9af996214af306ea68c51e168d8 Mon Sep 17 00:00:00 2001 From: Michael Jeanson Date: Thu, 26 Jan 2017 14:55:46 -0500 Subject: [PATCH 12/16] Fix: Remove unused headers MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit This is a portability fix, these headers are unused and not available on some platforms. Signed-off-by: Michael Jeanson Signed-off-by: Jérémie Galarneau --- src/bin/lttng-relayd/health-relayd.c | 1 - src/common/futex.c | 1 - 2 files changed, 2 deletions(-) diff --git a/src/bin/lttng-relayd/health-relayd.c b/src/bin/lttng-relayd/health-relayd.c index cff6c6e72..b14b82772 100644 --- a/src/bin/lttng-relayd/health-relayd.c +++ b/src/bin/lttng-relayd/health-relayd.c @@ -37,7 +37,6 @@ #include #include #include -#include #include #include diff --git a/src/common/futex.c b/src/common/futex.c index 2ae03b27d..384e957a2 100644 --- a/src/common/futex.c +++ b/src/common/futex.c @@ -18,7 +18,6 @@ #define _LGPL_SOURCE #include -#include #include #include #include -- 2.34.1 From 07bb6d71d12af83c72d9cc18a7488d6b15ceb440 Mon Sep 17 00:00:00 2001 From: Michael Jeanson Date: Thu, 26 Jan 2017 15:09:21 -0500 Subject: [PATCH 13/16] Port: add cygwin support to endian compat MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Michael Jeanson Signed-off-by: Jérémie Galarneau --- src/common/compat/endian.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/common/compat/endian.h b/src/common/compat/endian.h index 4edc12435..6427feb8d 100644 --- a/src/common/compat/endian.h +++ b/src/common/compat/endian.h @@ -18,7 +18,7 @@ #ifndef _COMPAT_ENDIAN_H #define _COMPAT_ENDIAN_H -#ifdef __linux__ +#if defined(__linux__) || defined(__CYGWIN__) #include /* -- 2.34.1 From 70c6956ba2dcd36f4e44158a0b27621eda336a6a Mon Sep 17 00:00:00 2001 From: 
Michael Jeanson Date: Thu, 26 Jan 2017 15:09:22 -0500 Subject: [PATCH 14/16] Port: win32 DLLs don't support hidden symbols MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Michael Jeanson Signed-off-by: Jérémie Galarneau --- src/common/macros.h | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/src/common/macros.h b/src/common/macros.h index 8ae6535d0..90849ed30 100644 --- a/src/common/macros.h +++ b/src/common/macros.h @@ -74,7 +74,14 @@ void *zmalloc(size_t len) #define LTTNG_PACKED __attribute__((__packed__)) #endif -#ifndef LTTNG_HIDDEN +/* + * LTTNG_HIDDEN: set the hidden attribute for internal functions + * On Windows, symbols are local unless explicitly exported, + * see https://gcc.gnu.org/wiki/Visibility + */ +#if defined(_WIN32) || defined(__CYGWIN__) +#define LTTNG_HIDDEN +#else #define LTTNG_HIDDEN __attribute__((visibility("hidden"))) #endif -- 2.34.1 From d30b2041e3a5db2d1671219ea845057cb3dd3082 Mon Sep 17 00:00:00 2001 From: Michael Jeanson Date: Thu, 2 Feb 2017 17:09:43 -0500 Subject: [PATCH 15/16] Port: Link with no-undefined on Windows MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Michael Jeanson Signed-off-by: Jérémie Galarneau --- configure.ac | 10 ++++++++++ src/bin/lttng-relayd/Makefile.am | 4 ++-- src/lib/lttng-ctl/Makefile.am | 3 +++ 3 files changed, 15 insertions(+), 2 deletions(-) diff --git a/configure.ac b/configure.ac index 6badbf9e6..0a7034e24 100644 --- a/configure.ac +++ b/configure.ac @@ -65,6 +65,16 @@ CC="$PTHREAD_CC" AX_LIB_SOCKET_NSL +LT_NO_UNDEFINED="" +AS_CASE([$host_os], + [cygwin*], + [ + LT_NO_UNDEFINED="-no-undefined" + ] +) + +AC_SUBST(LT_NO_UNDEFINED) + # Compute minor/major/patchlevel version numbers major_version=$(echo AC_PACKAGE_VERSION | $SED 's/^\([[0-9]]\)*\.[[0-9]]*\.[[0-9]]*.*$/\1/') minor_version=$(echo AC_PACKAGE_VERSION | $SED 's/^[[0-9]]*\.\([[0-9]]*\)\.[[0-9]]*.*$/\1/') 
diff --git a/src/bin/lttng-relayd/Makefile.am b/src/bin/lttng-relayd/Makefile.am index 4857074df..df87bc300 100644 --- a/src/bin/lttng-relayd/Makefile.am +++ b/src/bin/lttng-relayd/Makefile.am @@ -24,7 +24,6 @@ lttng_relayd_SOURCES = main.c lttng-relayd.h utils.h utils.c cmd.h \ # link on liblttngctl for check if relayd is already alive. lttng_relayd_LDADD = -lurcu-common -lurcu \ - $(top_builddir)/src/lib/lttng-ctl/liblttng-ctl.la \ $(top_builddir)/src/common/sessiond-comm/libsessiond-comm.la \ $(top_builddir)/src/common/hashtable/libhashtable.la \ $(top_builddir)/src/common/libcommon.la \ @@ -32,4 +31,5 @@ lttng_relayd_LDADD = -lurcu-common -lurcu \ $(top_builddir)/src/common/index/libindex.la \ $(top_builddir)/src/common/health/libhealth.la \ $(top_builddir)/src/common/config/libconfig.la \ - $(top_builddir)/src/common/testpoint/libtestpoint.la + $(top_builddir)/src/common/testpoint/libtestpoint.la \ + $(top_builddir)/src/lib/lttng-ctl/liblttng-ctl.la diff --git a/src/lib/lttng-ctl/Makefile.am b/src/lib/lttng-ctl/Makefile.am index 5b87e38b7..6b6a6eed1 100644 --- a/src/lib/lttng-ctl/Makefile.am +++ b/src/lib/lttng-ctl/Makefile.am @@ -7,6 +7,9 @@ lib_LTLIBRARIES = liblttng-ctl.la liblttng_ctl_la_SOURCES = lttng-ctl.c snapshot.c lttng-ctl-helper.h \ lttng-ctl-health.c save.c load.c deprecated-symbols.c +liblttng_ctl_la_LDFLAGS = \ + $(LT_NO_UNDEFINED) + liblttng_ctl_la_LIBADD = \ $(top_builddir)/src/common/sessiond-comm/libsessiond-comm.la \ $(top_builddir)/src/common/libcommon.la \ -- 2.34.1 From daa81f582a5374fd38efc416be0c1feeed659dff Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=A9mie=20Galarneau?= Date: Fri, 24 Feb 2017 01:13:17 -0500 Subject: [PATCH 16/16] Deliverables 3 and 4 MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérémie Galarneau --- configure.ac | 14 +- include/Makefile.am | 28 +- include/lttng/action/action-internal.h | 55 + include/lttng/action/action.h | 41 + 
include/lttng/action/notify-internal.h | 28 + include/lttng/action/notify.h | 33 + .../lttng/condition/buffer-usage-internal.h | 102 + include/lttng/condition/buffer-usage.h | 102 + include/lttng/condition/condition-internal.h | 70 + include/lttng/condition/condition.h | 52 + include/lttng/condition/evaluation-internal.h | 53 + include/lttng/condition/evaluation.h | 46 + include/lttng/endpoint-internal.h | 32 + include/lttng/endpoint.h | 32 + include/lttng/lttng-error.h | 4 + include/lttng/notification/channel-internal.h | 51 + include/lttng/notification/channel.h | 67 + .../notification/notification-internal.h | 56 + include/lttng/notification/notification.h | 46 + include/lttng/trigger/trigger-internal.h | 52 + include/lttng/trigger/trigger.h | 54 + src/bin/lttng-consumerd/lttng-consumerd.c | 10 +- src/bin/lttng-sessiond/Makefile.am | 5 +- src/bin/lttng-sessiond/agent-thread.c | 2 +- src/bin/lttng-sessiond/cmd.c | 81 + src/bin/lttng-sessiond/cmd.h | 7 + src/bin/lttng-sessiond/consumer.c | 32 +- src/bin/lttng-sessiond/consumer.h | 8 + src/bin/lttng-sessiond/health-sessiond.h | 1 + src/bin/lttng-sessiond/lttng-sessiond.h | 5 +- src/bin/lttng-sessiond/lttng-ust-ctl.h | 1 + src/bin/lttng-sessiond/main.c | 224 ++- .../notification-thread-commands.c | 168 ++ .../notification-thread-commands.h | 89 + .../notification-thread-events.c | 1663 +++++++++++++++++ .../notification-thread-events.h | 52 + src/bin/lttng-sessiond/notification-thread.c | 682 +++++++ src/bin/lttng-sessiond/notification-thread.h | 73 + src/bin/lttng-sessiond/ust-app.c | 72 +- src/bin/lttng-sessiond/ust-consumer.c | 10 + src/bin/lttng-sessiond/ust-registry.c | 22 +- src/bin/lttng-sessiond/ust-registry.h | 3 +- src/common/Makefile.am | 5 +- src/common/action.c | 119 ++ src/common/buffer-usage.c | 784 ++++++++ src/common/condition.c | 159 ++ src/common/consumer/consumer-timer.c | 342 +++- src/common/consumer/consumer-timer.h | 11 +- src/common/consumer/consumer.c | 5 + src/common/consumer/consumer.h | 
11 + src/common/defaults.h | 10 +- src/common/dynamic-buffer.c | 166 ++ src/common/dynamic-buffer.h | 59 + src/common/endpoint.c | 26 + src/common/error.c | 4 + src/common/evaluation.c | 110 ++ src/common/macros.h | 9 + src/common/notification.c | 168 ++ src/common/notify.c | 49 + src/common/pipe.c | 66 + src/common/pipe.h | 6 + src/common/sessiond-comm/sessiond-comm.h | 21 + src/common/trigger.c | 179 ++ src/common/unix.c | 31 +- src/common/ust-consumer/ust-consumer.c | 57 + src/common/utils.c | 67 + src/common/utils.h | 1 + src/lib/lttng-ctl/Makefile.am | 3 +- src/lib/lttng-ctl/channel.c | 285 +++ src/lib/lttng-ctl/lttng-ctl.c | 90 + tests/unit/Makefile.am | 1 + tests/unit/test_ust_data.c | 4 +- 72 files changed, 6955 insertions(+), 121 deletions(-) create mode 100644 include/lttng/action/action-internal.h create mode 100644 include/lttng/action/action.h create mode 100644 include/lttng/action/notify-internal.h create mode 100644 include/lttng/action/notify.h create mode 100644 include/lttng/condition/buffer-usage-internal.h create mode 100644 include/lttng/condition/buffer-usage.h create mode 100644 include/lttng/condition/condition-internal.h create mode 100644 include/lttng/condition/condition.h create mode 100644 include/lttng/condition/evaluation-internal.h create mode 100644 include/lttng/condition/evaluation.h create mode 100644 include/lttng/endpoint-internal.h create mode 100644 include/lttng/endpoint.h create mode 100644 include/lttng/notification/channel-internal.h create mode 100644 include/lttng/notification/channel.h create mode 100644 include/lttng/notification/notification-internal.h create mode 100644 include/lttng/notification/notification.h create mode 100644 include/lttng/trigger/trigger-internal.h create mode 100644 include/lttng/trigger/trigger.h create mode 100644 src/bin/lttng-sessiond/notification-thread-commands.c create mode 100644 src/bin/lttng-sessiond/notification-thread-commands.h create mode 100644 
src/bin/lttng-sessiond/notification-thread-events.c create mode 100644 src/bin/lttng-sessiond/notification-thread-events.h create mode 100644 src/bin/lttng-sessiond/notification-thread.c create mode 100644 src/bin/lttng-sessiond/notification-thread.h create mode 100644 src/common/action.c create mode 100644 src/common/buffer-usage.c create mode 100644 src/common/condition.c create mode 100644 src/common/dynamic-buffer.c create mode 100644 src/common/dynamic-buffer.h create mode 100644 src/common/endpoint.c create mode 100644 src/common/evaluation.c create mode 100644 src/common/notification.c create mode 100644 src/common/notify.c create mode 100644 src/common/trigger.c create mode 100644 src/lib/lttng-ctl/channel.c diff --git a/configure.ac b/configure.ac index 0a7034e24..9bc394123 100644 --- a/configure.ac +++ b/configure.ac @@ -948,8 +948,20 @@ CFLAGS="-Wall $CFLAGS -g -fno-strict-aliasing" DEFAULT_INCLUDES="-I\$(top_srcdir) -I\$(top_builddir) -I\$(top_builddir)/src -I\$(top_builddir)/include -include config.h" lttngincludedir="${includedir}/lttng" - AC_SUBST(lttngincludedir) + +lttngactionincludedir="${includedir}/lttng/action" +AC_SUBST(lttngactionincludedir) + +lttngconditionincludedir="${includedir}/lttng/condition" +AC_SUBST(lttngconditionincludedir) + +lttngnotificationincludedir="${includedir}/lttng/notification" +AC_SUBST(lttngnotificationincludedir) + +lttngtriggerincludedir="${includedir}/lttng/trigger" +AC_SUBST(lttngtriggerincludedir) + AC_SUBST(DEFAULT_INCLUDES) lttnglibexecdir="${libdir}/lttng/libexec" diff --git a/include/Makefile.am b/include/Makefile.am index 0536dcdf0..bc105b064 100644 --- a/include/Makefile.am +++ b/include/Makefile.am @@ -77,10 +77,36 @@ lttnginclude_HEADERS = \ lttng/snapshot.h \ lttng/save.h \ lttng/load.h \ + lttng/endpoint.h \ version.h.tmpl +lttngactioninclude_HEADERS= \ + lttng/action/action.h \ + lttng/action/notify.h + +lttngconditioninclude_HEADERS= \ + lttng/condition/condition.h \ + lttng/condition/buffer-usage.h \ 
+ lttng/condition/evaluation.h + +lttngnotificationinclude_HEADERS= \ + lttng/notification/channel.h \ + lttng/notification/notification.h + +lttngtriggerinclude_HEADERS= \ + lttng/trigger/trigger.h + noinst_HEADERS = \ lttng/snapshot-internal.h \ lttng/health-internal.h \ lttng/save-internal.h \ - lttng/load-internal.h + lttng/load-internal.h \ + lttng/action/action-internal.h \ + lttng/action/notify-internal.h \ + lttng/condition/condition-internal.h \ + lttng/condition/buffer-usage-internal.h \ + lttng/condition/evaluation-internal.h \ + lttng/notification/notification-internal.h \ + lttng/trigger/trigger-internal.h \ + lttng/endpoint-internal.h \ + lttng/notification/channel-internal.h diff --git a/include/lttng/action/action-internal.h b/include/lttng/action/action-internal.h new file mode 100644 index 000000000..9210f9fa3 --- /dev/null +++ b/include/lttng/action/action-internal.h @@ -0,0 +1,55 @@ +/* + * Copyright (C) 2017 - Jérémie Galarneau + * + * This library is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License, version 2.1 only, + * as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License + * for more details. 
+ * + * You should have received a copy of the GNU Lesser General Public License + * along with this library; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#ifndef LTTNG_ACTION_INTERNAL_H +#define LTTNG_ACTION_INTERNAL_H + +#include +#include +#include + +typedef bool (*action_validate_cb)(struct lttng_action *action); +typedef void (*action_destroy_cb)(struct lttng_action *action); +typedef ssize_t (*action_serialize_cb)(struct lttng_action *action, char *buf); + +struct lttng_action { + enum lttng_action_type type; + action_validate_cb validate; + action_serialize_cb serialize; + action_destroy_cb destroy; +}; + +struct lttng_action_comm { + /* enum lttng_action_type */ + int8_t action_type; +} LTTNG_PACKED; + +LTTNG_HIDDEN +bool lttng_action_validate(struct lttng_action *action); + +LTTNG_HIDDEN +ssize_t lttng_action_serialize(struct lttng_action *action, char *buf); + +/* + * FIXME Add explicit buffer bound checking using a "len" parameter to + * ensure malformed buffers are caught and rejected. + */ +LTTNG_HIDDEN +ssize_t lttng_action_create_from_buffer(const char *buf, + struct lttng_action **action); + +#endif /* LTTNG_ACTION_INTERNAL_H */ diff --git a/include/lttng/action/action.h b/include/lttng/action/action.h new file mode 100644 index 000000000..311f5a0fb --- /dev/null +++ b/include/lttng/action/action.h @@ -0,0 +1,41 @@ +/* + * Copyright (C) 2017 - Jérémie Galarneau + * + * This library is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License, version 2.1 only, + * as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License + * for more details. 
+ * + * You should have received a copy of the GNU Lesser General Public License + * along with this library; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#ifndef LTTNG_ACTION_H +#define LTTNG_ACTION_H + +struct lttng_action; + +#ifdef __cplusplus +extern "C" { +#endif + +enum lttng_action_type { + LTTNG_ACTION_TYPE_UNKNOWN = -1, + LTTNG_ACTION_TYPE_NOTIFY = 0, +}; + +extern enum lttng_action_type lttng_action_get_type( + struct lttng_action *action); + +extern void lttng_action_destroy(struct lttng_action *action); + +#ifdef __cplusplus +} +#endif + +#endif /* LTTNG_ACTION_H */ diff --git a/include/lttng/action/notify-internal.h b/include/lttng/action/notify-internal.h new file mode 100644 index 000000000..509bd36a9 --- /dev/null +++ b/include/lttng/action/notify-internal.h @@ -0,0 +1,28 @@ +/* + * Copyright (C) 2017 - Jérémie Galarneau + * + * This library is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License, version 2.1 only, + * as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License + * for more details. 
+ * + * You should have received a copy of the GNU Lesser General Public License + * along with this library; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#ifndef LTTNG_ACTION_NOTIFY_INTERNAL_H +#define LTTNG_ACTION_NOTIFY_INTERNAL_H + +#include +#include + +struct lttng_action_notify { + struct lttng_action parent; +}; + +#endif /* LTTNG_ACTION_NOTIFY_INTERNAL_H */ diff --git a/include/lttng/action/notify.h b/include/lttng/action/notify.h new file mode 100644 index 000000000..6c0d0ca25 --- /dev/null +++ b/include/lttng/action/notify.h @@ -0,0 +1,33 @@ +/* + * Copyright (C) 2017 - Jérémie Galarneau + * + * This library is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License, version 2.1 only, + * as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License + * for more details. 
+ * + * You should have received a copy of the GNU Lesser General Public License + * along with this library; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#ifndef LTTNG_ACTION_NOTIFY_H +#define LTTNG_ACTION_NOTIFY_H + +struct lttng_action; + +#ifdef __cplusplus +extern "C" { +#endif + +extern struct lttng_action *lttng_action_notify_create(void); + +#ifdef __cplusplus +} +#endif + +#endif /* LTTNG_ACTION_NOTIFY_H */ diff --git a/include/lttng/condition/buffer-usage-internal.h b/include/lttng/condition/buffer-usage-internal.h new file mode 100644 index 000000000..1052c489f --- /dev/null +++ b/include/lttng/condition/buffer-usage-internal.h @@ -0,0 +1,102 @@ +/* + * Copyright (C) 2017 - Jérémie Galarneau + * + * This library is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License, version 2.1 only, + * as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License + * for more details. 
+ * + * You should have received a copy of the GNU Lesser General Public License + * along with this library; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#ifndef LTTNG_CONDITION_BUFFER_USAGE_INTERNAL_H +#define LTTNG_CONDITION_BUFFER_USAGE_INTERNAL_H + +#include +#include +#include +#include + +struct lttng_condition_buffer_usage { + struct lttng_condition parent; + struct { + bool set; + uint64_t value; + } threshold_bytes; + struct { + bool set; + double value; + } threshold_ratio; + char *session_name; + char *channel_name; + struct { + bool set; + enum lttng_domain_type type; + } domain; +}; + +struct lttng_condition_buffer_usage_comm { + uint8_t threshold_set_in_bytes; + /* + * Expressed in bytes if "threshold_set_in_bytes" is not 0. + * Otherwise, it is expressed as a ratio in the interval [0.0, 1.0] + * that is mapped to the range of a 32-bit unsigned integer. + * The ratio is obtained by (threshold / UINT32_MAX). + */ + uint32_t threshold; /* NOTE(review): 32-bit here while lttng_condition_buffer_usage.threshold_bytes is 64-bit; confirm how the serializer handles byte thresholds above UINT32_MAX. */ + /* Both lengths include the trailing \0. */ + uint32_t session_name_len; + uint32_t channel_name_len; + /* enum lttng_domain_type */ + int8_t domain_type; + /* session and channel names. */ + char names[]; +} LTTNG_PACKED; + +struct lttng_evaluation_buffer_usage { + struct lttng_evaluation parent; + uint64_t buffer_use; + uint64_t buffer_capacity; +}; + +struct lttng_evaluation_buffer_usage_comm { + uint64_t buffer_use; + uint64_t buffer_capacity; +} LTTNG_PACKED; + +LTTNG_HIDDEN +struct lttng_evaluation *lttng_evaluation_buffer_usage_create( + enum lttng_condition_type type, uint64_t use, + uint64_t capacity); + +/* + * Applies to all below: + * + * FIXME Add explicit buffer bound checking using a "len" parameter to + * ensure malformed buffers are caught and rejected.
+ */ +LTTNG_HIDDEN +ssize_t lttng_condition_buffer_usage_low_create_from_buffer(const char *buf, + struct lttng_condition **condition); + +LTTNG_HIDDEN +ssize_t lttng_condition_buffer_usage_high_create_from_buffer(const char *buf, + struct lttng_condition **condition); + +LTTNG_HIDDEN +ssize_t lttng_evaluation_buffer_usage_low_create_from_buffer(const char *buf, + struct lttng_evaluation **evaluation); + +LTTNG_HIDDEN +ssize_t lttng_evaluation_buffer_usage_high_create_from_buffer(const char *buf, + struct lttng_evaluation **evaluation); + + + +#endif /* LTTNG_CONDITION_BUFFER_USAGE_INTERNAL_H */ diff --git a/include/lttng/condition/buffer-usage.h b/include/lttng/condition/buffer-usage.h new file mode 100644 index 000000000..3f5e4d870 --- /dev/null +++ b/include/lttng/condition/buffer-usage.h @@ -0,0 +1,102 @@ +/* + * Copyright (C) 2017 - Jérémie Galarneau + * + * This library is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License, version 2.1 only, + * as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License + * for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this library; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#ifndef LTTNG_CONDITION_BUFFER_USAGE_H +#define LTTNG_CONDITION_BUFFER_USAGE_H + +#ifdef __cplusplus +extern "C" { +#endif + +struct lttng_condition; +struct lttng_evaluation; + +extern struct lttng_condition * +lttng_condition_buffer_usage_low_create(void); + +extern struct lttng_condition * +lttng_condition_buffer_usage_high_create(void); + +/* threshold_ratio expressed as [0.0, 1.0]. 
*/ +extern enum lttng_condition_status +lttng_condition_buffer_usage_get_threshold_ratio( + struct lttng_condition *condition, + double *threshold_ratio); + +/* threshold_ratio expressed as [0.0, 1.0]. */ +extern enum lttng_condition_status +lttng_condition_buffer_usage_set_threshold_ratio( + struct lttng_condition *condition, + double threshold_ratio); + +extern enum lttng_condition_status +lttng_condition_buffer_usage_get_threshold( + struct lttng_condition *condition, + uint64_t *threshold_bytes); + +extern enum lttng_condition_status +lttng_condition_buffer_usage_set_threshold( + struct lttng_condition *condition, + uint64_t threshold_bytes); + +extern enum lttng_condition_status +lttng_condition_buffer_usage_get_session_name( + struct lttng_condition *condition, + const char **session_name); + +extern enum lttng_condition_status +lttng_condition_buffer_usage_set_session_name( + struct lttng_condition *condition, + const char *session_name); + +extern enum lttng_condition_status +lttng_condition_buffer_usage_get_channel_name( + struct lttng_condition *condition, + const char **channel_name); + +extern enum lttng_condition_status +lttng_condition_buffer_usage_set_channel_name( + struct lttng_condition *condition, + const char *channel_name); + +extern enum lttng_condition_status +lttng_condition_buffer_usage_get_domain_type( + struct lttng_condition *condition, + enum lttng_domain_type *type); + +extern enum lttng_condition_status +lttng_condition_buffer_usage_set_domain_type( + struct lttng_condition *condition, + enum lttng_domain_type type); + + +/* LTTng Condition Evaluation */ +extern enum lttng_evaluation_status +lttng_evaluation_buffer_usage_get_usage_ratio( + struct lttng_evaluation *evaluation, + double *usage_ratio); + +extern enum lttng_evaluation_status +lttng_evaluation_buffer_usage_get_usage( + struct lttng_evaluation *evaluation, + uint64_t *usage_bytes); + +#ifdef __cplusplus +} +#endif + +#endif /* LTTNG_CONDITION_BUFFER_USAGE_H */ diff --git 
a/include/lttng/condition/condition-internal.h b/include/lttng/condition/condition-internal.h new file mode 100644 index 000000000..66dbf20f7 --- /dev/null +++ b/include/lttng/condition/condition-internal.h @@ -0,0 +1,70 @@ +/* + * Copyright (C) 2017 - Jérémie Galarneau + * + * This library is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License, version 2.1 only, + * as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License + * for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this library; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#ifndef LTTNG_CONDITION_INTERNAL_H +#define LTTNG_CONDITION_INTERNAL_H + +#include +#include +#include +#include +#include + +typedef void (*condition_destroy_cb)(struct lttng_condition *condition); +typedef bool (*condition_validate_cb)(struct lttng_condition *condition); +typedef ssize_t (*condition_serialize_cb)(struct lttng_condition *condition, + char *buf); +typedef bool (*condition_equal_cb)(struct lttng_condition *a, + struct lttng_condition *b); + +struct lttng_condition { + enum lttng_condition_type type; + condition_validate_cb validate; + condition_serialize_cb serialize; + condition_equal_cb equal; + condition_destroy_cb destroy; +}; + +struct lttng_condition_comm { /* Marked LTTNG_PACKED for consistency with the other *_comm structures. */ + /* enum lttng_condition_type */ + int8_t condition_type; + char payload[]; +} LTTNG_PACKED; + +LTTNG_HIDDEN +void lttng_condition_init(struct lttng_condition *condition, + enum lttng_condition_type type); + +LTTNG_HIDDEN +bool lttng_condition_validate(struct lttng_condition *condition); + +/* + * FIXME Add explicit buffer bound checking using a
"len" parameter to + * ensure malformed buffers are caught and rejected. + */ +LTTNG_HIDDEN +ssize_t lttng_condition_create_from_buffer(const char *buf, + struct lttng_condition **condition); + +LTTNG_HIDDEN +ssize_t lttng_condition_serialize(struct lttng_condition *condition, char *buf); + +LTTNG_HIDDEN +bool lttng_condition_is_equal(struct lttng_condition *a, + struct lttng_condition *b); + +#endif /* LTTNG_CONDITION_INTERNAL_H */ diff --git a/include/lttng/condition/condition.h b/include/lttng/condition/condition.h new file mode 100644 index 000000000..177b4ab83 --- /dev/null +++ b/include/lttng/condition/condition.h @@ -0,0 +1,52 @@ +/* + * Copyright (C) 2017 - Jérémie Galarneau + * + * This library is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License, version 2.1 only, + * as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License + * for more details. 
+ * + * You should have received a copy of the GNU Lesser General Public License + * along with this library; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#ifndef LTTNG_CONDITION_H +#define LTTNG_CONDITION_H + +#include + +#ifdef __cplusplus +extern "C" { +#endif + +struct lttng_condition; + +enum lttng_condition_type { + LTTNG_CONDITION_TYPE_UNKNOWN = -1, + LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW = 102, + LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH = 101, +}; + +enum lttng_condition_status { + LTTNG_CONDITION_STATUS_OK = 0, + LTTNG_CONDITION_STATUS_ERROR = -1, + LTTNG_CONDITION_STATUS_UNKNOWN = -2, + LTTNG_CONDITION_STATUS_INVALID = -3, + LTTNG_CONDITION_STATUS_UNSET = -4, +}; + +extern enum lttng_condition_type lttng_condition_get_type( + struct lttng_condition *condition); + +extern void lttng_condition_destroy(struct lttng_condition *condition); + +#ifdef __cplusplus +} +#endif + +#endif /* LTTNG_CONDITION_H */ diff --git a/include/lttng/condition/evaluation-internal.h b/include/lttng/condition/evaluation-internal.h new file mode 100644 index 000000000..5c874b3e0 --- /dev/null +++ b/include/lttng/condition/evaluation-internal.h @@ -0,0 +1,53 @@ +/* + * Copyright (C) 2017 - Jérémie Galarneau + * + * This library is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License, version 2.1 only, + * as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License + * for more details. 
+ * + * You should have received a copy of the GNU Lesser General Public License + * along with this library; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#ifndef LTTNG_EVALUATION_INTERNAL_H +#define LTTNG_EVALUATION_INTERNAL_H + +#include +#include +#include + +typedef void (*evaluation_destroy_cb)(struct lttng_evaluation *evaluation); +typedef ssize_t (*evaluation_serialize_cb)(struct lttng_evaluation *evaluation, + char *buf); + +struct lttng_evaluation_comm { + /* enum lttng_condition_type type */ + int8_t type; + char payload[]; +} LTTNG_PACKED; + +struct lttng_evaluation { + enum lttng_condition_type type; + evaluation_serialize_cb serialize; + evaluation_destroy_cb destroy; +}; + +/* + * FIXME Add explicit buffer bound checking using a "len" parameter to + * ensure malformed buffers are caught and rejected. + */ +LTTNG_HIDDEN +ssize_t lttng_evaluation_create_from_buffer(const char *buf, + struct lttng_evaluation **evaluation); + +LTTNG_HIDDEN +ssize_t lttng_evaluation_serialize(struct lttng_evaluation *evaluation, + char *buf); + +#endif /* LTTNG_EVALUATION_INTERNAL_H */ diff --git a/include/lttng/condition/evaluation.h b/include/lttng/condition/evaluation.h new file mode 100644 index 000000000..841284cd0 --- /dev/null +++ b/include/lttng/condition/evaluation.h @@ -0,0 +1,46 @@ +/* + * Copyright (C) 2017 - Jérémie Galarneau + * + * This library is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License, version 2.1 only, + * as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License + * for more details. 
+ * + * You should have received a copy of the GNU Lesser General Public License + * along with this library; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#ifndef LTTNG_EVALUATION_H +#define LTTNG_EVALUATION_H + +#include + +#ifdef __cplusplus +extern "C" { +#endif + +struct lttng_evaluation; + +enum lttng_evaluation_status { /* Every enumerator must have a distinct value so callers can tell the statuses apart. */ + LTTNG_EVALUATION_STATUS_OK = 0, + LTTNG_EVALUATION_STATUS_ERROR = -1, + LTTNG_EVALUATION_STATUS_INVALID = -2, + LTTNG_EVALUATION_STATUS_UNSET = -3, + LTTNG_EVALUATION_STATUS_UNKNOWN = -4, /* Uses a free value; sharing -2 with INVALID made the two indistinguishable. */ +}; + +extern enum lttng_condition_type lttng_evaluation_get_type( + struct lttng_evaluation *evaluation); + +extern void lttng_evaluation_destroy(struct lttng_evaluation *evaluation); + +#ifdef __cplusplus +} +#endif + +#endif /* LTTNG_EVALUATION_H */ diff --git a/include/lttng/endpoint-internal.h b/include/lttng/endpoint-internal.h new file mode 100644 index 000000000..de6024b42 --- /dev/null +++ b/include/lttng/endpoint-internal.h @@ -0,0 +1,32 @@ +/* + * Copyright (C) 2017 - Jérémie Galarneau + * + * This library is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License, version 2.1 only, + * as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License + * for more details.
+ * + * You should have received a copy of the GNU Lesser General Public License + * along with this library; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#ifndef LTTNG_ENDPOINT_INTERNAL_H +#define LTTNG_ENDPOINT_INTERNAL_H + +#include +#include + +enum lttng_endpoint_type { + LTTNG_ENDPOINT_TYPE_DEFAULT_SESSIOND_NOTIFICATION = 0, +}; + +struct lttng_endpoint { + enum lttng_endpoint_type type; +}; + +#endif /* LTTNG_ENDPOINT_INTERNAL_H */ diff --git a/include/lttng/endpoint.h b/include/lttng/endpoint.h new file mode 100644 index 000000000..b93ed697e --- /dev/null +++ b/include/lttng/endpoint.h @@ -0,0 +1,32 @@ +/* + * Copyright (C) 2017 - Jérémie Galarneau + * + * This library is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License, version 2.1 only, + * as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License + * for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this library; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#ifndef LTTNG_ENDPOINT_H +#define LTTNG_ENDPOINT_H + +#ifdef __cplusplus +extern "C" { +#endif + +/* Default LTTng session daemon endpoint singleton. 
*/ +extern struct lttng_endpoint *lttng_session_daemon_notification_endpoint; + +#ifdef __cplusplus +} +#endif + +#endif /* LTTNG_ENDPOINT_H */ diff --git a/include/lttng/lttng-error.h b/include/lttng/lttng-error.h index db6fe73c2..1b5ea699a 100644 --- a/include/lttng/lttng-error.h +++ b/include/lttng/lttng-error.h @@ -145,6 +145,10 @@ enum lttng_error_code { LTTNG_ERR_REGEN_STATEDUMP_FAIL = 122, /* Failed to regenerate the state dump */ LTTNG_ERR_REGEN_STATEDUMP_NOMEM = 123, /* Failed to regenerate the state dump, not enough memory */ LTTNG_ERR_NOT_SNAPSHOT_SESSION = 124, /* Session is not in snapshot mode. */ + LTTNG_ERR_INVALID_TRIGGER = 125, /* Invalid trigger provided. */ + LTTNG_ERR_TRIGGER_EXISTS = 126, /* Trigger already registered. */ + LTTNG_ERR_TRIGGER_NOT_FOUND = 127, /* Trigger not found. */ + LTTNG_ERR_COMMAND_CANCELLED = 128, /* Command cancelled. */ /* MUST be last element */ LTTNG_ERR_NR, /* Last element */ diff --git a/include/lttng/notification/channel-internal.h b/include/lttng/notification/channel-internal.h new file mode 100644 index 000000000..e684a9633 --- /dev/null +++ b/include/lttng/notification/channel-internal.h @@ -0,0 +1,51 @@ +/* + * Copyright (C) 2017 - Jérémie Galarneau + * + * This library is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License, version 2.1 only, + * as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License + * for more details. 
+ * + * You should have received a copy of the GNU Lesser General Public License + * along with this library; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#ifndef LTTNG_NOTIFICATION_CHANNEL_INTERNAL_H +#define LTTNG_NOTIFICATION_CHANNEL_INTERNAL_H + +#include +#include +#include +#include + +enum lttng_notification_channel_message_type { + LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE = 0, + LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE = 1, + LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_COMMAND_REPLY = 2, + LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION = 3, +}; + +struct lttng_notification_channel_message { + /* enum lttng_notification_channel_message_type */ + int8_t type; + /* size of the payload following this field */ + uint32_t size; + char payload[]; +} LTTNG_PACKED; + +struct lttng_notification_channel_command_reply { + /* enum lttng_notification_channel_status */ + int8_t status; +} LTTNG_PACKED; + +struct lttng_notification_channel { + /* FIXME Add mutex to protect the socket from concurrent uses. */ + int socket; +}; + +#endif /* LTTNG_NOTIFICATION_CHANNEL_INTERNAL_H */ diff --git a/include/lttng/notification/channel.h b/include/lttng/notification/channel.h new file mode 100644 index 000000000..36b3509c7 --- /dev/null +++ b/include/lttng/notification/channel.h @@ -0,0 +1,67 @@ +/* + * Copyright (C) 2017 - Jérémie Galarneau + * + * This library is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License, version 2.1 only, + * as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License + * for more details. 
+ * + * You should have received a copy of the GNU Lesser General Public License + * along with this library; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#ifndef LTTNG_NOTIFICATION_CHANNEL_H +#define LTTNG_NOTIFICATION_CHANNEL_H + +#ifdef __cplusplus +extern "C" { +#endif + +struct lttng_endpoint; +struct lttng_condition; +struct lttng_notification; +struct lttng_notification_channel; + +/* LTTng Notification channel */ +enum lttng_notification_channel_status { + LTTNG_NOTIFICATION_CHANNEL_STATUS_NOTIFICATIONS_DROPPED = 1, + LTTNG_NOTIFICATION_CHANNEL_STATUS_OK = 0, + LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR = -1, + LTTNG_NOTIFICATION_CHANNEL_STATUS_CLOSED = -2, + LTTNG_NOTIFICATION_CHANNEL_STATUS_ALREADY_SUBSCRIBED = -3, + /* Condition unknown. */ + LTTNG_NOTIFICATION_CHANNEL_STATUS_UNKNOWN_CONDITION = -4, + LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID = -5, +}; + +extern struct lttng_notification_channel *lttng_notification_channel_create( + struct lttng_endpoint *endpoint); + +extern enum lttng_notification_channel_status +lttng_notification_channel_get_next_notification( + struct lttng_notification_channel *channel, + struct lttng_notification **notification); + +extern enum lttng_notification_channel_status +lttng_notification_channel_subscribe( + struct lttng_notification_channel *channel, + struct lttng_condition *condition); + +extern enum lttng_notification_channel_status +lttng_notification_channel_unsubscribe( + struct lttng_notification_channel *channel, + struct lttng_condition *condition); + +extern void lttng_notification_channel_destroy( + struct lttng_notification_channel *channel); + +#ifdef __cplusplus +} +#endif + +#endif /* LTTNG_NOTIFICATION_CHANNEL_H */ diff --git a/include/lttng/notification/notification-internal.h b/include/lttng/notification/notification-internal.h new file mode 100644 index 000000000..d59e2e6cf --- /dev/null +++ 
b/include/lttng/notification/notification-internal.h @@ -0,0 +1,56 @@ +/* + * Copyright (C) 2017 - Jérémie Galarneau + * + * This library is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License, version 2.1 only, + * as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License + * for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this library; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#ifndef LTTNG_NOTIFICATION_INTERNAL_H +#define LTTNG_NOTIFICATION_INTERNAL_H + +#include +#include +#include +#include + +struct lttng_notification { + struct lttng_condition *condition; + struct lttng_evaluation *evaluation; + bool owns_elements; +}; + +struct lttng_notification_comm { + /* length excludes its own length. */ + uint32_t length; + /* A condition and evaluation object follow. */ + char payload[]; +} LTTNG_PACKED; + +LTTNG_HIDDEN +struct lttng_notification *lttng_notification_create( + struct lttng_condition *condition, + struct lttng_evaluation *evaluation); + +LTTNG_HIDDEN +ssize_t lttng_notification_serialize(struct lttng_notification *notification, + char *buf); + +/* + * FIXME Add explicit buffer bound checking using a "len" parameter to + * ensure malformed buffers are caught and rejected. 
+ */ +LTTNG_HIDDEN +ssize_t lttng_notification_create_from_buffer(const char *buf, + struct lttng_notification **notification); + +#endif /* LTTNG_NOTIFICATION_INTERNAL_H */ diff --git a/include/lttng/notification/notification.h b/include/lttng/notification/notification.h new file mode 100644 index 000000000..100d0def4 --- /dev/null +++ b/include/lttng/notification/notification.h @@ -0,0 +1,46 @@ +/* + * Copyright (C) 2017 - Jérémie Galarneau + * + * This library is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License, version 2.1 only, + * as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License + * for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this library; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#ifndef LTTNG_NOTIFICATION_H +#define LTTNG_NOTIFICATION_H + +#ifdef __cplusplus +extern "C" { +#endif + +struct lttng_condition; +struct lttng_evaluation; +struct lttng_notification; + +/* + * The notification retains ownership of both the condition and evaluation. + * Destroying the notification will also destroy the condition and evaluation + * objects.
+ */ +extern struct lttng_condition *lttng_notification_get_condition( + struct lttng_notification *notification); + +extern struct lttng_evaluation *lttng_notification_get_evaluation( + struct lttng_notification *notification); + +extern void lttng_notification_destroy(struct lttng_notification *notification); + +#ifdef __cplusplus +} +#endif + +#endif /* LTTNG_NOTIFICATION_H */ diff --git a/include/lttng/trigger/trigger-internal.h b/include/lttng/trigger/trigger-internal.h new file mode 100644 index 000000000..f84a334fe --- /dev/null +++ b/include/lttng/trigger/trigger-internal.h @@ -0,0 +1,52 @@ +/* + * Copyright (C) 2017 - Jérémie Galarneau + * + * This library is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License, version 2.1 only, + * as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License + * for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this library; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#ifndef LTTNG_TRIGGER_INTERNAL_H +#define LTTNG_TRIGGER_INTERNAL_H + +#include +#include +#include +#include + +struct lttng_trigger { + struct lttng_condition *condition; + struct lttng_action *action; +}; + +struct lttng_trigger_comm { + /* length excludes its own length. */ + uint32_t length; + /* A condition and action object follow. */ + char payload[]; +} LTTNG_PACKED; + +/* + * FIXME Add explicit buffer bound checking using a "len" parameter to + * ensure malformed buffers are caught and rejected. 
+ */ +LTTNG_HIDDEN +ssize_t lttng_trigger_create_from_buffer(const char *buf, + struct lttng_trigger **trigger); + +LTTNG_HIDDEN +ssize_t lttng_trigger_serialize(struct lttng_trigger *trigger, char *buf); + +LTTNG_HIDDEN +bool lttng_trigger_validate(struct lttng_trigger *trigger); + +#endif /* LTTNG_TRIGGER_INTERNAL_H */ diff --git a/include/lttng/trigger/trigger.h b/include/lttng/trigger/trigger.h new file mode 100644 index 000000000..3d7dd458f --- /dev/null +++ b/include/lttng/trigger/trigger.h @@ -0,0 +1,54 @@ +/* + * Copyright (C) 2017 - Jérémie Galarneau + * + * This library is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License, version 2.1 only, + * as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License + * for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this library; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#ifndef LTTNG_TRIGGER_H +#define LTTNG_TRIGGER_H + +struct lttng_action; +struct lttng_condition; +struct lttng_trigger; + +#ifdef __cplusplus +extern "C" { +#endif + +enum lttng_register_trigger_status { + LTTNG_REGISTER_TRIGGER_STATUS_OK = 0, + LTTNG_REGISTER_TRIGGER_STATUS_INVALID = -1, +}; + +/* Trigger assumes ownership of both condition and action after call. 
*/ +extern struct lttng_trigger *lttng_trigger_create( + struct lttng_condition *condition, struct lttng_action *action); + +extern struct lttng_condition *lttng_trigger_get_condition( + struct lttng_trigger *trigger); + +extern struct lttng_action *lttng_trigger_get_action( + struct lttng_trigger *trigger); + +extern void lttng_trigger_destroy(struct lttng_trigger *trigger); + +extern int lttng_register_trigger(struct lttng_trigger *trigger); + +extern int lttng_unregister_trigger(struct lttng_trigger *trigger); + +#ifdef __cplusplus +} +#endif + +#endif /* LTTNG_TRIGGER_H */ diff --git a/src/bin/lttng-consumerd/lttng-consumerd.c b/src/bin/lttng-consumerd/lttng-consumerd.c index 2f8eed108..9fb474753 100644 --- a/src/bin/lttng-consumerd/lttng-consumerd.c +++ b/src/bin/lttng-consumerd/lttng-consumerd.c @@ -538,7 +538,7 @@ int main(int argc, char **argv) goto exit_data_thread; } - /* Create the thread to manage the receive of fd */ + /* Create the thread to manage the reception of fds */ ret = pthread_create(&sessiond_thread, default_pthread_attr(), consumer_thread_sessiond_poll, (void *) ctx); @@ -583,6 +583,14 @@ exit_metadata_timer_thread: PERROR("pthread_join sessiond_thread"); retval = -1; } + + ret = consumer_timer_thread_get_channel_monitor_pipe(); + if (ret >= 0) { + ret = close(ret); + if (ret) { + PERROR("close channel monitor pipe"); + } + } exit_sessiond_thread: ret = pthread_join(data_thread, &status); diff --git a/src/bin/lttng-sessiond/Makefile.am b/src/bin/lttng-sessiond/Makefile.am index 423866139..0ac7506b4 100644 --- a/src/bin/lttng-sessiond/Makefile.am +++ b/src/bin/lttng-sessiond/Makefile.am @@ -30,7 +30,10 @@ lttng_sessiond_SOURCES = utils.c utils.h \ agent.c agent.h \ save.h save.c \ load-session-thread.h load-session-thread.c \ - syscall.h syscall.c + syscall.h syscall.c \ + notification-thread.h notification-thread.c \ + notification-thread-commands.h notification-thread-commands.c \ + notification-thread-events.h notification-thread-events.c 
if HAVE_LIBLTTNG_UST_CTL lttng_sessiond_SOURCES += trace-ust.c ust-registry.c ust-app.c \ diff --git a/src/bin/lttng-sessiond/agent-thread.c b/src/bin/lttng-sessiond/agent-thread.c index 39d2ec4a0..2439b3e28 100644 --- a/src/bin/lttng-sessiond/agent-thread.c +++ b/src/bin/lttng-sessiond/agent-thread.c @@ -252,7 +252,7 @@ void *agent_thread_manage_registration(void *data) goto error_tcp_socket; } - /* Add create valid TCP socket to poll set. */ + /* Add TCP socket to poll set. */ ret = lttng_poll_add(&events, reg_sock->fd, LPOLLIN | LPOLLERR | LPOLLHUP | LPOLLRDHUP); if (ret < 0) { diff --git a/src/bin/lttng-sessiond/cmd.c b/src/bin/lttng-sessiond/cmd.c index cc81906b6..5c9e69ce1 100644 --- a/src/bin/lttng-sessiond/cmd.c +++ b/src/bin/lttng-sessiond/cmd.c @@ -29,6 +29,7 @@ #include #include #include +#include #include "channel.h" #include "consumer.h" @@ -41,6 +42,8 @@ #include "syscall.h" #include "agent.h" #include "buffer-registry.h" +#include "notification-thread.h" +#include "notification-thread-commands.h" #include "cmd.h" @@ -3566,6 +3569,84 @@ end: return ret; } +int cmd_register_trigger(struct command_ctx *cmd_ctx, int sock, + struct notification_thread_handle *notification_thread) +{ + int ret; + size_t trigger_len; + ssize_t sock_recv_len; + char *trigger_buffer = NULL; + struct lttng_trigger *trigger = NULL; + + trigger_len = (size_t) cmd_ctx->lsm->u.trigger.length; + trigger_buffer = zmalloc(trigger_len); + if (!trigger_buffer) { + ret = LTTNG_ERR_NOMEM; + goto end; + } + + sock_recv_len = lttcomm_recv_unix_sock(sock, trigger_buffer, + trigger_len); + if (sock_recv_len < 0 || sock_recv_len != trigger_len) { + ERR("Failed to receive \"register trigger\" command payload"); + /* TODO: should this be a new error enum ? 
*/ + ret = LTTNG_ERR_INVALID_TRIGGER; + goto end; + } + + if (lttng_trigger_create_from_buffer(trigger_buffer, &trigger) != + trigger_len) { + ERR("Invalid trigger payload received in \"register trigger\" command"); + ret = LTTNG_ERR_INVALID_TRIGGER; + goto end; + } + + ret = notification_thread_command_register_trigger(notification_thread, + trigger); +end: + free(trigger_buffer); + return ret; +} + +int cmd_unregister_trigger(struct command_ctx *cmd_ctx, int sock, + struct notification_thread_handle *notification_thread) +{ + int ret; + size_t trigger_len; + ssize_t sock_recv_len; + char *trigger_buffer = NULL; + struct lttng_trigger *trigger = NULL; + + trigger_len = (size_t) cmd_ctx->lsm->u.trigger.length; + trigger_buffer = zmalloc(trigger_len); + if (!trigger_buffer) { + ret = LTTNG_ERR_NOMEM; + goto end; + } + + sock_recv_len = lttcomm_recv_unix_sock(sock, trigger_buffer, + trigger_len); + if (sock_recv_len < 0 || sock_recv_len != trigger_len) { + ERR("Failed to receive \"register trigger\" command payload"); + /* TODO: should this be a new error enum ? */ + ret = LTTNG_ERR_INVALID_TRIGGER; + goto end; + } + + if (lttng_trigger_create_from_buffer(trigger_buffer, &trigger) != + trigger_len) { + ERR("Invalid trigger payload received in \"register trigger\" command"); + ret = LTTNG_ERR_INVALID_TRIGGER; + goto end; + } + + ret = notification_thread_command_unregister_trigger(notification_thread, + trigger); +end: + free(trigger_buffer); + return ret; +} + /* * Send relayd sockets from snapshot output to consumer. Ignore request if the * snapshot output is *not* set with a remote destination. diff --git a/src/bin/lttng-sessiond/cmd.h b/src/bin/lttng-sessiond/cmd.h index ac88d5130..e7e344276 100644 --- a/src/bin/lttng-sessiond/cmd.h +++ b/src/bin/lttng-sessiond/cmd.h @@ -21,6 +21,8 @@ #include "context.h" #include "session.h" +struct notification_thread_handle; + /* * Init the command subsystem. Must be called before using any of the functions * above. 
This is called in the main() of the session daemon. @@ -111,4 +113,9 @@ int cmd_set_session_shm_path(struct ltt_session *session, int cmd_regenerate_metadata(struct ltt_session *session); int cmd_regenerate_statedump(struct ltt_session *session); +int cmd_register_trigger(struct command_ctx *cmd_ctx, int sock, + struct notification_thread_handle *notification_thread_handle); +int cmd_unregister_trigger(struct command_ctx *cmd_ctx, int sock, + struct notification_thread_handle *notification_thread_handle); + #endif /* CMD_H */ diff --git a/src/bin/lttng-sessiond/consumer.c b/src/bin/lttng-sessiond/consumer.c index 6ee397579..8c8116d8e 100644 --- a/src/bin/lttng-sessiond/consumer.c +++ b/src/bin/lttng-sessiond/consumer.c @@ -748,7 +748,6 @@ int consumer_send_fds(struct consumer_socket *sock, int *fds, size_t nb_fd) } ret = consumer_recv_status_reply(sock); - error: return ret; } @@ -806,6 +805,7 @@ void consumer_init_ask_channel_comm_msg(struct lttcomm_consumer_msg *msg, unsigned int switch_timer_interval, unsigned int read_timer_interval, unsigned int live_timer_interval, + unsigned int monitor_timer_interval, int output, int type, uint64_t session_id, @@ -837,6 +837,7 @@ void consumer_init_ask_channel_comm_msg(struct lttcomm_consumer_msg *msg, msg->u.ask_channel.switch_timer_interval = switch_timer_interval; msg->u.ask_channel.read_timer_interval = read_timer_interval; msg->u.ask_channel.live_timer_interval = live_timer_interval; + msg->u.ask_channel.monitor_timer_interval = monitor_timer_interval; msg->u.ask_channel.output = output; msg->u.ask_channel.type = type; msg->u.ask_channel.session_id = session_id; @@ -1048,6 +1049,35 @@ error: return ret; } +int consumer_send_channel_monitor_pipe(struct consumer_socket *consumer_sock, + int pipe) +{ + int ret; + struct lttcomm_consumer_msg msg; + + /* Code flow error. Safety net. 
*/ + + memset(&msg, 0, sizeof(msg)); + msg.cmd_type = LTTNG_CONSUMER_SET_CHANNEL_MONITOR_PIPE; + + DBG3("Sending set_channel_monitor_pipe command to consumer"); + ret = consumer_send_msg(consumer_sock, &msg); + if (ret < 0) { + goto error; + } + + DBG3("Sending channel monitoring pipe %d to consumer on socket %d", + pipe, *consumer_sock->fd_ptr); + ret = consumer_send_fds(consumer_sock, &pipe, 1); + if (ret < 0) { + goto error; + } + + DBG2("Channel monitoring pipe successfully sent"); +error: + return ret; +} + /* * Set consumer subdirectory using the session name and a generated datetime if * needed. This is appended to the current subdirectory. diff --git a/src/bin/lttng-sessiond/consumer.h b/src/bin/lttng-sessiond/consumer.h index 08b57eb73..0cc3d0e41 100644 --- a/src/bin/lttng-sessiond/consumer.h +++ b/src/bin/lttng-sessiond/consumer.h @@ -93,6 +93,11 @@ struct consumer_data { int err_sock; /* These two sockets uses the cmd_unix_sock_path. */ int cmd_sock; + /* + * Write-end of the channel monitoring pipe to be passed to the + * consumer. 
+ */ + int channel_monitor_pipe; /* * The metadata socket object is handled differently and only created * locally in this object thus it's the only reference available in the @@ -214,6 +219,8 @@ int consumer_send_relayd_socket(struct consumer_socket *consumer_sock, struct lttcomm_relayd_sock *rsock, struct consumer_output *consumer, enum lttng_stream_type type, uint64_t session_id, char *session_name, char *hostname, int session_live_timer); +int consumer_send_channel_monitor_pipe(struct consumer_socket *consumer_sock, + int pipe); int consumer_send_destroy_relayd(struct consumer_socket *sock, struct consumer_output *consumer); int consumer_recv_status_reply(struct consumer_socket *sock); @@ -232,6 +239,7 @@ void consumer_init_ask_channel_comm_msg(struct lttcomm_consumer_msg *msg, unsigned int switch_timer_interval, unsigned int read_timer_interval, unsigned int live_timer_interval, + unsigned int monitor_timer_interval, int output, int type, uint64_t session_id, diff --git a/src/bin/lttng-sessiond/health-sessiond.h b/src/bin/lttng-sessiond/health-sessiond.h index 22ea1bb3e..5d94cc639 100644 --- a/src/bin/lttng-sessiond/health-sessiond.h +++ b/src/bin/lttng-sessiond/health-sessiond.h @@ -29,6 +29,7 @@ enum health_type_sessiond { HEALTH_SESSIOND_TYPE_HT_CLEANUP = 5, HEALTH_SESSIOND_TYPE_APP_MANAGE_NOTIFY = 6, HEALTH_SESSIOND_TYPE_APP_REG_DISPATCH = 7, + HEALTH_SESSIOND_TYPE_NOTIFICATION = 8, NR_HEALTH_SESSIOND_TYPES, }; diff --git a/src/bin/lttng-sessiond/lttng-sessiond.h b/src/bin/lttng-sessiond/lttng-sessiond.h index 65ab37d59..1988d1fcc 100644 --- a/src/bin/lttng-sessiond/lttng-sessiond.h +++ b/src/bin/lttng-sessiond/lttng-sessiond.h @@ -29,6 +29,7 @@ #include "session.h" #include "ust-app.h" #include "version.h" +#include "notification-thread.h" extern const char default_home_dir[], default_tracing_group[], @@ -38,6 +39,8 @@ extern const char default_home_dir[], /* Set in main.c at boot time of the daemon */ extern int kernel_tracer_fd; +extern struct 
notification_thread_handle *notification_thread_handle; + /* * This contains extra data needed for processing a command received by the * session daemon from the lttng client. @@ -58,7 +61,7 @@ struct ust_command { }; /* - * Queue used to enqueue UST registration request (ust_commant) and protected + * Queue used to enqueue UST registration request (ust_command) and synchronized * by a futex with a scheme N wakers / 1 waiters. See futex.c/.h */ struct ust_cmd_queue { diff --git a/src/bin/lttng-sessiond/lttng-ust-ctl.h b/src/bin/lttng-sessiond/lttng-ust-ctl.h index a812ee857..cba0e272d 100644 --- a/src/bin/lttng-sessiond/lttng-ust-ctl.h +++ b/src/bin/lttng-sessiond/lttng-ust-ctl.h @@ -215,6 +215,7 @@ int ustctl_put_next_subbuf(struct ustctl_consumer_stream *stream); /* snapshot */ int ustctl_snapshot(struct ustctl_consumer_stream *stream); +int ustctl_snapshot_sample_positions(struct ustctl_consumer_stream *stream); int ustctl_snapshot_get_consumed(struct ustctl_consumer_stream *stream, unsigned long *pos); int ustctl_snapshot_get_produced(struct ustctl_consumer_stream *stream, diff --git a/src/bin/lttng-sessiond/main.c b/src/bin/lttng-sessiond/main.c index 172f8c270..f137c09f8 100644 --- a/src/bin/lttng-sessiond/main.c +++ b/src/bin/lttng-sessiond/main.c @@ -71,6 +71,7 @@ #include "agent-thread.h" #include "save.h" #include "load-session-thread.h" +#include "notification-thread.h" #include "syscall.h" #include "agent.h" #include "ht-cleanup.h" @@ -104,6 +105,7 @@ static struct consumer_data kconsumer_data = { .cmd_unix_sock_path = DEFAULT_KCONSUMERD_CMD_SOCK_PATH, .err_sock = -1, .cmd_sock = -1, + .channel_monitor_pipe = -1, .pid_mutex = PTHREAD_MUTEX_INITIALIZER, .lock = PTHREAD_MUTEX_INITIALIZER, .cond = PTHREAD_COND_INITIALIZER, @@ -115,6 +117,7 @@ static struct consumer_data ustconsumer64_data = { .cmd_unix_sock_path = DEFAULT_USTCONSUMERD64_CMD_SOCK_PATH, .err_sock = -1, .cmd_sock = -1, + .channel_monitor_pipe = -1, .pid_mutex = PTHREAD_MUTEX_INITIALIZER, 
.lock = PTHREAD_MUTEX_INITIALIZER, .cond = PTHREAD_COND_INITIALIZER, @@ -126,6 +129,7 @@ static struct consumer_data ustconsumer32_data = { .cmd_unix_sock_path = DEFAULT_USTCONSUMERD32_CMD_SOCK_PATH, .err_sock = -1, .cmd_sock = -1, + .channel_monitor_pipe = -1, .pid_mutex = PTHREAD_MUTEX_INITIALIZER, .lock = PTHREAD_MUTEX_INITIALIZER, .cond = PTHREAD_COND_INITIALIZER, @@ -211,6 +215,7 @@ static pthread_t health_thread; static pthread_t ht_cleanup_thread; static pthread_t agent_reg_thread; static pthread_t load_session_thread; +static pthread_t notification_thread; /* * UST registration command queue. This queue is tied with a futex and uses a N @@ -305,15 +310,19 @@ const char * const config_section_name = "sessiond"; /* Load session thread information to operate. */ struct load_session_thread_data *load_info; +/* Notification thread handle. */ +struct notification_thread_handle *notification_thread_handle; + /* Global hash tables */ struct lttng_ht *agent_apps_ht_by_sock = NULL; /* - * Whether sessiond is ready for commands/health check requests. + * Whether sessiond is ready for commands/notification channel/health check + * requests. * NR_LTTNG_SESSIOND_READY must match the number of calls to * sessiond_notify_ready(). 
*/ -#define NR_LTTNG_SESSIOND_READY 3 +#define NR_LTTNG_SESSIOND_READY 4 int lttng_sessiond_ready = NR_LTTNG_SESSIOND_READY; int sessiond_check_thread_quit_pipe(int fd, uint32_t events) @@ -521,6 +530,24 @@ static void close_consumer_sockets(void) PERROR("UST consumerd64 cmd_sock close"); } } + if (kconsumer_data.channel_monitor_pipe >= 0) { + ret = close(kconsumer_data.channel_monitor_pipe); + if (ret < 0) { + PERROR("kernel consumer channel monitor pipe close"); + } + } + if (ustconsumer32_data.channel_monitor_pipe >= 0) { + ret = close(ustconsumer32_data.channel_monitor_pipe); + if (ret < 0) { + PERROR("UST consumerd32 channel monitor pipe close"); + } + } + if (ustconsumer64_data.channel_monitor_pipe >= 0) { + ret = close(ustconsumer64_data.channel_monitor_pipe); + if (ret < 0) { + PERROR("UST consumerd64 channel monitor pipe close"); + } + } } /* @@ -697,6 +724,10 @@ static void sessiond_cleanup(void) free(load_info); } + if (notification_thread_handle) { + notification_thread_handle_destroy(notification_thread_handle); + } + /* * Cleanup lock file by deleting it and finaly closing it which will * release the file system lock. @@ -1224,6 +1255,7 @@ static void *thread_manage_consumer(void *data) enum lttcomm_return_code code; struct lttng_poll_event events; struct consumer_data *consumer_data = data; + struct consumer_socket *cmd_socket_wrapper = NULL; DBG("[thread] Manage consumer started"); @@ -1333,39 +1365,43 @@ restart: } health_code_update(); - if (code == LTTCOMM_CONSUMERD_COMMAND_SOCK_READY) { - /* Connect both socket, command and metadata. */ - consumer_data->cmd_sock = - lttcomm_connect_unix_sock(consumer_data->cmd_unix_sock_path); - consumer_data->metadata_fd = - lttcomm_connect_unix_sock(consumer_data->cmd_unix_sock_path); - if (consumer_data->cmd_sock < 0 - || consumer_data->metadata_fd < 0) { - PERROR("consumer connect cmd socket"); - /* On error, signal condition and quit. 
*/ - signal_consumer_condition(consumer_data, -1); - goto error; - } - consumer_data->metadata_sock.fd_ptr = &consumer_data->metadata_fd; - /* Create metadata socket lock. */ - consumer_data->metadata_sock.lock = zmalloc(sizeof(pthread_mutex_t)); - if (consumer_data->metadata_sock.lock == NULL) { - PERROR("zmalloc pthread mutex"); - goto error; - } - pthread_mutex_init(consumer_data->metadata_sock.lock, NULL); - - signal_consumer_condition(consumer_data, 1); - DBG("Consumer command socket ready (fd: %d", consumer_data->cmd_sock); - DBG("Consumer metadata socket ready (fd: %d)", - consumer_data->metadata_fd); - } else { + if (code != LTTCOMM_CONSUMERD_COMMAND_SOCK_READY) { ERR("consumer error when waiting for SOCK_READY : %s", lttcomm_get_readable_code(-code)); goto error; } - /* Remove the consumerd error sock since we've established a connexion */ + /* Connect both command and metadata sockets. */ + consumer_data->cmd_sock = + lttcomm_connect_unix_sock( + consumer_data->cmd_unix_sock_path); + consumer_data->metadata_fd = + lttcomm_connect_unix_sock( + consumer_data->cmd_unix_sock_path); + if (consumer_data->cmd_sock < 0 || consumer_data->metadata_fd < 0) { + PERROR("consumer connect cmd socket"); + /* On error, signal condition and quit. */ + signal_consumer_condition(consumer_data, -1); + goto error; + } + + consumer_data->metadata_sock.fd_ptr = &consumer_data->metadata_fd; + + /* Create metadata socket lock. */ + consumer_data->metadata_sock.lock = zmalloc(sizeof(pthread_mutex_t)); + if (consumer_data->metadata_sock.lock == NULL) { + PERROR("zmalloc pthread mutex"); + goto error; + } + pthread_mutex_init(consumer_data->metadata_sock.lock, NULL); + + DBG("Consumer command socket ready (fd: %d", consumer_data->cmd_sock); + DBG("Consumer metadata socket ready (fd: %d)", + consumer_data->metadata_fd); + + /* + * Remove the consumerd error sock since we've established a connection. 
+ */ ret = lttng_poll_del(&events, consumer_data->err_sock); if (ret < 0) { goto error; @@ -1386,6 +1422,27 @@ restart: health_code_update(); + /* + * Transfer the write-end of the channel monitoring pipe to the + * consumer by issuing a SET_CHANNEL_MONITOR_PIPE command. + */ + cmd_socket_wrapper = consumer_allocate_socket(&consumer_data->cmd_sock); + if (!cmd_socket_wrapper) { + goto error; + } + + ret = consumer_send_channel_monitor_pipe(cmd_socket_wrapper, + consumer_data->channel_monitor_pipe); + if (ret) { + goto error; + } + /* Discard the socket wrapper as it is no longer needed. */ + consumer_destroy_socket(cmd_socket_wrapper); + cmd_socket_wrapper = NULL; + + /* The thread is completely initialized, signal that it is ready. */ + signal_consumer_condition(consumer_data, 1); + /* Infinite blocking call, waiting for transmission */ restart_poll: while (1) { @@ -1529,6 +1586,10 @@ error: free(consumer_data->metadata_sock.lock); } lttng_poll_clean(&events); + + if (cmd_socket_wrapper) { + consumer_destroy_socket(cmd_socket_wrapper); + } error_poll: if (err) { health_error(); @@ -3004,6 +3065,8 @@ static int process_client_msg(struct command_ctx *cmd_ctx, int sock, case LTTNG_SET_SESSION_SHM_PATH: case LTTNG_REGENERATE_METADATA: case LTTNG_REGENERATE_STATEDUMP: + case LTTNG_REGISTER_TRIGGER: + case LTTNG_UNREGISTER_TRIGGER: need_domain = 0; break; default: @@ -3066,6 +3129,8 @@ static int process_client_msg(struct command_ctx *cmd_ctx, int sock, case LTTNG_LIST_SYSCALLS: case LTTNG_LIST_TRACEPOINT_FIELDS: case LTTNG_SAVE_SESSION: + case LTTNG_REGISTER_TRIGGER: + case LTTNG_UNREGISTER_TRIGGER: need_tracing_session = 0; break; default: @@ -4117,6 +4182,18 @@ error_add_context: ret = cmd_regenerate_statedump(cmd_ctx->session); break; } + case LTTNG_REGISTER_TRIGGER: + { + ret = cmd_register_trigger(cmd_ctx, sock, + notification_thread_handle); + break; + } + case LTTNG_UNREGISTER_TRIGGER: + { + ret = cmd_unregister_trigger(cmd_ctx, sock, + notification_thread_handle); +
break; + } default: ret = LTTNG_ERR_UND; break; @@ -5531,6 +5608,9 @@ int main(int argc, char **argv) int ret = 0, retval = 0; void *status; const char *home_path, *env_app_timeout; + struct lttng_pipe *ust32_channel_monitor_pipe = NULL, + *ust64_channel_monitor_pipe = NULL, + *kernel_channel_monitor_pipe = NULL; init_kernel_workarounds(); @@ -5689,6 +5769,19 @@ int main(int argc, char **argv) kconsumer_data.err_unix_sock_path); DBG2("Kernel consumer cmd path: %s", kconsumer_data.cmd_unix_sock_path); + kernel_channel_monitor_pipe = lttng_pipe_open(0); + if (!kernel_channel_monitor_pipe) { + ERR("Failed to create kernel consumer channel monitor pipe"); + retval = -1; + goto exit_init_data; + } + kconsumer_data.channel_monitor_pipe = + lttng_pipe_release_writefd( + kernel_channel_monitor_pipe); + if (kconsumer_data.channel_monitor_pipe < 0) { + retval = -1; + goto exit_init_data; + } } else { home_path = utils_get_home_dir(); if (home_path == NULL) { @@ -5793,6 +5886,18 @@ int main(int argc, char **argv) ustconsumer32_data.err_unix_sock_path); DBG2("UST consumer 32 bits cmd path: %s", ustconsumer32_data.cmd_unix_sock_path); + ust32_channel_monitor_pipe = lttng_pipe_open(0); + if (!ust32_channel_monitor_pipe) { + ERR("Failed to create 32-bit user space consumer channel monitor pipe"); + retval = -1; + goto exit_init_data; + } + ustconsumer32_data.channel_monitor_pipe = lttng_pipe_release_writefd( + ust32_channel_monitor_pipe); + if (ustconsumer32_data.channel_monitor_pipe < 0) { + retval = -1; + goto exit_init_data; + } /* 64 bits consumerd path setup */ ret = snprintf(ustconsumer64_data.err_unix_sock_path, PATH_MAX, @@ -5814,6 +5919,18 @@ int main(int argc, char **argv) ustconsumer64_data.err_unix_sock_path); DBG2("UST consumer 64 bits cmd path: %s", ustconsumer64_data.cmd_unix_sock_path); + ust64_channel_monitor_pipe = lttng_pipe_open(0); + if (!ust64_channel_monitor_pipe) { + ERR("Failed to create 64-bit user space consumer channel monitor pipe"); + retval = -1; + 
goto exit_init_data; + } + ustconsumer64_data.channel_monitor_pipe = lttng_pipe_release_writefd( + ust64_channel_monitor_pipe); + if (ustconsumer64_data.channel_monitor_pipe < 0) { + retval = -1; + goto exit_init_data; + } /* * See if daemon already exist. @@ -5973,7 +6090,7 @@ int main(int argc, char **argv) } load_info->path = opt_load_session_path; - /* Create health-check thread */ + /* Create health-check thread. */ ret = pthread_create(&health_thread, default_pthread_attr(), thread_manage_health, (void *) NULL); if (ret) { @@ -5983,6 +6100,29 @@ int main(int argc, char **argv) goto exit_health; } + /* notification_thread_data acquires the pipes' read side. */ + notification_thread_handle = notification_thread_handle_create( + ust32_channel_monitor_pipe, + ust64_channel_monitor_pipe, + kernel_channel_monitor_pipe); + if (!notification_thread_handle) { + retval = -1; + ERR("Failed to create notification thread shared data"); + stop_threads(); + goto exit_notification; + } + + /* Create notification thread. 
*/ + ret = pthread_create(¬ification_thread, default_pthread_attr(), + thread_notification, notification_thread_handle); + if (ret) { + errno = ret; + PERROR("pthread_create notification"); + retval = -1; + stop_threads(); + goto exit_notification; + } + /* Create thread to manage the client socket */ ret = pthread_create(&client_thread, default_pthread_attr(), thread_manage_clients, (void *) NULL); @@ -5990,6 +6130,7 @@ int main(int argc, char **argv) errno = ret; PERROR("pthread_create clients"); retval = -1; + stop_threads(); goto exit_client; } @@ -6000,6 +6141,7 @@ int main(int argc, char **argv) errno = ret; PERROR("pthread_create dispatch"); retval = -1; + stop_threads(); goto exit_dispatch; } @@ -6010,6 +6152,7 @@ int main(int argc, char **argv) errno = ret; PERROR("pthread_create registration"); retval = -1; + stop_threads(); goto exit_reg_apps; } @@ -6020,6 +6163,7 @@ int main(int argc, char **argv) errno = ret; PERROR("pthread_create apps"); retval = -1; + stop_threads(); goto exit_apps; } @@ -6030,6 +6174,7 @@ int main(int argc, char **argv) errno = ret; PERROR("pthread_create notify"); retval = -1; + stop_threads(); goto exit_apps_notify; } @@ -6040,6 +6185,7 @@ int main(int argc, char **argv) errno = ret; PERROR("pthread_create agent"); retval = -1; + stop_threads(); goto exit_agent_reg; } @@ -6052,6 +6198,7 @@ int main(int argc, char **argv) errno = ret; PERROR("pthread_create kernel"); retval = -1; + stop_threads(); goto exit_kernel; } } @@ -6063,6 +6210,7 @@ int main(int argc, char **argv) errno = ret; PERROR("pthread_create load_session_thread"); retval = -1; + stop_threads(); goto exit_load_session; } @@ -6139,16 +6287,24 @@ exit_dispatch: PERROR("pthread_join"); retval = -1; } + exit_client: + ret = pthread_join(notification_thread, &status); + if (ret) { + errno = ret; + PERROR("pthread_join notification thread"); + retval = -1; + } +exit_notification: ret = pthread_join(health_thread, &status); if (ret) { errno = ret; PERROR("pthread_join 
health thread"); retval = -1; } -exit_health: +exit_health: exit_init_data: /* * Wait for all pending call_rcu work to complete before tearing @@ -6176,6 +6332,9 @@ exit_init_data: if (ret) { retval = -1; } + lttng_pipe_destroy(ust32_channel_monitor_pipe); + lttng_pipe_destroy(ust64_channel_monitor_pipe); + lttng_pipe_destroy(kernel_channel_monitor_pipe); exit_ht_cleanup: health_app_destroy(health_sessiond); @@ -6186,7 +6345,6 @@ exit_options: sessiond_cleanup_options(); exit_set_signal_handler: - if (!retval) { exit(EXIT_SUCCESS); } else { diff --git a/src/bin/lttng-sessiond/notification-thread-commands.c b/src/bin/lttng-sessiond/notification-thread-commands.c new file mode 100644 index 000000000..cfbb86210 --- /dev/null +++ b/src/bin/lttng-sessiond/notification-thread-commands.c @@ -0,0 +1,168 @@ +/* + * Copyright (C) 2017 - Jérémie Galarneau + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License, version 2 only, as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for + * more details. + * + * You should have received a copy of the GNU General Public License along with + * this program; if not, write to the Free Software Foundation, Inc., 51 + * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. 
+ */ + +#include +#include +#include "notification-thread.h" +#include "notification-thread-commands.h" +#include +#include +#include +#include +#include + +static +void init_notification_thread_command(struct notification_thread_command *cmd) +{ + memset(cmd, 0, sizeof(*cmd)); + CDS_INIT_LIST_HEAD(&cmd->cmd_list_node); +} + +static +int run_command_wait(struct notification_thread_handle *handle, + struct notification_thread_command *cmd) +{ + int ret; + uint64_t notification_counter = 1; + + futex_nto1_prepare(&cmd->reply_futex); + + pthread_mutex_lock(&handle->cmd_queue.lock); + /* Add to queue. */ + cds_list_add_tail(&cmd->cmd_list_node, + &handle->cmd_queue.list); + /* Wake-up thread. */ + ret = write(handle->cmd_queue.event_fd, + &notification_counter, sizeof(notification_counter)); + if (ret < 0) { + PERROR("write to notification thread's queue event fd"); + /* + * Remove the command from the list so the notification + * thread does not process it. + */ + cds_list_del(&cmd->cmd_list_node); + goto error_unlock_queue; + } + pthread_mutex_unlock(&handle->cmd_queue.lock); + + futex_nto1_wait(&cmd->reply_futex); + return 0; +error_unlock_queue: + pthread_mutex_unlock(&handle->cmd_queue.lock); + return -1; +} + +enum lttng_error_code notification_thread_command_register_trigger( + struct notification_thread_handle *handle, + struct lttng_trigger *trigger) +{ + int ret; + enum lttng_error_code ret_code; + struct notification_thread_command cmd; + + init_notification_thread_command(&cmd); + + cmd.type = NOTIFICATION_COMMAND_TYPE_REGISTER_TRIGGER; + cmd.parameters.trigger = trigger; + + ret = run_command_wait(handle, &cmd); + if (ret) { + ret_code = LTTNG_ERR_UNK; + goto end; + } + ret_code = cmd.reply_code; +end: + return ret_code; +} + +enum lttng_error_code notification_thread_command_unregister_trigger( + struct notification_thread_handle *handle, + struct lttng_trigger *trigger) +{ + int ret; + enum lttng_error_code ret_code; + struct notification_thread_command
cmd; + + init_notification_thread_command(&cmd); + + cmd.type = NOTIFICATION_COMMAND_TYPE_UNREGISTER_TRIGGER; + cmd.parameters.trigger = trigger; + + ret = run_command_wait(handle, &cmd); + if (ret) { + ret_code = LTTNG_ERR_UNK; + goto end; + } + ret_code = cmd.reply_code; +end: + return ret_code; +} + +enum lttng_error_code notification_thread_command_add_channel( + struct notification_thread_handle *handle, + char *session_name, uid_t uid, gid_t gid, + char *channel_name, uint64_t key, + enum lttng_domain_type domain, uint64_t capacity) +{ + int ret; + enum lttng_error_code ret_code; + struct notification_thread_command cmd; + + init_notification_thread_command(&cmd); + + cmd.type = NOTIFICATION_COMMAND_TYPE_ADD_CHANNEL; + cmd.parameters.add_channel.session_name = session_name; + cmd.parameters.add_channel.uid = uid; + cmd.parameters.add_channel.gid = gid; + cmd.parameters.add_channel.channel_name = channel_name; + cmd.parameters.add_channel.key.key = key; + cmd.parameters.add_channel.key.domain = domain; + cmd.parameters.add_channel.capacity = capacity; + + ret = run_command_wait(handle, &cmd); + if (ret) { + ret_code = LTTNG_ERR_UNK; + goto end; + } + ret_code = cmd.reply_code; +end: + return ret_code; +} + +enum lttng_error_code notification_thread_command_remove_channel( + struct notification_thread_handle *handle, + uint64_t key, enum lttng_domain_type domain) +{ + int ret; + enum lttng_error_code ret_code; + struct notification_thread_command cmd; + + init_notification_thread_command(&cmd); + + cmd.type = NOTIFICATION_COMMAND_TYPE_REMOVE_CHANNEL; + cmd.parameters.remove_channel.key = key; + cmd.parameters.remove_channel.domain = domain; + + ret = run_command_wait(handle, &cmd); + if (ret) { + ret_code = LTTNG_ERR_UNK; + goto end; + } + ret_code = cmd.reply_code; +end: + return ret_code; +} diff --git a/src/bin/lttng-sessiond/notification-thread-commands.h b/src/bin/lttng-sessiond/notification-thread-commands.h new file mode 100644 index 000000000..463aa5e41 
--- /dev/null +++ b/src/bin/lttng-sessiond/notification-thread-commands.h @@ -0,0 +1,89 @@ +/* + * Copyright (C) 2017 - Jérémie Galarneau + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License, version 2 only, as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for + * more details. + * + * You should have received a copy of the GNU General Public License along with + * this program; if not, write to the Free Software Foundation, Inc., 51 + * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#ifndef NOTIFICATION_THREAD_COMMANDS_H +#define NOTIFICATION_THREAD_COMMANDS_H + +#include +#include +#include + +struct notification_thread_data; +struct lttng_trigger; + +enum notification_thread_command_type { + NOTIFICATION_COMMAND_TYPE_REGISTER_TRIGGER, + NOTIFICATION_COMMAND_TYPE_UNREGISTER_TRIGGER, + NOTIFICATION_COMMAND_TYPE_ADD_CHANNEL, + NOTIFICATION_COMMAND_TYPE_REMOVE_CHANNEL, +}; + +struct channel_key { + uint64_t key; + enum lttng_domain_type domain; +}; + +struct channel_info { + struct channel_key key; + char *session_name; + uid_t uid; + gid_t gid; + char *channel_name; + uint64_t capacity; + struct cds_lfht_node channels_ht_node; +}; + +struct notification_thread_command { + struct cds_list_head cmd_list_node; + + enum notification_thread_command_type type; + union { + /* Register/Unregister trigger. */ + struct lttng_trigger *trigger; + /* Add channel. */ + struct channel_info add_channel; + /* Remove channel. */ + struct { + uint64_t key; + enum lttng_domain_type domain; + } remove_channel; + } parameters; + + /* Futex on which to wait for command reply (optional). 
*/ + int32_t reply_futex; + enum lttng_error_code reply_code; +}; + +enum lttng_error_code notification_thread_command_register_trigger( + struct notification_thread_handle *handle, + struct lttng_trigger *trigger); + +enum lttng_error_code notification_thread_command_unregister_trigger( + struct notification_thread_handle *handle, + struct lttng_trigger *trigger); + +enum lttng_error_code notification_thread_command_add_channel( + struct notification_thread_handle *handle, + char *session_name, uid_t uid, gid_t gid, + char *channel_name, uint64_t key, + enum lttng_domain_type domain, uint64_t capacity); + +enum lttng_error_code notification_thread_command_remove_channel( + struct notification_thread_handle *handle, + uint64_t key, enum lttng_domain_type domain); + +#endif /* NOTIFICATION_THREAD_COMMANDS_H */ diff --git a/src/bin/lttng-sessiond/notification-thread-events.c b/src/bin/lttng-sessiond/notification-thread-events.c new file mode 100644 index 000000000..222e2faa6 --- /dev/null +++ b/src/bin/lttng-sessiond/notification-thread-events.c @@ -0,0 +1,1663 @@ +/* + * Copyright (C) 2017 - Jérémie Galarneau + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License, version 2 only, as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for + * more details. + * + * You should have received a copy of the GNU General Public License along with + * this program; if not, write to the Free Software Foundation, Inc., 51 + * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. 
/*
 * State kept for one connected notification channel client.
 */
struct notification_client {
	/* Connection socket; also the key of the client in client_socket_ht. */
	int socket;
	/* NOTE(review): uid/gid are not assigned in the visible code (see the
	 * "FIXME handle creds" in the connect handler) - confirm before use. */
	uid_t uid;
	gid_t gid;
	/*
	 * Conditions to which the client's notification channel is subscribed.
	 * List of struct lttng_condition_list_element. The condition member is
	 * owned by the client.
	 */
	struct cds_list_head condition_list;
	/* Node linking the client into client_socket_ht. */
	struct cds_lfht_node client_socket_ht_node;
};
+ */ + struct cds_list_head condition_list; + struct cds_lfht_node client_socket_ht_node; +}; + +struct channel_state_sample { + struct channel_key key; + struct cds_lfht_node channel_state_ht_node; + uint64_t highest_usage; + uint64_t lowest_usage; +}; + +static +int match_client(struct cds_lfht_node *node, const void *key) +{ + int socket = (int) key; + struct notification_client *client; + + client = caa_container_of(node, struct notification_client, + client_socket_ht_node); + + return !!(client->socket == socket); +} + +static +int match_channel_trigger_list(struct cds_lfht_node *node, const void *key) +{ + struct channel_key *channel_key = (struct channel_key *) key; + struct lttng_channel_trigger_list *trigger_list; + + trigger_list = caa_container_of(node, struct lttng_channel_trigger_list, + channel_triggers_ht_node); + + return !!((channel_key->key == trigger_list->channel_key.key) && + (channel_key->domain == trigger_list->channel_key.domain)); +} + +static +int match_channel_state_sample(struct cds_lfht_node *node, const void *key) +{ + struct channel_key *channel_key = (struct channel_key *) key; + struct channel_state_sample *sample; + + sample = caa_container_of(node, struct channel_state_sample, + channel_state_ht_node); + + return !!((channel_key->key == sample->key.key) && + (channel_key->domain == sample->key.domain)); +} + +static +int match_channel_info(struct cds_lfht_node *node, const void *key) +{ + struct channel_key *channel_key = (struct channel_key *) key; + struct channel_info *channel_info; + + channel_info = caa_container_of(node, struct channel_info, + channels_ht_node); + + return !!((channel_key->key == channel_info->key.key) && + (channel_key->domain == channel_info->key.domain)); +} + +static +int match_condition(struct cds_lfht_node *node, const void *key) +{ + struct lttng_condition *condition_key = (struct lttng_condition *) key; + struct lttng_trigger_ht_element *trigger; + struct lttng_condition *condition; + + trigger = 
caa_container_of(node, struct lttng_trigger_ht_element, + node); + condition = lttng_trigger_get_condition(trigger->trigger); + assert(condition); + + return !!lttng_condition_is_equal(condition_key, condition); +} + +static +int match_client_list(struct cds_lfht_node *node, const void *key) +{ + struct lttng_trigger *trigger_key = (struct lttng_trigger *) key; + struct notification_client_list *client_list; + struct lttng_condition *condition; + struct lttng_condition *condition_key = lttng_trigger_get_condition( + trigger_key); + + assert(condition_key); + + client_list = caa_container_of(node, struct notification_client_list, + notification_trigger_ht_node); + condition = lttng_trigger_get_condition(client_list->trigger); + + return !!lttng_condition_is_equal(condition_key, condition); +} + +static +int match_client_list_condition(struct cds_lfht_node *node, const void *key) +{ + struct lttng_condition *condition_key = (struct lttng_condition *) key; + struct notification_client_list *client_list; + struct lttng_condition *condition; + + assert(condition_key); + + client_list = caa_container_of(node, struct notification_client_list, + notification_trigger_ht_node); + condition = lttng_trigger_get_condition(client_list->trigger); + + return !!lttng_condition_is_equal(condition_key, condition); +} + +static +unsigned long lttng_condition_buffer_usage_hash( + struct lttng_condition *_condition) +{ + unsigned long hash = 0; + struct lttng_condition_buffer_usage *condition; + + condition = container_of(_condition, + struct lttng_condition_buffer_usage, parent); + + if (condition->session_name) { + hash ^= hash_key_str(condition->session_name, lttng_ht_seed); + } + if (condition->channel_name) { + hash ^= hash_key_str(condition->session_name, lttng_ht_seed); + } + if (condition->domain.set) { + hash ^= hash_key_ulong( + (void *) condition->domain.type, + lttng_ht_seed); + } + if (condition->threshold_ratio.set) { + uint64_t val; + + val = 
condition->threshold_ratio.value * (double) UINT32_MAX; + hash ^= hash_key_u64(&val, lttng_ht_seed); + } else if (condition->threshold_ratio.set) { + uint64_t val; + + val = condition->threshold_bytes.value; + hash ^= hash_key_u64(&val, lttng_ht_seed); + } + return hash; +} + +/* + * The lttng_condition hashing code is kept in this file (rather than + * condition.c) since it makes use of GPLv2 code (hashtable utils), which we + * don't want to link in liblttng-ctl. + */ +static +unsigned long lttng_condition_hash(struct lttng_condition *condition) +{ + switch (condition->type) { + case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW: + case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH: + return lttng_condition_buffer_usage_hash(condition); + default: + ERR("[notification-thread] Unexpected condition type caught"); + abort(); + } +} + +static +void channel_info_destroy(struct channel_info *channel_info) +{ + if (!channel_info) { + return; + } + + if (channel_info->session_name) { + free(channel_info->session_name); + } + if (channel_info->channel_name) { + free(channel_info->channel_name); + } + free(channel_info); +} + +static +struct channel_info *channel_info_copy(struct channel_info *channel_info) +{ + struct channel_info *copy = zmalloc(sizeof(*channel_info)); + + assert(channel_info); + assert(channel_info->session_name); + assert(channel_info->channel_name); + + if (!copy) { + goto end; + } + + memcpy(copy, channel_info, sizeof(*channel_info)); + copy->session_name = NULL; + copy->channel_name = NULL; + + copy->session_name = strdup(channel_info->session_name); + if (!copy->session_name) { + goto error; + } + copy->channel_name = strdup(channel_info->channel_name); + if (!copy->channel_name) { + goto error; + } + cds_lfht_node_init(&channel_info->channels_ht_node); +end: + return copy; +error: + channel_info_destroy(copy); + return NULL; +} + +static +int notification_thread_client_subscribe(struct notification_client *client, + struct lttng_condition *condition, + struct 
notification_thread_state *state, + enum lttng_notification_channel_status *_status) +{ + int ret = 0; + struct cds_lfht_iter iter; + struct cds_lfht_node *node; + struct notification_client_list *client_list; + struct lttng_condition_list_element *condition_list_element = NULL; + struct notification_client_list_element *client_list_element = NULL; + enum lttng_notification_channel_status status = + LTTNG_NOTIFICATION_CHANNEL_STATUS_OK; + + /* + * Ensure that the client has not already subscribed to this condition + * before. + */ + cds_list_for_each_entry(condition_list_element, &client->condition_list, node) { + if (lttng_condition_is_equal(condition_list_element->condition, + condition)) { + status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ALREADY_SUBSCRIBED; + goto end; + } + } + + condition_list_element = zmalloc(sizeof(*condition_list_element)); + if (!condition_list_element) { + ret = -1; + goto error; + } + client_list_element = zmalloc(sizeof(*client_list_element)); + if (!client_list_element) { + ret = -1; + goto error; + } + + rcu_read_lock(); + + /* + * Add the newly-subscribed condition to the client's subscription list. + */ + CDS_INIT_LIST_HEAD(&condition_list_element->node); + condition_list_element->condition = condition; + cds_list_add(&condition_list_element->node, &client->condition_list); + + /* + * Add the client to the list of clients interested in a given trigger + * if a "notification" trigger with a corresponding condition was + * added prior. 
+ */ + cds_lfht_lookup(state->notification_trigger_clients_ht, + lttng_condition_hash(condition), + match_client_list_condition, + condition, + &iter); + node = cds_lfht_iter_get_node(&iter); + if (!node) { + goto end_unlock; + } + + client_list = caa_container_of(node, struct notification_client_list, + notification_trigger_ht_node); + client_list_element->client = client; + CDS_INIT_LIST_HEAD(&client_list_element->node); + cds_list_add(&client_list->list, &client_list_element->node); +end_unlock: + rcu_read_unlock(); +end: + if (_status) { + *_status = status; + } + return ret; +error: + free(condition_list_element); + free(client_list_element); + return ret; +} + +static +int notification_thread_client_unsubscribe( + struct notification_client *client, + struct lttng_condition *condition, + struct notification_thread_state *state, + enum lttng_notification_channel_status *_status) +{ + struct cds_lfht_iter iter; + struct cds_lfht_node *node; + struct notification_client_list *client_list; + struct lttng_condition_list_element *condition_list_element, + *condition_tmp; + struct notification_client_list_element *client_list_element, + *client_tmp; + bool condition_found = false; + enum lttng_notification_channel_status status = + LTTNG_NOTIFICATION_CHANNEL_STATUS_OK; + + /* Remove the condition from the client's condition list. */ + cds_list_for_each_entry_safe(condition_list_element, condition_tmp, + &client->condition_list, node) { + if (!lttng_condition_is_equal(condition_list_element->condition, + condition)) { + continue; + } + + cds_list_del(&condition_list_element->node); + /* + * The caller may be iterating on the client's conditions to + * tear down a client's connection. In this case, the condition + * will be destroyed at the end. 
+ */ + if (condition != condition_list_element->condition) { + lttng_condition_destroy( + condition_list_element->condition); + } + free(condition_list_element); + condition_found = true; + break; + } + + if (!condition_found) { + status = LTTNG_NOTIFICATION_CHANNEL_STATUS_UNKNOWN_CONDITION; + goto end; + } + + /* + * Remove the client from the list of clients interested the trigger + * matching the condition. + */ + rcu_read_lock(); + cds_lfht_lookup(state->notification_trigger_clients_ht, + lttng_condition_hash(condition), + match_client_list_condition, + condition, + &iter); + node = cds_lfht_iter_get_node(&iter); + if (!node) { + goto end_unlock; + } + + client_list = caa_container_of(node, struct notification_client_list, + notification_trigger_ht_node); + cds_list_for_each_entry_safe(client_list_element, client_tmp, + &client_list->list, node) { + if (client_list_element->client->socket != client->socket) { + continue; + } + cds_list_del(&client_list_element->node); + free(client_list_element); + break; + } +end_unlock: + rcu_read_unlock(); +end: + lttng_condition_destroy(condition); + if (_status) { + *_status = status; + } + return 0; +} + +static +void notification_client_destroy(struct notification_client *client, + struct notification_thread_state *state) +{ + struct lttng_condition_list_element *condition_list_element, *tmp; + + if (!client) { + return; + } + + /* Release all conditions to which the client was subscribed. */ + cds_list_for_each_entry_safe(condition_list_element, tmp, + &client->condition_list, node) { + (void) notification_thread_client_unsubscribe(client, + condition_list_element->condition, state, NULL); + } + + if (client->socket >= 0) { + (void) lttcomm_close_unix_sock(client->socket); + } + free(client); +} + +/* + * Call with rcu_read_lock held (and hold for the lifetime of the returned + * client pointer). 
+ */ +static +struct notification_client *get_client_from_socket(int socket, + struct notification_thread_state *state) +{ + struct cds_lfht_iter iter; + struct cds_lfht_node *node; + struct notification_client *client = NULL; + + cds_lfht_lookup(state->client_socket_ht, + hash_key_ulong((void *) (unsigned long) socket, lttng_ht_seed), + match_client, + (void *) (unsigned long) socket, + &iter); + node = cds_lfht_iter_get_node(&iter); + if (!node) { + goto end; + } + + client = caa_container_of(node, struct notification_client, + client_socket_ht_node); +end: + return client; +} + +static +bool trigger_applies_to_channel(struct lttng_trigger *trigger, + struct channel_info *info) +{ + enum lttng_condition_status status; + struct lttng_condition *condition; + const char *trigger_session_name = NULL; + const char *trigger_channel_name = NULL; + enum lttng_domain_type trigger_domain; + + condition = lttng_trigger_get_condition(trigger); + if (!condition) { + goto fail; + } + + switch (lttng_condition_get_type(condition)) { + case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW: + case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH: + break; + default: + goto fail; + } + + status = lttng_condition_buffer_usage_get_domain_type(condition, + &trigger_domain); + assert(status == LTTNG_CONDITION_STATUS_OK); + if (info->key.domain != trigger_domain) { + goto fail; + } + + status = lttng_condition_buffer_usage_get_session_name( + condition, &trigger_session_name); + assert((status == LTTNG_CONDITION_STATUS_OK) && trigger_session_name); + + status = lttng_condition_buffer_usage_get_channel_name( + condition, &trigger_channel_name); + assert((status == LTTNG_CONDITION_STATUS_OK) && trigger_channel_name); + + if (strcmp(info->session_name, trigger_session_name)) { + goto fail; + } + if (strcmp(info->channel_name, trigger_channel_name)) { + goto fail; + } + + return true; +fail: + return false; +} + +static +bool trigger_applies_to_client(struct lttng_trigger *trigger, + struct notification_client 
*client) +{ + bool applies = false; + struct lttng_condition_list_element *condition_list_element; + + cds_list_for_each_entry(condition_list_element, &client->condition_list, + node) { + applies = lttng_condition_is_equal( + condition_list_element->condition, + lttng_trigger_get_condition(trigger)); + if (applies) { + break; + } + } + return applies; +} + +static +unsigned long hash_channel_key(struct channel_key *key) +{ + return hash_key_u64(&key->key, lttng_ht_seed) ^ hash_key_ulong( + (void *) (unsigned long) key->domain, lttng_ht_seed); +} + +static +int handle_notification_thread_command_add_channel( + struct notification_thread_state *state, + struct channel_info *channel_info, + enum lttng_error_code *cmd_result) +{ + struct cds_list_head trigger_list; + struct channel_info *new_channel_info; + struct channel_key *channel_key; + struct lttng_channel_trigger_list *channel_trigger_list = NULL; + struct lttng_trigger_ht_element *trigger_ht_element = NULL; + int trigger_count = 0; + struct cds_lfht_iter iter; + + DBG("[notification-thread] Adding channel %s from session %s, channel key = %" PRIu64 " in %s domain", + channel_info->channel_name, channel_info->session_name, + channel_info->key.key, channel_info->key.domain == LTTNG_DOMAIN_KERNEL ? "kernel" : "user space"); + + CDS_INIT_LIST_HEAD(&trigger_list); + + new_channel_info = channel_info_copy(channel_info); + if (!new_channel_info) { + goto error; + } + + channel_key = &new_channel_info->key; + + /* Build a list of all triggers applying to the new channel. 
*/ + cds_lfht_for_each_entry(state->triggers_ht, &iter, trigger_ht_element, + node) { + struct lttng_trigger_list_element *new_element; + + if (!trigger_applies_to_channel(trigger_ht_element->trigger, + channel_info)) { + continue; + } + + new_element = zmalloc(sizeof(*new_element)); + if (!new_element) { + goto error; + } + CDS_INIT_LIST_HEAD(&new_element->node); + new_element->trigger = trigger_ht_element->trigger; + cds_list_add(&new_element->node, &trigger_list); + trigger_count++; + } + + DBG("[notification-thread] Found %i triggers that apply to newly added channel", + trigger_count); + channel_trigger_list = zmalloc(sizeof(*channel_trigger_list)); + if (!channel_trigger_list) { + goto error; + } + channel_trigger_list->channel_key = *channel_key; + CDS_INIT_LIST_HEAD(&channel_trigger_list->list); + cds_lfht_node_init(&channel_trigger_list->channel_triggers_ht_node); + cds_list_splice(&trigger_list, &channel_trigger_list->list); + + rcu_read_lock(); + /* Add channel to the channel_ht which owns the channel_infos. */ + cds_lfht_add(state->channels_ht, + hash_channel_key(channel_key), + &new_channel_info->channels_ht_node); + /* + * Add the list of triggers associated with this channel to the + * channel_triggers_ht. 
+ */ + cds_lfht_add(state->channel_triggers_ht, + hash_channel_key(channel_key), + &channel_trigger_list->channel_triggers_ht_node); + rcu_read_unlock(); + *cmd_result = LTTNG_OK; + return 0; +error: + /* Empty trigger list */ + channel_info_destroy(new_channel_info); + return 1; +} + +static +int handle_notification_thread_command_remove_channel( + struct notification_thread_state *state, + uint64_t channel_key, enum lttng_domain_type domain, + enum lttng_error_code *cmd_result) +{ + struct cds_lfht_node *node; + struct cds_lfht_iter iter; + struct lttng_channel_trigger_list *trigger_list; + struct lttng_trigger_list_element *trigger_list_element, *tmp; + struct channel_key key = { .key = channel_key, .domain = domain }; + struct channel_info *channel_info; + + DBG("[notification-thread] Removing channel key = %" PRIu64 " in %s domain", + channel_key, domain == LTTNG_DOMAIN_KERNEL ? "kernel" : "user space"); + + rcu_read_lock(); + + cds_lfht_lookup(state->channel_triggers_ht, + hash_channel_key(&key), + match_channel_trigger_list, + &key, + &iter); + node = cds_lfht_iter_get_node(&iter); + /* + * There is a severe internal error if we are being asked to remove a + * channel that doesn't exist. + */ + if (!node) { + ERR("[notification-thread] Channel being removed is unknown to the notification thread"); + goto end; + } + + /* Free the list of triggers associated with this channel. */ + trigger_list = caa_container_of(node, struct lttng_channel_trigger_list, + channel_triggers_ht_node); + cds_list_for_each_entry_safe(trigger_list_element, tmp, + &trigger_list->list, node) { + cds_list_del(&trigger_list_element->node); + free(trigger_list_element); + } + cds_lfht_del(state->channel_triggers_ht, node); + free(trigger_list); + + /* Free sampled channel state. 
*/ + cds_lfht_lookup(state->channel_state_ht, + hash_channel_key(&key), + match_channel_state_sample, + &key, + &iter); + node = cds_lfht_iter_get_node(&iter); + /* + * This is expected to be NULL if the channel is destroyed before we + * received a sample. + */ + if (node) { + struct channel_state_sample *sample = caa_container_of(node, + struct channel_state_sample, + channel_state_ht_node); + + cds_lfht_del(state->channel_state_ht, node); + free(sample); + } + + /* Remove the channel from the channels_ht and free it. */ + cds_lfht_lookup(state->channels_ht, + hash_channel_key(&key), + match_channel_info, + &key, + &iter); + node = cds_lfht_iter_get_node(&iter); + assert(node); + channel_info = caa_container_of(node, struct channel_info, + channels_ht_node); + cds_lfht_del(state->channels_ht, node); + channel_info_destroy(channel_info); +end: + rcu_read_unlock(); + *cmd_result = LTTNG_OK; + return 0; +} + +/* + * FIXME A client's credentials are not checked when registering a trigger, nor + * are they stored alongside with the trigger. + * + * The effects of this are benign: + * - The client will succeed in registering the trigger, as it is valid, + * - The trigger will, internally, be bound to the channel, + * - The notifications will not be sent since the client's credentials + * are checked against the channel at that moment. 
+ */ +static +int handle_notification_thread_command_register_trigger( + struct notification_thread_state *state, + struct lttng_trigger *trigger, + enum lttng_error_code *cmd_result) +{ + int ret = 0; + struct lttng_condition *condition; + struct notification_client *client; + struct notification_client_list *client_list = NULL; + struct lttng_trigger_ht_element *trigger_ht_element = NULL; + struct notification_client_list_element *client_list_element, *tmp; + struct cds_lfht_node *node; + struct cds_lfht_iter iter; + struct channel_info *channel; + bool free_trigger = true; + + rcu_read_lock(); + + condition = lttng_trigger_get_condition(trigger); + trigger_ht_element = zmalloc(sizeof(*trigger_ht_element)); + if (!trigger_ht_element) { + ret = -1; + goto error; + } + + /* Add trigger to the trigger_ht. */ + cds_lfht_node_init(&trigger_ht_element->node); + trigger_ht_element->trigger = trigger; + + node = cds_lfht_add_unique(state->triggers_ht, + lttng_condition_hash(condition), + match_condition, + condition, + &trigger_ht_element->node); + if (node != &trigger_ht_element->node) { + /* Not a fatal error, simply report it to the client. */ + *cmd_result = LTTNG_ERR_TRIGGER_EXISTS; + goto error_free_ht_element; + } + + /* + * Ownership of the trigger and of its wrapper was transfered to + * the triggers_ht. + */ + trigger_ht_element = NULL; + free_trigger = false; + + /* + * The rest only applies to triggers that have a "notify" action. + * It is not skipped as this is the only action type currently + * supported. + */ + client_list = zmalloc(sizeof(*client_list)); + if (!client_list) { + ret = -1; + goto error_free_ht_element; + } + cds_lfht_node_init(&client_list->notification_trigger_ht_node); + CDS_INIT_LIST_HEAD(&client_list->list); + client_list->trigger = trigger; + + /* Build a list of clients to which this new trigger applies. 
*/ + cds_lfht_for_each_entry(state->client_socket_ht, &iter, client, + client_socket_ht_node) { + if (!trigger_applies_to_client(trigger, client)) { + continue; + } + + client_list_element = zmalloc(sizeof(*client_list_element)); + if (!client_list_element) { + ret = -1; + goto error_free_client_list; + } + CDS_INIT_LIST_HEAD(&client_list_element->node); + client_list_element->client = client; + cds_list_add(&client_list_element->node, &client_list->list); + } + + cds_lfht_add(state->notification_trigger_clients_ht, + lttng_condition_hash(condition), + &client_list->notification_trigger_ht_node); + /* + * Client list ownership transferred to the + * notification_trigger_clients_ht. + */ + client_list = NULL; + + /* + * Add the trigger to list of triggers bound to the channels currently + * known. + */ + cds_lfht_for_each_entry(state->channels_ht, &iter, channel, + channels_ht_node) { + struct lttng_trigger_list_element *trigger_list_element; + struct lttng_channel_trigger_list *trigger_list; + + if (!trigger_applies_to_channel(trigger, channel)) { + continue; + } + + cds_lfht_lookup(state->channel_triggers_ht, + hash_channel_key(&channel->key), + match_channel_trigger_list, + &channel->key, + &iter); + node = cds_lfht_iter_get_node(&iter); + assert(node); + /* Free the list of triggers associated with this channel. */ + trigger_list = caa_container_of(node, + struct lttng_channel_trigger_list, + channel_triggers_ht_node); + + trigger_list_element = zmalloc(sizeof(*trigger_list_element)); + if (!trigger_list_element) { + ret = -1; + goto error_free_client_list; + } + CDS_INIT_LIST_HEAD(&trigger_list_element->node); + trigger_list_element->trigger = trigger; + cds_list_add(&trigger_list_element->node, &trigger_list->list); + /* A trigger can only apply to one channel. 
*/ + break; + } + + *cmd_result = LTTNG_OK; +error_free_client_list: + if (client_list) { + cds_list_for_each_entry_safe(client_list_element, tmp, + &client_list->list, node) { + free(client_list_element); + } + free(client_list); + } +error_free_ht_element: + free(trigger_ht_element); +error: + if (free_trigger) { + lttng_trigger_destroy(trigger); + } + rcu_read_unlock(); + return ret; +} + +int handle_notification_thread_command_unregister_trigger( + struct notification_thread_state *state, + struct lttng_trigger *trigger, + enum lttng_error_code *_cmd_reply) +{ + struct cds_lfht_iter iter; + struct cds_lfht_node *node, *triggers_ht_node; + struct lttng_channel_trigger_list *trigger_list; + struct notification_client_list *client_list; + struct notification_client_list_element *client_list_element, *tmp; + struct lttng_trigger_ht_element *trigger_ht_element = NULL; + struct lttng_condition *condition = lttng_trigger_get_condition( + trigger); + enum lttng_error_code cmd_reply; + + rcu_read_lock(); + + cds_lfht_lookup(state->triggers_ht, + lttng_condition_hash(condition), + match_condition, + condition, + &iter); + triggers_ht_node = cds_lfht_iter_get_node(&iter); + if (!triggers_ht_node) { + cmd_reply = LTTNG_ERR_TRIGGER_NOT_FOUND; + goto end; + } else { + cmd_reply = LTTNG_OK; + } + + /* Remove trigger from channel_triggers_ht. 
*/ + cds_lfht_for_each_entry(state->channel_triggers_ht, &iter, trigger_list, + channel_triggers_ht_node) { + struct lttng_trigger_list_element *trigger_element, *tmp; + + cds_list_for_each_entry_safe(trigger_element, tmp, + &trigger_list->list, node) { + struct lttng_condition *current_condition = + lttng_trigger_get_condition( + trigger_element->trigger); + + assert(current_condition); + if (!lttng_condition_is_equal(condition, + current_condition)) { + continue; + } + + DBG("[notification-thread] Removed trigger from channel_triggers_ht"); + cds_list_del(&trigger_element->node); + } + } + + /* + * Remove and release the client list from + * notification_trigger_clients_ht. + */ + cds_lfht_lookup(state->notification_trigger_clients_ht, + lttng_condition_hash(condition), + match_client_list, + trigger, + &iter); + node = cds_lfht_iter_get_node(&iter); + assert(node); + client_list = caa_container_of(node, struct notification_client_list, + notification_trigger_ht_node); + cds_list_for_each_entry_safe(client_list_element, tmp, + &client_list->list, node) { + free(client_list_element); + } + cds_lfht_del(state->notification_trigger_clients_ht, node); + free(client_list); + + /* Remove trigger from triggers_ht. */ + trigger_ht_element = caa_container_of(triggers_ht_node, + struct lttng_trigger_ht_element, node); + cds_lfht_del(state->triggers_ht, triggers_ht_node); + lttng_trigger_destroy(trigger_ht_element->trigger); + free(trigger_ht_element); +end: + rcu_read_unlock(); + if (_cmd_reply) { + *_cmd_reply = cmd_reply; + } + return 0; +} + +int handle_notification_thread_command( + struct notification_thread_handle *handle, + struct notification_thread_state *state) +{ + int ret; + uint64_t counter; + struct notification_thread_command *cmd; + + /* Read event_fd to put it back into a quiescent state. 
*/ + ret = read(handle->cmd_queue.event_fd, &counter, sizeof(counter)); + if (ret == -1) { + goto error; + } + + pthread_mutex_lock(&handle->cmd_queue.lock); + cmd = cds_list_first_entry(&handle->cmd_queue.list, + struct notification_thread_command, cmd_list_node); + switch (cmd->type) { + case NOTIFICATION_COMMAND_TYPE_REGISTER_TRIGGER: + DBG("[notification-thread] Received register trigger command"); + ret = handle_notification_thread_command_register_trigger( + state, cmd->parameters.trigger, + &cmd->reply_code); + break; + case NOTIFICATION_COMMAND_TYPE_UNREGISTER_TRIGGER: + DBG("[notification-thread] Received unregister trigger command"); + ret = handle_notification_thread_command_unregister_trigger( + state, cmd->parameters.trigger, + &cmd->reply_code); + break; + case NOTIFICATION_COMMAND_TYPE_ADD_CHANNEL: + DBG("[notification-thread] Received add channel command"); + ret = handle_notification_thread_command_add_channel( + state, &cmd->parameters.add_channel, + &cmd->reply_code); + break; + case NOTIFICATION_COMMAND_TYPE_REMOVE_CHANNEL: + DBG("[notification-thread] Received remove channel command"); + ret = handle_notification_thread_command_remove_channel( + state, cmd->parameters.remove_channel.key, + cmd->parameters.remove_channel.domain, + &cmd->reply_code); + break; + default: + ERR("[notification-thread] Unknown internal command received"); + goto error_unlock; + } + + if (ret) { + goto error_unlock; + } + + cds_list_del(&cmd->cmd_list_node); + futex_nto1_wake(&cmd->reply_futex); + pthread_mutex_unlock(&handle->cmd_queue.lock); + return 0; +error_unlock: + /* Wake-up and return a fatal error to the calling thread. */ + futex_nto1_wake(&cmd->reply_futex); + pthread_mutex_unlock(&handle->cmd_queue.lock); + cmd->reply_code = LTTNG_ERR_FATAL; +error: + /* Indicate a fatal error to the caller. 
*/ + return 1; +} + +static +unsigned long hash_client_socket(int socket) +{ + return hash_key_ulong((void *) (unsigned long) socket, lttng_ht_seed); +} + +int handle_notification_thread_client_connect( + struct notification_thread_state *state) +{ + int ret; + struct notification_client *client; + + DBG("[notification-thread] Handling new notification channel client connection"); + + client = zmalloc(sizeof(*client)); + if (!client) { + /* Fatal error. */ + ret = -1; + goto error; + } + CDS_INIT_LIST_HEAD(&client->condition_list); + + ret = lttcomm_accept_unix_sock(state->notification_channel_socket); + if (ret < 0) { + ERR("[notification-thread] Failed to accept new notification channel client connection"); + ret = 0; + goto error; + } + + client->socket = ret; + + /* FIXME set client socket as non-blocking. */ + /* + ret = set_socket_non_blocking(client->socket); + if (ret) { + ERR("[notification-thread] Failed to set new notification channel client connection socket as non-blocking"); + goto error; + } + */ + + /* FIXME handle creds. */ + ret = lttcomm_setsockopt_creds_unix_sock(client->socket); + if (ret < 0) { + ERR("[notification-thread] Failed to set socket options on new notification channel client socket"); + ret = 0; + goto error; + } + + /* FIXME protocol version handshake. */ + + ret = lttng_poll_add(&state->events, client->socket, + LPOLLIN | LPOLLERR | + LPOLLHUP | LPOLLRDHUP); + if (ret < 0) { + ERR("[notification-thread] Failed to add notification channel client socket to poll set"); + ret = 0; + goto error; + } + DBG("[notification-thread] Added new notification channel client socket (%i) to poll set", + client->socket); + + /* Add to ht. 
*/ + rcu_read_lock(); + cds_lfht_add(state->client_socket_ht, + hash_client_socket(client->socket), + &client->client_socket_ht_node); + rcu_read_unlock(); + + return ret; +error: + notification_client_destroy(client, state); + return ret; +} + +int handle_notification_thread_client_disconnect( + int client_socket, + struct notification_thread_state *state) +{ + int ret = 0; + struct notification_client *client; + + rcu_read_lock(); + DBG("[notification-thread] Closing client connection (socket fd = %i)", + client_socket); + client = get_client_from_socket(client_socket, state); + if (!client) { + /* Internal state corruption, fatal error. */ + ERR("[notification-thread] Unable to find client (socket fd = %i)", + client_socket); + ret = -1; + goto end; + } + + ret = lttng_poll_del(&state->events, client_socket); + if (ret) { + ERR("[notification-thread] Failed to remove client socket from poll set"); + } + cds_lfht_del(state->client_socket_ht, + &client->client_socket_ht_node); + notification_client_destroy(client, state); +end: + rcu_read_unlock(); + return ret; +} + +int handle_notification_thread_client_disconnect_all( + struct notification_thread_state *state) +{ + struct cds_lfht_iter iter; + struct notification_client *client; + bool error_encoutered = false; + + rcu_read_lock(); + DBG("[notification-thread] Closing all client connections"); + cds_lfht_for_each_entry(state->client_socket_ht, &iter, client, + client_socket_ht_node) { + int ret; + + ret = handle_notification_thread_client_disconnect( + client->socket, state); + if (ret) { + error_encoutered = true; + } + } + rcu_read_unlock(); + return error_encoutered ? 
1 : 0; +} + +/* + * Unregister every trigger present in triggers_ht. + * + * The hash table traversal is performed under the RCU read-side lock, like the + * client_socket_ht traversal of + * handle_notification_thread_client_disconnect_all(). + * + * Returns 0 on success, -1 if at least one trigger could not be unregistered. + */ +int handle_notification_thread_trigger_unregister_all( + struct notification_thread_state *state) +{ + bool error_occured = false; + struct cds_lfht_iter iter; + struct lttng_trigger_ht_element *trigger_ht_element; + + rcu_read_lock(); + cds_lfht_for_each_entry(state->triggers_ht, &iter, trigger_ht_element, + node) { + int ret = handle_notification_thread_command_unregister_trigger( + state, trigger_ht_element->trigger, NULL); + if (ret) { + error_occured = true; + } + } + rcu_read_unlock(); + return error_occured ? -1 : 0; +} + +/* + * Send a command reply carrying `status` to the client connected on `socket`. + * + * The message header and the reply payload are packed back-to-back in a byte + * buffer and sent with a single lttcomm_send_unix_sock() call. + * + * Returns 0 on success, -1 if the reply could not be sent. + */ +static +int send_client_reply(int socket, + enum lttng_notification_channel_status status) +{ + ssize_t ret; + struct lttng_notification_channel_command_reply reply = { + .status = (int8_t) status, + }; + struct lttng_notification_channel_message msg = { + .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_COMMAND_REPLY, + .size = sizeof(reply), + }; + /* + * Must be an array of bytes: the offset arithmetic below is expressed + * in bytes and the buffer is sent as-is on the socket. + */ + char buffer[sizeof(msg) + sizeof(reply)] = {}; + + memcpy(buffer, &msg, sizeof(msg)); + memcpy(buffer + sizeof(msg), &reply, sizeof(reply)); + DBG("[notification-thread] Send command reply (%i)", (int) status); + + ret = lttcomm_send_unix_sock(socket, buffer, + sizeof(msg) + sizeof(reply)); + if (ret < 0) { + ERR("[notification-thread] Failed to send command reply"); + goto error; + } + return 0; +error: + return -1; +} + +int handle_notification_thread_client(struct notification_thread_state *state, + int socket) +{ + int ret = 0; + size_t received = 0; + struct notification_client *client; + struct lttng_notification_channel_message msg; + struct lttng_dynamic_buffer buffer; + struct lttng_condition *condition = NULL; + enum lttng_notification_channel_status status = + LTTNG_NOTIFICATION_CHANNEL_STATUS_OK; + + lttng_dynamic_buffer_init(&buffer); + + client = get_client_from_socket(socket, state); + if (!client) { + /* Internal error, abort. */ + ret = 1; + goto error_no_reply; + } + + /* Receive message header. 
*/ + do { + ssize_t recv_ret; + + recv_ret = lttcomm_recv_unix_sock(socket, + ((char *) &msg) + received, + sizeof(msg) - received); + if (recv_ret <= 0) { + ERR("[notification-thread] Failed to receive channel command from client (received %zu bytes)", received); + /* + * Protocol error, disconnect the client but don't + * signal an error. + */ + goto error_disconnect_client; + } + received += recv_ret; + } while (received < sizeof(msg)); + + ret = lttng_dynamic_buffer_set_size(&buffer, msg.size); + if (ret) { + goto error_disconnect_client; + } + + /* Receive message body. */ + received = 0; + do { + ssize_t recv_ret; + + recv_ret = lttcomm_recv_unix_sock(socket, + buffer.data + received, + msg.size - received); + if (recv_ret <= 0) { + ERR("[notification-thread] Failed to receive condition from client"); + goto error_disconnect_client; + } + received += recv_ret; + } while (received < msg.size); + + ret = lttng_condition_create_from_buffer(buffer.data, &condition); + if (ret < 0 || ret < msg.size) { + ERR("[notification-thread] Malformed condition received from client"); + goto error_disconnect_client; + } + + DBG("[notification-thread] Successfully received condition from notification channel client"); + + switch ((enum lttng_notification_channel_message_type) msg.type) { + case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE: + ret = notification_thread_client_subscribe(client, condition, + state, &status); + break; + case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE: + ret = notification_thread_client_unsubscribe(client, condition, + state, &status); + break; + default: + ERR("[notification-thread] Unknown command type received from notification channel client"); + goto error_disconnect_client; + } + + if (send_client_reply(socket, status)) { + ERR("[notification-thread] Failed to send reply to notification channel client"); + goto error_disconnect_client; + } + + lttng_dynamic_buffer_reset(&buffer); + ret = 0; + return ret; + 
+error_disconnect_client: + ret = handle_notification_thread_client_disconnect(socket, state); +error_no_reply: + lttng_dynamic_buffer_reset(&buffer); + lttng_condition_destroy(condition); + return ret; +} + +static +bool evaluate_buffer_usage_condition(struct lttng_condition *condition, + struct channel_state_sample *sample, uint64_t buffer_capacity) +{ + bool result = false; + uint64_t threshold; + enum lttng_condition_type condition_type; + struct lttng_condition_buffer_usage *use_condition = container_of( + condition, struct lttng_condition_buffer_usage, + parent); + + if (!sample) { + goto end; + } + + if (use_condition->threshold_bytes.set) { + threshold = use_condition->threshold_bytes.value; + } else { + /* Threshold was expressed as a ratio. */ + threshold = (uint64_t) (use_condition->threshold_ratio.value * + (double) buffer_capacity); + } + + condition_type = lttng_condition_get_type(condition); + if (condition_type == LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW) { + /* + * The low condition should only be triggered once _all_ of the + * streams in a channel have gone below the "low" threshold. + */ + if (sample->highest_usage <= threshold) { + result = true; + } + } else { + /* + * For high buffer usage scenarios, we want to trigger whenever + * _any_ of the streams has reached the "high" threshold. + */ + if (sample->highest_usage >= threshold) { + result = true; + } + } +end: + return result; +} + +static +int evaluate_condition(struct lttng_condition *condition, + struct lttng_evaluation **evaluation, + struct notification_thread_state *state, + struct channel_state_sample *previous_sample, + struct channel_state_sample *latest_sample, + uint64_t buffer_capacity) +{ + int ret = 0; + enum lttng_condition_type condition_type; + bool previous_sample_result; + bool latest_sample_result; + + condition_type = lttng_condition_get_type(condition); + /* No other condition type supported for the moment. 
*/ + assert(condition_type == LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW || + condition_type == LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH); + + previous_sample_result = evaluate_buffer_usage_condition(condition, + previous_sample, buffer_capacity); + latest_sample_result = evaluate_buffer_usage_condition(condition, + latest_sample, buffer_capacity); + + if (!latest_sample_result || + (previous_sample_result == latest_sample_result)) { + /* + * Only trigger on a condition evaluation transition. + * NOTE: This edge-triggered logic may not be appropriate for + * future condition types. + */ + goto end; + } + + if (evaluation && latest_sample_result) { + *evaluation = lttng_evaluation_buffer_usage_create( + condition_type, + latest_sample->highest_usage, + buffer_capacity); + if (!*evaluation) { + ret = -1; + goto end; + } + } +end: + return ret; +} + +static +int send_evaluation_to_clients(struct lttng_trigger *trigger, + struct lttng_evaluation *evaluation, + struct notification_client_list* client_list) +{ + int ret = 0; + struct lttng_dynamic_buffer msg_buffer; + struct notification_client_list_element *client_list_element, *tmp; + struct lttng_notification *notification; + struct lttng_condition *condition; + ssize_t expected_notification_size, notification_size; + struct lttng_notification_channel_message msg; + + lttng_dynamic_buffer_init(&msg_buffer); + + condition = lttng_trigger_get_condition(trigger); + assert(condition); + + notification = lttng_notification_create(condition, evaluation); + if (!notification) { + ret = -1; + goto end; + } + + expected_notification_size = lttng_notification_serialize(notification, + NULL); + if (expected_notification_size < 0) { + ERR("[notification-thread] Failed to get size of serialized notification"); + ret = -1; + goto end; + } + + msg.type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION; + msg.size = (uint32_t) expected_notification_size; + ret = lttng_dynamic_buffer_append(&msg_buffer, &msg, sizeof(msg)); + if 
(ret) { + goto end; + } + + ret = lttng_dynamic_buffer_set_size(&msg_buffer, + msg_buffer.size + expected_notification_size); + if (ret) { + goto end; + } + + notification_size = lttng_notification_serialize(notification, + msg_buffer.data + sizeof(msg)); + if (notification_size != expected_notification_size) { + ERR("[notification-thread] Failed to serialize notification"); + ret = -1; + goto end; + } + + cds_list_for_each_entry_safe(client_list_element, tmp, + &client_list->list, node) { + ret = lttcomm_send_unix_sock( + client_list_element->client->socket, + msg_buffer.data, msg_buffer.size); + if (ret < 0) { + ERR("[notification-thread] Failed to send notification to client"); + } + } + ret = 0; +end: + lttng_notification_destroy(notification); + lttng_dynamic_buffer_reset(&msg_buffer); + return ret; +} + +int handle_notification_thread_channel_sample( + struct notification_thread_state *state, int pipe, + enum lttng_domain_type domain) +{ + int ret = 0; + struct lttcomm_consumer_channel_monitor_msg sample_msg; + struct channel_state_sample previous_sample, latest_sample; + struct channel_info *channel_info; + struct cds_lfht_node *node; + struct cds_lfht_iter iter; + struct lttng_channel_trigger_list *trigger_list; + struct lttng_trigger_list_element *trigger_list_element; + bool previous_sample_available = false; + + /* + * The monitoring pipe only holds messages smaller than PIPE_BUF, + * ensuring that read/write of sampling messages are atomic. 
+ */ + do { + ret = read(pipe, &sample_msg, sizeof(sample_msg)); + } while (ret == -1 && errno == EINTR); + if (ret != sizeof(sample_msg)) { + ERR("[notification-thread] Failed to read from monitoring pipe (fd = %i)", + pipe); + ret = -1; + goto end; + } + + ret = 0; + latest_sample.key.key = sample_msg.key; + latest_sample.key.domain = domain; + latest_sample.highest_usage = sample_msg.highest; + latest_sample.lowest_usage = sample_msg.lowest; + + DBG("[notification-thread] Handling channel sample (highest usage = %" PRIu64 ", lowest usage = %" PRIu64")", + latest_sample.highest_usage, + latest_sample.lowest_usage); + + rcu_read_lock(); + + /* Retrieve the channel's informations */ + cds_lfht_lookup(state->channels_ht, + hash_channel_key(&latest_sample.key), + match_channel_info, + &latest_sample.key, + &iter); + node = cds_lfht_iter_get_node(&iter); + if (!node) { + /* + * Not an error since the consumer can push a sample to the pipe + * and the rest of the session daemon could notify us of the + * channel's destruction before we get a chance to process that + * sample. + */ + DBG("[notification-thread] Received a sample for an unknown channel from consumerd, key = %" PRIu64 " in %s domain", + latest_sample.key.key, + domain == LTTNG_DOMAIN_KERNEL ? "kernel" : + "user space"); + goto end_unlock; + } + channel_info = caa_container_of(node, struct channel_info, + channels_ht_node); + + /* Retrieve the channel's last sample, if it exists, and update it. */ + cds_lfht_lookup(state->channel_state_ht, + hash_channel_key(&latest_sample.key), + match_channel_state_sample, + &latest_sample.key, + &iter); + node = cds_lfht_iter_get_node(&iter); + if (node) { + struct channel_state_sample *stored_sample; + + /* Update the sample stored. 
*/ + stored_sample = caa_container_of(node, + struct channel_state_sample, + channel_state_ht_node); + memcpy(&previous_sample, stored_sample, + sizeof(previous_sample)); + stored_sample->highest_usage = latest_sample.highest_usage; + stored_sample->lowest_usage = latest_sample.lowest_usage; + previous_sample_available = true; + } else { + /* + * This is the channel's first sample, allocate space for and + * store the new sample. + */ + struct channel_state_sample *stored_sample; + + stored_sample = zmalloc(sizeof(*stored_sample)); + if (!stored_sample) { + ret = -1; + goto end_unlock; + } + + memcpy(stored_sample, &latest_sample, sizeof(*stored_sample)); + cds_lfht_node_init(&stored_sample->channel_state_ht_node); + cds_lfht_add(state->channel_state_ht, + hash_channel_key(&stored_sample->key), + &stored_sample->channel_state_ht_node); + } + + /* Find triggers associated with this channel. */ + cds_lfht_lookup(state->channel_triggers_ht, + hash_channel_key(&latest_sample.key), + match_channel_trigger_list, + &latest_sample.key, + &iter); + node = cds_lfht_iter_get_node(&iter); + if (!node) { + goto end_unlock; + } + + trigger_list = caa_container_of(node, struct lttng_channel_trigger_list, + channel_triggers_ht_node); + cds_list_for_each_entry(trigger_list_element, &trigger_list->list, + node) { + struct lttng_condition *condition; + struct lttng_action *action; + struct lttng_trigger *trigger; + struct notification_client_list *client_list; + struct lttng_evaluation *evaluation = NULL; + + trigger = trigger_list_element->trigger; + condition = lttng_trigger_get_condition(trigger); + assert(condition); + action = lttng_trigger_get_action(trigger); + + /* Notify actions are the only type currently supported. */ + assert(lttng_action_get_type(action) == + LTTNG_ACTION_TYPE_NOTIFY); + + /* + * Check if any client is subscribed to the result of this + * evaluation. 
+ */ + cds_lfht_lookup(state->notification_trigger_clients_ht, + lttng_condition_hash(condition), + match_client_list, + trigger, + &iter); + node = cds_lfht_iter_get_node(&iter); + assert(node); + + client_list = caa_container_of(node, + struct notification_client_list, + notification_trigger_ht_node); + if (cds_list_empty(&client_list->list)) { + /* + * No clients interested in the evaluation's result, + * skip it. + */ + continue; + } + + ret = evaluate_condition(condition, &evaluation, state, + previous_sample_available ? &previous_sample : NULL, + &latest_sample, channel_info->capacity); + if (ret) { + goto end_unlock; + } + + if (!evaluation) { + continue; + } + + /* Dispatch evaluation result to all clients. */ + ret = send_evaluation_to_clients(trigger_list_element->trigger, + evaluation, client_list); + if (ret) { + goto end_unlock; + } + } +end_unlock: + rcu_read_unlock(); +end: + return ret; +} diff --git a/src/bin/lttng-sessiond/notification-thread-events.h b/src/bin/lttng-sessiond/notification-thread-events.h new file mode 100644 index 000000000..941154af0 --- /dev/null +++ b/src/bin/lttng-sessiond/notification-thread-events.h @@ -0,0 +1,52 @@ +/* + * Copyright (C) 2017 - Jérémie Galarneau + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License, version 2 only, as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for + * more details. + * + * You should have received a copy of the GNU General Public License along with + * this program; if not, write to the Free Software Foundation, Inc., 51 + * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. 
+ */ + +#ifndef NOTIFICATION_THREAD_EVENTS_H +#define NOTIFICATION_THREAD_EVENTS_H + +#include +#include "notification-thread.h" + +/** + * Event handling function shall only return an error if + * the thread should be stopped. + */ +int handle_notification_thread_command( + struct notification_thread_handle *handle, + struct notification_thread_state *state); + +int handle_notification_thread_client_connect( + struct notification_thread_state *state); + +int handle_notification_thread_client_disconnect( + int client_fd, + struct notification_thread_state *state); + +int handle_notification_thread_client_disconnect_all( + struct notification_thread_state *state); + +int handle_notification_thread_trigger_unregister_all( + struct notification_thread_state *state); + +int handle_notification_thread_client(struct notification_thread_state *state, + int socket); + +int handle_notification_thread_channel_sample( + struct notification_thread_state *state, int pipe, + enum lttng_domain_type domain); + +#endif /* NOTIFICATION_THREAD_EVENTS_H */ diff --git a/src/bin/lttng-sessiond/notification-thread.c b/src/bin/lttng-sessiond/notification-thread.c new file mode 100644 index 000000000..fcf7260ac --- /dev/null +++ b/src/bin/lttng-sessiond/notification-thread.c @@ -0,0 +1,682 @@ +/* + * Copyright (C) 2017 - Jérémie Galarneau + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License, version 2 only, as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for + * more details. 
+ * + * You should have received a copy of the GNU General Public License along with + * this program; if not, write to the Free Software Foundation, Inc., 51 + * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#define _LGPL_SOURCE +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "notification-thread.h" +#include "notification-thread-events.h" +#include "notification-thread-commands.h" +#include "lttng-sessiond.h" +#include "health-sessiond.h" + +#include +#include +#include + +/** + * This thread maintains an internal state associating clients and triggers. + * + * In order to speed-up and simplify queries, hash tables providing the + * following associations are maintained: + * + * - client_socket_ht: associate a client's socket (fd) to its "struct client" + * This hash table owns the "struct client" which must thus be + * disposed-of on removal from the hash table. + * + * - channel_triggers_ht: + * associates a channel key to a list of + * struct lttng_trigger_list_nodes. The triggers in this list are + * those that have conditions that apply to this channel. + * This hash table owns the list, but not the triggers themselves. + * + * - channel_state_ht: + * associates a pair (channel key, channel domain) to its last + * sampled state received from the consumer daemon + * (struct channel_state). + * This previous sample is kept to implement edge-triggered + * conditions as we need to detect the state transitions. + * This hash table owns the channel state. + * + * - notification_trigger_clients_ht: + * associates notification-emitting triggers to clients + * (struct notification_client_ht_node) subscribed to those + * conditions. + * The condition's hash and match functions are used directly since + * all triggers in this hash table have the "notify" action. + * This hash table holds no ownership. 
+ * + * - channels_ht: + * associates a channel_key to a struct channel_info. The hash table + * holds the ownership of the struct channel_info. + * + * - triggers_ht: + * associates a condition to a struct lttng_trigger_ht_element. + * The hash table holds the ownership of the + * lttng_trigger_ht_elements along with the triggers themselves. + * + * The thread reacts to the following internal events: + * 1) creation of a tracing channel, + * 2) destruction of a tracing channel, + * 3) registration of a trigger, + * 4) unregistration of a trigger, + * 5) reception of a channel monitor sample from the consumer daemon. + * + * Events specific to notification-emitting triggers: + * 6) connection of a notification client, + * 7) disconnection of a notification client, + * 8) subscription of a client to a condition's notifications, + * 9) unsubscription of a client from a condition's notifications. + * + * + * 1) Creation of a tracing channel + * - notification_trigger_clients_ht is traversed to identify + * triggers which apply to this new channel, + * - triggers identified are added to the channel_triggers_ht. + * - add channel to channels_ht + * + * 2) Destruction of a tracing channel + * - remove entry from channel_triggers_ht, releasing the list wrapper and + * elements, + * - remove entry from the channel_state_ht. 
+ * - remove channel from channels_ht + * + * 3) Registration of a trigger + * - if the trigger's action is of type "notify", + * - traverse the list of conditions of every client to build a list of + * clients which have to be notified when this trigger's condition is met, + * - add list of clients (even if it is empty) to the + * notification_trigger_clients_ht, + * - add trigger to channel_triggers_ht (if applicable), + * - add trigger to triggers_ht + * + * 4) Unregistration of a trigger + * - if the trigger's action is of type "notify", + * - remove the trigger from the notification_trigger_clients_ht, + * - remove trigger from channel_triggers_ht (if applicable), + * - remove trigger from triggers_ht + * + * 5) Reception of a channel monitor sample from the consumer daemon + * - evaluate the conditions associated with the triggers found in + * the channel_triggers_ht, + * - if a condition evaluates to "true" and the condition is of type + * "notify", query the notification_trigger_clients_ht and send + * a notification to the clients. + * + * 6) Connection of a client + * - add client socket to the client_socket_ht. + * + * 7) Disconnection of a client + * - remove client socket from the client_socket_ht, + * - traverse all conditions to which the client is subscribed and remove + * the client from the notification_trigger_clients_ht. + * + * 8) Subscription of a client to a condition's notifications + * - Add the condition to the client's list of subscribed conditions, + * - Look-up notification_trigger_clients_ht and add the client to + * list of clients. + * + * 9) Unsubscription of a client to a condition's notifications + * - Remove the condition from the client's list of subscribed conditions, + * - Look-up notification_trigger_clients_ht and remove the client + * from the list of clients. + */ + +/* + * Destroy the thread data previously created by the init function. 
+ */ +void notification_thread_handle_destroy( + struct notification_thread_handle *handle) +{ + int ret; + struct notification_thread_command *cmd, *tmp; + + if (!handle) { + goto end; + } + + if (handle->cmd_queue.event_fd < 0) { + goto end; + } + ret = close(handle->cmd_queue.event_fd); + if (ret < 0) { + PERROR("close notification command queue event_fd"); + } + + pthread_mutex_lock(&handle->cmd_queue.lock); + /* Purge queue of in-flight commands and mark them as cancelled. */ + cds_list_for_each_entry_safe(cmd, tmp, &handle->cmd_queue.list, + cmd_list_node) { + cds_list_del(&cmd->cmd_list_node); + cmd->reply_code = LTTNG_ERR_COMMAND_CANCELLED; + futex_nto1_wake(&cmd->reply_futex); + } + pthread_mutex_unlock(&handle->cmd_queue.lock); + pthread_mutex_destroy(&handle->cmd_queue.lock); + + /* A negative fd denotes a monitoring pipe that was never set. */ + if (handle->channel_monitoring_pipes.ust32_consumer >= 0) { + ret = close(handle->channel_monitoring_pipes.ust32_consumer); + if (ret) { + PERROR("close 32-bit consumer channel monitoring pipe"); + } + } + if (handle->channel_monitoring_pipes.ust64_consumer >= 0) { + ret = close(handle->channel_monitoring_pipes.ust64_consumer); + if (ret) { + PERROR("close 64-bit consumer channel monitoring pipe"); + } + } + if (handle->channel_monitoring_pipes.kernel_consumer >= 0) { + ret = close(handle->channel_monitoring_pipes.kernel_consumer); + if (ret) { + PERROR("close kernel consumer channel monitoring pipe"); + } + } +end: + free(handle); +} + +/* + * Allocate and initialize a notification thread handle. + * + * Each channel monitor pipe argument may be NULL, in which case the + * matching monitoring pipe fd of the handle is left unset (-1). + * + * Returns the new handle, or NULL on error. + */ +struct notification_thread_handle *notification_thread_handle_create( + struct lttng_pipe *ust32_channel_monitor_pipe, + struct lttng_pipe *ust64_channel_monitor_pipe, + struct lttng_pipe *kernel_channel_monitor_pipe) +{ + int ret; + struct notification_thread_handle *handle; + + handle = zmalloc(sizeof(*handle)); + if (!handle) { + goto end; + } + + /* + * zmalloc() leaves the monitoring pipe fds at 0, which is a valid file + * descriptor. Mark them as unset right away so that the error path, + * which goes through notification_thread_handle_destroy(), does not + * close fd 0 when the creation fails half-way. + */ + handle->channel_monitoring_pipes.ust32_consumer = -1; + handle->channel_monitoring_pipes.ust64_consumer = -1; + handle->channel_monitoring_pipes.kernel_consumer = -1; + + /* FIXME Replace eventfd by a pipe to support older kernels. 
*/ + handle->cmd_queue.event_fd = eventfd(0, EFD_CLOEXEC); + if (handle->cmd_queue.event_fd < 0) { + PERROR("eventfd notification command queue"); + goto error; + } + CDS_INIT_LIST_HEAD(&handle->cmd_queue.list); + ret = pthread_mutex_init(&handle->cmd_queue.lock, NULL); + if (ret) { + goto error; + } + + if (ust32_channel_monitor_pipe) { + handle->channel_monitoring_pipes.ust32_consumer = + lttng_pipe_release_readfd( + ust32_channel_monitor_pipe); + if (handle->channel_monitoring_pipes.ust32_consumer < 0) { + goto error; + } + } else { + handle->channel_monitoring_pipes.ust32_consumer = -1; + } + if (ust64_channel_monitor_pipe) { + handle->channel_monitoring_pipes.ust64_consumer = + lttng_pipe_release_readfd( + ust64_channel_monitor_pipe); + if (handle->channel_monitoring_pipes.ust64_consumer < 0) { + goto error; + } + } else { + handle->channel_monitoring_pipes.ust64_consumer = -1; + } + if (kernel_channel_monitor_pipe) { + handle->channel_monitoring_pipes.kernel_consumer = + lttng_pipe_release_readfd( + kernel_channel_monitor_pipe); + if (handle->channel_monitoring_pipes.kernel_consumer < 0) { + goto error; + } + } else { + handle->channel_monitoring_pipes.kernel_consumer = -1; + } +end: + return handle; +error: + notification_thread_handle_destroy(handle); + return NULL; +} + +static +char *get_notification_channel_sock_path(void) +{ + int ret; + bool is_root = !getuid(); + char *sock_path; + + sock_path = zmalloc(LTTNG_PATH_MAX); + if (!sock_path) { + goto error; + } + + if (is_root) { + ret = snprintf(sock_path, LTTNG_PATH_MAX, + DEFAULT_GLOBAL_NOTIFICATION_CHANNEL_UNIX_SOCK); + if (ret < 0) { + goto error; + } + } else { + char *home_path = utils_get_home_dir(); + + if (!home_path) { + ERR("Can't get HOME directory for socket creation"); + goto error; + } + + ret = snprintf(sock_path, LTTNG_PATH_MAX, + DEFAULT_HOME_NOTIFICATION_CHANNEL_UNIX_SOCK, + home_path); + if (ret < 0) { + goto error; + } + } + + return sock_path; +error: + free(sock_path); + return 
NULL; +} + +/* + * Unlink the notification channel socket path and close the listening + * socket `fd`. Failures are logged and otherwise ignored. + */ +static +void notification_channel_socket_destroy(int fd) +{ + int ret; + char *sock_path = get_notification_channel_sock_path(); + + DBG("[notification-thread] Destroying notification channel socket"); + + if (sock_path) { + ret = unlink(sock_path); + if (ret < 0) { + /* Report before free() gets a chance to alter errno. */ + PERROR("unlink notification channel socket"); + } + free(sock_path); + } + + ret = close(fd); + if (ret) { + PERROR("close notification channel socket"); + } +} + +/* + * Create the notification channel UNIX socket and restrict its permissions + * to read/write for the owner and the group. + * + * Returns the socket's file descriptor on success, a negative value on error. + */ +static +int notification_channel_socket_create(void) +{ + int fd = -1, ret; + char *sock_path = get_notification_channel_sock_path(); + + /* The path helper returns NULL on allocation or formatting failure. */ + if (!sock_path) { + ERR("[notification-thread] Failed to get notification channel socket path"); + ret = -1; + goto error; + } + + DBG("[notification-thread] Creating notification channel UNIX socket at %s", + sock_path); + + ret = lttcomm_create_unix_sock(sock_path); + if (ret < 0) { + ERR("[notification-thread] Failed to create notification socket"); + goto error; + } + fd = ret; + + ret = chmod(sock_path, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP); + if (ret < 0) { + ERR("Set file permissions failed: %s", sock_path); + PERROR("chmod notification channel socket"); + goto error; + } + + DBG("[notification-thread] Notification channel UNIX socket created (fd = %i)", + fd); + free(sock_path); + return fd; +error: + if (fd >= 0 && close(fd) < 0) { + PERROR("close notification channel socket"); + } + free(sock_path); + return ret; +} + +static +int init_poll_set(struct lttng_poll_event *poll_set, + struct notification_thread_handle *handle, + int notification_channel_socket) +{ + int ret; + + /* + * Create pollset with size 6: + * - quit pipe, + * - notification channel socket (listen for new connections), + * - command queue event fd (internal sessiond commands), + * - consumerd (32-bit user space) channel monitor pipe, + * - consumerd (64-bit user space) channel monitor pipe, + * - consumerd (kernel) channel monitor pipe. 
+ */ + ret = sessiond_set_thread_pollset(poll_set, 6); + if (ret < 0) { + goto end; + } + + ret = lttng_poll_add(poll_set, notification_channel_socket, + LPOLLIN | LPOLLERR | LPOLLHUP | LPOLLRDHUP); + if (ret < 0) { + ERR("[notification-thread] Failed to add notification channel socket to pollset"); + goto error; + } + ret = lttng_poll_add(poll_set, handle->cmd_queue.event_fd, + LPOLLIN | LPOLLERR); + if (ret < 0) { + ERR("[notification-thread] Failed to add notification command queue event fd to pollset"); + goto error; + } + ret = lttng_poll_add(poll_set, + handle->channel_monitoring_pipes.ust32_consumer, + LPOLLIN | LPOLLERR); + if (ret < 0) { + ERR("[notification-thread] Failed to add ust-32 channel monitoring pipe fd to pollset"); + goto error; + } + ret = lttng_poll_add(poll_set, + handle->channel_monitoring_pipes.ust64_consumer, + LPOLLIN | LPOLLERR); + if (ret < 0) { + ERR("[notification-thread] Failed to add ust-64 channel monitoring pipe fd to pollset"); + goto error; + } + if (handle->channel_monitoring_pipes.kernel_consumer < 0) { + goto end; + } + ret = lttng_poll_add(poll_set, + handle->channel_monitoring_pipes.kernel_consumer, + LPOLLIN | LPOLLERR); + if (ret < 0) { + ERR("[notification-thread] Failed to add kernel channel monitoring pipe fd to pollset"); + goto error; + } +end: + return ret; +error: + lttng_poll_clean(poll_set); + return ret; +} + +static +void fini_thread_state(struct notification_thread_state *state) +{ + int ret; + + if (state->client_socket_ht) { + ret = handle_notification_thread_client_disconnect_all(state); + assert(!ret); + ret = cds_lfht_destroy(state->client_socket_ht, NULL); + assert(!ret); + } + if (state->triggers_ht) { + ret = handle_notification_thread_trigger_unregister_all(state); + assert(!ret); + ret = cds_lfht_destroy(state->triggers_ht, NULL); + assert(!ret); + } + if (state->channel_triggers_ht) { + ret = cds_lfht_destroy(state->channel_triggers_ht, NULL); + assert(!ret); + } + if (state->channel_state_ht) { + 
ret = cds_lfht_destroy(state->channel_state_ht, NULL); + assert(!ret); + } + if (state->notification_trigger_clients_ht) { + ret = cds_lfht_destroy(state->notification_trigger_clients_ht, + NULL); + assert(!ret); + } + if (state->channels_ht) { + ret = cds_lfht_destroy(state->channels_ht, + NULL); + assert(!ret); + } + + if (state->notification_channel_socket >= 0) { + notification_channel_socket_destroy( + state->notification_channel_socket); + } + lttng_poll_clean(&state->events); +} + +/* + * Initialize the notification thread's state: listening socket, poll set and + * hash tables. + * + * Returns 0 on success, -1 on error. On error, everything acquired by this + * function has been released: the caller must not call fini_thread_state(). + */ +static +int init_thread_state(struct notification_thread_handle *handle, + struct notification_thread_state *state) +{ + int ret; + + memset(state, 0, sizeof(*state)); + state->notification_channel_socket = -1; + lttng_poll_init(&state->events); + + ret = notification_channel_socket_create(); + if (ret < 0) { + /* Nothing has been acquired yet. */ + ret = -1; + goto end; + } + state->notification_channel_socket = ret; + + ret = init_poll_set(&state->events, handle, + state->notification_channel_socket); + if (ret) { + /* + * init_poll_set() may already have cleaned the poll set on + * failure; fini_thread_state() would clean it a second time, + * so only release the listening socket here. + */ + notification_channel_socket_destroy( + state->notification_channel_socket); + state->notification_channel_socket = -1; + ret = -1; + goto end; + } + + DBG("[notification-thread] Listening on notification channel socket"); + ret = lttcomm_listen_unix_sock(state->notification_channel_socket); + if (ret < 0) { + ERR("[notification-thread] Listen failed on notification channel socket"); + goto error; + } + + state->client_socket_ht = cds_lfht_new(DEFAULT_HT_SIZE, 1, 0, + CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL); + if (!state->client_socket_ht) { + goto error; + } + + state->channel_triggers_ht = cds_lfht_new(DEFAULT_HT_SIZE, 1, 0, + CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL); + if (!state->channel_triggers_ht) { + goto error; + } + + state->channel_state_ht = cds_lfht_new(DEFAULT_HT_SIZE, 1, 0, + CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL); + if (!state->channel_state_ht) { + goto error; + } + + state->notification_trigger_clients_ht = cds_lfht_new(DEFAULT_HT_SIZE, + 1, 0, CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL); + if (!state->notification_trigger_clients_ht) { + goto error; + } + + state->channels_ht = 
cds_lfht_new(DEFAULT_HT_SIZE, + 1, 0, CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL); + if (!state->channels_ht) { + goto error; + } + + state->triggers_ht = cds_lfht_new(DEFAULT_HT_SIZE, + 1, 0, CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL); + if (!state->triggers_ht) { + goto error; + } + ret = 0; +end: + return ret; +error: + fini_thread_state(state); + return -1; +} + +/* + * This thread services notification channel clients and received notifications + * from various lttng-sessiond components over a command queue. + */ +void *thread_notification(void *data) +{ + int ret; + struct notification_thread_handle *handle = data; + struct notification_thread_state state; + + DBG("[notification-thread] Started notification thread"); + + if (!handle) { + ERR("[notification-thread] Invalid thread context provided"); + goto end; + } + + rcu_register_thread(); + rcu_thread_online(); + + health_register(health_sessiond, HEALTH_SESSIOND_TYPE_NOTIFICATION); + health_code_update(); + + ret = init_thread_state(handle, &state); + if (ret) { + /* + * The state holds nothing to release at this point; only undo + * the health and RCU registrations performed above. + */ + health_unregister(health_sessiond); + rcu_thread_offline(); + rcu_unregister_thread(); + goto end; + } + + /* Ready to handle client connections. */ + sessiond_notify_ready(); + + while (true) { + int fd_count, i; + + health_poll_entry(); + DBG("[notification-thread] Entering poll wait"); + ret = lttng_poll_wait(&state.events, -1); + DBG("[notification-thread] Poll wait returned (%i)", ret); + health_poll_exit(); + if (ret < 0) { + /* + * Restart interrupted system call. + */ + if (errno == EINTR) { + continue; + } + ERR("[notification-thread] Error encountered during lttng_poll_wait (%i)", ret); + goto error; + } + + fd_count = ret; + for (i = 0; i < fd_count; i++) { + int fd = LTTNG_POLL_GETFD(&state.events, i); + uint32_t revents = LTTNG_POLL_GETEV(&state.events, i); + + DBG("[notification-thread] Handling fd (%i) activity (%u)", fd, revents); + + /* Thread quit pipe has been closed. Killing thread. 
*/ + if (sessiond_check_thread_quit_pipe(fd, revents)) { + DBG("[notification-thread] Quit pipe signaled, exiting."); + goto exit; + } + + if (fd == state.notification_channel_socket) { + if (revents & LPOLLIN) { + ret = handle_notification_thread_client_connect( + &state); + if (ret < 0) { + goto error; + } + } else if (revents & + (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { + ERR("[notification-thread] Notification socket poll error"); + goto error; + } else { + ERR("[notification-thread] Unexpected poll events %u for notification socket %i", revents, fd); + goto error; + } + } else if (fd == handle->cmd_queue.event_fd) { + ret = handle_notification_thread_command(handle, + &state); + if (ret) { + goto error; + } + } else if (fd == handle->channel_monitoring_pipes.ust32_consumer) { + ret = handle_notification_thread_channel_sample( + &state, fd, LTTNG_DOMAIN_UST); + if (ret) { + goto error; + } + } else if (fd == handle->channel_monitoring_pipes.ust64_consumer) { + ret = handle_notification_thread_channel_sample( + &state, fd, LTTNG_DOMAIN_UST); + if (ret) { + goto error; + } + } else if (fd == handle->channel_monitoring_pipes.kernel_consumer) { + ret = handle_notification_thread_channel_sample( + &state, fd, LTTNG_DOMAIN_KERNEL); + if (ret) { + goto error; + } + } else { + /* Activity on a client's socket. */ + if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { + /* + * It doesn't matter if a command was + * pending on the client socket at this + * point since it now has now way to + * receive the notifications to which + * it was subscribing or unsubscribing. 
+ */ + ret = handle_notification_thread_client_disconnect( + fd, &state); + if (ret) { + goto error; + } + } else if (revents & LPOLLIN) { + ret = handle_notification_thread_client( + &state, fd); + if (ret) { + goto error; + } + } else { + DBG("[notification-thread] Unexpected poll events %u for notification socket %i", revents, fd); + } + } + } + } +exit: +error: + fini_thread_state(&state); + health_unregister(health_sessiond); + rcu_thread_offline(); + rcu_unregister_thread(); +end: + return NULL; +} diff --git a/src/bin/lttng-sessiond/notification-thread.h b/src/bin/lttng-sessiond/notification-thread.h new file mode 100644 index 000000000..c401b410b --- /dev/null +++ b/src/bin/lttng-sessiond/notification-thread.h @@ -0,0 +1,73 @@ +/* + * Copyright (C) 2017 - Jérémie Galarneau + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License, version 2 only, as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for + * more details. + * + * You should have received a copy of the GNU General Public License along with + * this program; if not, write to the Free Software Foundation, Inc., 51 + * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#ifndef NOTIFICATION_THREAD_H +#define NOTIFICATION_THREAD_H + +#include +#include +#include +#include +#include +#include +#include +#include + +struct notification_thread_handle { + /* + * Queue of struct notification command. + * event_fd must be WRITE(2) to signal that a new command + * has been enqueued. + */ + struct { + int event_fd; + struct cds_list_head list; + pthread_mutex_t lock; + } cmd_queue; + /* + * Read side of pipes used to receive channel status info collected + * by the various consumer daemons. 
+ */ + struct { + int ust32_consumer; + int ust64_consumer; + int kernel_consumer; + } channel_monitoring_pipes; +}; + +struct notification_thread_state { + int notification_channel_socket; + struct lttng_poll_event events; + struct cds_lfht *client_socket_ht; + struct cds_lfht *channel_triggers_ht; + struct cds_lfht *channel_state_ht; + struct cds_lfht *notification_trigger_clients_ht; + struct cds_lfht *channels_ht; + struct cds_lfht *triggers_ht; +}; + +/* notification_thread_data takes ownership of the channel monitor pipes. */ +struct notification_thread_handle *notification_thread_handle_create( + struct lttng_pipe *ust32_channel_monitor_pipe, + struct lttng_pipe *ust64_channel_monitor_pipe, + struct lttng_pipe *kernel_channel_monitor_pipe); +void notification_thread_handle_destroy( + struct notification_thread_handle *handle); + +void *thread_notification(void *data); + +#endif /* NOTIFICATION_THREAD_H */ diff --git a/src/bin/lttng-sessiond/ust-app.c b/src/bin/lttng-sessiond/ust-app.c index 5a41c3800..eb4a76a2f 100644 --- a/src/bin/lttng-sessiond/ust-app.c +++ b/src/bin/lttng-sessiond/ust-app.c @@ -41,6 +41,8 @@ #include "ust-ctl.h" #include "utils.h" #include "session.h" +#include "lttng-sessiond.h" +#include "notification-thread-commands.h" static int ust_app_flush_app_session(struct ust_app *app, struct ust_app_session *ua_sess); @@ -482,7 +484,8 @@ void delete_ust_app_channel(int sock, struct ust_app_channel *ua_chan, /* Wipe and free registry from session registry. 
*/ registry = get_session_registry(ua_chan->session); if (registry) { - ust_registry_channel_del_free(registry, ua_chan->key); + ust_registry_channel_del_free(registry, ua_chan->key, + true); } save_per_pid_lost_discarded_counters(ua_chan); } @@ -2839,6 +2842,7 @@ static int create_channel_per_uid(struct ust_app *app, int ret; struct buffer_reg_uid *reg_uid; struct buffer_reg_channel *reg_chan; + bool created = false; assert(app); assert(usess); @@ -2882,7 +2886,7 @@ static int create_channel_per_uid(struct ust_app *app, * it's not visible anymore in the session registry. */ ust_registry_channel_del_free(reg_uid->registry->reg.ust, - ua_chan->tracing_channel_id); + ua_chan->tracing_channel_id, false); buffer_reg_channel_remove(reg_uid->registry, reg_chan); buffer_reg_channel_destroy(reg_chan, LTTNG_DOMAIN_UST); goto error; @@ -2898,7 +2902,7 @@ static int create_channel_per_uid(struct ust_app *app, ua_chan->name); goto error; } - + created = true; } /* Send buffers to the application. */ @@ -2910,6 +2914,39 @@ static int create_channel_per_uid(struct ust_app *app, goto error; } + if (created) { + enum lttng_error_code cmd_ret; + struct ltt_session *session; + uint64_t chan_reg_key; + struct ust_registry_channel *chan_reg; + + chan_reg_key = ua_chan->tracing_channel_id; + pthread_mutex_lock(®_uid->registry->reg.ust->lock); + rcu_read_lock(); + chan_reg = ust_registry_channel_find(reg_uid->registry->reg.ust, chan_reg_key); + assert(chan_reg); + chan_reg->consumer_key = ua_chan->key; + rcu_read_unlock(); + chan_reg = NULL; + pthread_mutex_unlock(®_uid->registry->reg.ust->lock); + + session = session_find_by_id(ua_sess->tracing_id); + assert(session); + + cmd_ret = notification_thread_command_add_channel( + notification_thread_handle, session->name, + ua_sess->euid, ua_sess->egid, + ua_chan->name, + ua_chan->key, + LTTNG_DOMAIN_UST, + ua_chan->attr.subbuf_size * ua_chan->attr.num_subbuf); + if (cmd_ret != LTTNG_OK) { + ret = - (int) cmd_ret; + ERR("Failed to add 
channel to notification thread"); + goto error; + } + } + error: return ret; } @@ -2925,6 +2962,8 @@ static int create_channel_per_pid(struct ust_app *app, { int ret; struct ust_registry_session *registry; + enum lttng_error_code cmd_ret; + struct ltt_session *session; assert(app); assert(usess); @@ -2963,6 +3002,32 @@ static int create_channel_per_pid(struct ust_app *app, goto error; } + session = session_find_by_id(ua_sess->tracing_id); + assert(session); + + uint64_t chan_reg_key; + struct ust_registry_channel *chan_reg; + + chan_reg_key = ua_chan->key; + pthread_mutex_lock(®istry->lock); + chan_reg = ust_registry_channel_find(registry, chan_reg_key); + assert(chan_reg); + chan_reg->consumer_key = ua_chan->key; + pthread_mutex_unlock(®istry->lock); + + cmd_ret = notification_thread_command_add_channel( + notification_thread_handle, session->name, + ua_sess->euid, ua_sess->egid, + ua_chan->name, + ua_chan->key, + LTTNG_DOMAIN_UST, + ua_chan->attr.subbuf_size * ua_chan->attr.num_subbuf); + if (cmd_ret != LTTNG_OK) { + ret = - (int) cmd_ret; + ERR("Failed to add channel to notification thread"); + goto error; + } + error: rcu_read_unlock(); return ret; @@ -3075,7 +3140,6 @@ static int create_ust_app_channel(struct ust_app_session *ua_sess, /* Only add the channel if successful on the tracer side. 
*/ lttng_ht_add_unique_str(ua_sess->channels, &ua_chan->node); - end: if (ua_chanp) { *ua_chanp = ua_chan; diff --git a/src/bin/lttng-sessiond/ust-consumer.c b/src/bin/lttng-sessiond/ust-consumer.c index 3bb54f039..d60ddb9ed 100644 --- a/src/bin/lttng-sessiond/ust-consumer.c +++ b/src/bin/lttng-sessiond/ust-consumer.c @@ -32,6 +32,8 @@ #include "ust-consumer.h" #include "buffer-registry.h" #include "session.h" +#include "lttng-sessiond.h" +#include "notification-thread-commands.h" /* * Return allocated full pathname of the session using the consumer trace path @@ -174,6 +176,7 @@ static int ask_channel_creation(struct ust_app_session *ua_sess, ua_chan->attr.switch_timer_interval, ua_chan->attr.read_timer_interval, ua_sess->live_timer_interval, + 1000000, /* FIXME Use value provided by the client. */ output, (int) ua_chan->attr.type, ua_sess->tracing_id, @@ -223,6 +226,8 @@ error: /* * Ask consumer to create a channel for a given session. * + * Session list and rcu read side locks must be held by the caller. + * * Returns 0 on success else a negative value. 
*/ int ust_consumer_ask_channel(struct ust_app_session *ua_sess, @@ -230,6 +235,7 @@ int ust_consumer_ask_channel(struct ust_app_session *ua_sess, struct consumer_socket *socket, struct ust_registry_session *registry) { int ret; + struct ltt_session *session; assert(ua_sess); assert(ua_chan); @@ -243,10 +249,14 @@ int ust_consumer_ask_channel(struct ust_app_session *ua_sess, goto error; } + session = session_find_by_id(ua_sess->tracing_id); + assert(session); + pthread_mutex_lock(socket->lock); ret = ask_channel_creation(ua_sess, ua_chan, consumer, socket, registry); pthread_mutex_unlock(socket->lock); if (ret < 0) { + ERR("ask_channel_creation consumer command failed"); goto error; } diff --git a/src/bin/lttng-sessiond/ust-registry.c b/src/bin/lttng-sessiond/ust-registry.c index 3c3aa91f0..88d663170 100644 --- a/src/bin/lttng-sessiond/ust-registry.c +++ b/src/bin/lttng-sessiond/ust-registry.c @@ -26,6 +26,8 @@ #include "ust-registry.h" #include "ust-app.h" #include "utils.h" +#include "lttng-sessiond.h" +#include "notification-thread-commands.h" /* * Hash table match function for event in the registry. @@ -695,13 +697,23 @@ void destroy_channel_rcu(struct rcu_head *head) * free the registry pointer since it might not have been allocated before so * it's the caller responsability. */ -static void destroy_channel(struct ust_registry_channel *chan) +static void destroy_channel(struct ust_registry_channel *chan, bool notif) { struct lttng_ht_iter iter; struct ust_registry_event *event; + enum lttng_error_code cmd_ret; assert(chan); + if (notif) { + cmd_ret = notification_thread_command_remove_channel( + notification_thread_handle, chan->consumer_key, + LTTNG_DOMAIN_UST); + if (cmd_ret != LTTNG_OK) { + ERR("Failed to remove channel from notification thread"); + } + } + rcu_read_lock(); /* Destroy all event associated with this registry. 
*/ cds_lfht_for_each_entry(chan->ht->ht, &iter.iter, event, node.node) { @@ -759,7 +771,7 @@ int ust_registry_channel_add(struct ust_registry_session *session, return 0; error: - destroy_channel(chan); + destroy_channel(chan, false); error_alloc: return ret; } @@ -798,7 +810,7 @@ end: * Remove channel using key from registry and free memory. */ void ust_registry_channel_del_free(struct ust_registry_session *session, - uint64_t key) + uint64_t key, bool notif) { struct lttng_ht_iter iter; struct ust_registry_channel *chan; @@ -817,7 +829,7 @@ void ust_registry_channel_del_free(struct ust_registry_session *session, ret = lttng_ht_del(session->channels, &iter); assert(!ret); rcu_read_unlock(); - destroy_channel(chan); + destroy_channel(chan, notif); end: return; @@ -972,7 +984,7 @@ void ust_registry_session_destroy(struct ust_registry_session *reg) /* Delete the node from the ht and free it. */ ret = lttng_ht_del(reg->channels, &iter); assert(!ret); - destroy_channel(chan); + destroy_channel(chan, true); } rcu_read_unlock(); ht_cleanup_push(reg->channels); diff --git a/src/bin/lttng-sessiond/ust-registry.h b/src/bin/lttng-sessiond/ust-registry.h index 2697cf2ec..47b21dfe0 100644 --- a/src/bin/lttng-sessiond/ust-registry.h +++ b/src/bin/lttng-sessiond/ust-registry.h @@ -109,6 +109,7 @@ struct ust_registry_session { struct ust_registry_channel { uint64_t key; + uint64_t consumer_key; /* Id set when replying to a register channel. 
*/ uint32_t chan_id; enum ustctl_channel_header header_type; @@ -248,7 +249,7 @@ struct ust_registry_channel *ust_registry_channel_find( int ust_registry_channel_add(struct ust_registry_session *session, uint64_t key); void ust_registry_channel_del_free(struct ust_registry_session *session, - uint64_t key); + uint64_t key, bool notif); int ust_registry_session_init(struct ust_registry_session **sessionp, struct ust_app *app, diff --git a/src/common/Makefile.am b/src/common/Makefile.am index 8619b5e25..f6df9a61a 100644 --- a/src/common/Makefile.am +++ b/src/common/Makefile.am @@ -73,7 +73,10 @@ libcommon_la_SOURCES = error.h error.c utils.c utils.h runas.c runas.h \ mi-lttng.h mi-lttng.c \ daemonize.c daemonize.h \ unix.c unix.h \ - filter.c filter.h context.c context.h + filter.c filter.h context.c context.h \ + action.c notify.c condition.c buffer-usage.c \ + evaluation.c notification.c trigger.c endpoint.c \ + dynamic-buffer.h dynamic-buffer.c libcommon_la_LIBADD = \ $(top_builddir)/src/common/config/libconfig.la diff --git a/src/common/action.c b/src/common/action.c new file mode 100644 index 000000000..d07f36c2d --- /dev/null +++ b/src/common/action.c @@ -0,0 +1,119 @@ +/* + * Copyright (C) 2017 - Jérémie Galarneau + * + * This library is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License, version 2.1 only, + * as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License + * for more details. 
+ * + * You should have received a copy of the GNU Lesser General Public License + * along with this library; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#include +#include +#include +#include + +enum lttng_action_type lttng_action_get_type(struct lttng_action *action) +{ + return action ? action->type : LTTNG_ACTION_TYPE_UNKNOWN; +} + +void lttng_action_destroy(struct lttng_action *action) +{ + if (!action) { + return; + } + + assert(action->destroy); + action->destroy(action); +} + +LTTNG_HIDDEN +bool lttng_action_validate(struct lttng_action *action) +{ + bool valid; + + if (!action) { + valid = false; + goto end; + } + + if (!action->validate) { + /* Sub-class guarantees that it can never be invalid. */ + valid = true; + goto end; + } + + valid = action->validate(action); +end: + return valid; +} + +LTTNG_HIDDEN +ssize_t lttng_action_serialize(struct lttng_action *action, char *buf) +{ + ssize_t ret, action_size; + struct lttng_action_comm action_comm; + + if (!action) { + ret = -1; + goto end; + } + + action_comm.action_type = (int8_t) action->type; + ret = sizeof(struct lttng_action_comm); + if (buf) { + memcpy(buf, &action_comm, ret); + buf += ret; + } + + action_size = action->serialize(action, buf); + if (action_size < 0) { + ret = action_size; + goto end; + } + ret += action_size; +end: + return ret; +} + +LTTNG_HIDDEN +ssize_t lttng_action_create_from_buffer(const char *buf, + struct lttng_action **_action) +{ + ssize_t ret, action_size = sizeof(struct lttng_action_comm); + struct lttng_action *action; + struct lttng_action_comm *action_comm = + (struct lttng_action_comm *) buf; + + if (!buf || !_action) { + ret = -1; + goto end; + } + + DBG("Deserializing action from buffer"); + switch (action_comm->action_type) { + case LTTNG_ACTION_TYPE_NOTIFY: + action = lttng_action_notify_create(); + break; + default: + ret = -1; + goto end; + } + + if (!action) { + ret = -1; + goto 
end; + } + ret = action_size; + *_action = action; +end: + return ret; +} diff --git a/src/common/buffer-usage.c b/src/common/buffer-usage.c new file mode 100644 index 000000000..91db15b13 --- /dev/null +++ b/src/common/buffer-usage.c @@ -0,0 +1,784 @@ +/* + * Copyright (C) 2017 - Jérémie Galarneau + * + * This library is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License, version 2.1 only, + * as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License + * for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this library; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#include +#include +#include +#include +#include +#include +#include +#include + +static +double fixed_to_double(uint32_t val) +{ + return (double) val / (double) UINT32_MAX; +} + +static +uint64_t double_to_fixed(double val) +{ + return (val * (double) UINT32_MAX); +} + +static +bool is_usage_condition(struct lttng_condition *condition) +{ + enum lttng_condition_type type = lttng_condition_get_type(condition); + + return type == LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW || + type == LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH; +} + +static +bool is_usage_evaluation(struct lttng_evaluation *evaluation) +{ + enum lttng_condition_type type = lttng_evaluation_get_type(evaluation); + + return type == LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW || + type == LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH; +} + +static +void lttng_condition_buffer_usage_destroy(struct lttng_condition *condition) +{ + struct lttng_condition_buffer_usage *usage; + + usage = container_of(condition, struct 
lttng_condition_buffer_usage, + parent); + + free(usage->session_name); + free(usage->channel_name); + free(usage); +} + +static +bool lttng_condition_buffer_usage_validate(struct lttng_condition *condition) +{ + bool valid = false; + struct lttng_condition_buffer_usage *usage; + + if (!condition) { + goto end; + } + + usage = container_of(condition, struct lttng_condition_buffer_usage, + parent); + if (!usage->session_name) { + ERR("Invalid buffer condition: a target session name must be set."); + goto end; + } + if (!usage->channel_name) { + ERR("Invalid buffer condition: a target channel name must be set."); + goto end; + } + if (!usage->threshold_ratio.set && !usage->threshold_bytes.set) { + ERR("Invalid buffer condition: a threshold must be set."); + goto end; + } + + valid = true; +end: + return valid; +} + +static +ssize_t lttng_condition_buffer_usage_serialize(struct lttng_condition *condition, + char *buf) +{ + struct lttng_condition_buffer_usage *usage; + ssize_t ret, size; + size_t session_name_len, channel_name_len; + + if (!condition || !is_usage_condition(condition)) { + ret = -1; + goto end; + } + + DBG("Serializing buffer usage condition"); + usage = container_of(condition, struct lttng_condition_buffer_usage, + parent); + size = sizeof(struct lttng_condition_buffer_usage_comm); + session_name_len = strlen(usage->session_name) + 1; + channel_name_len = strlen(usage->channel_name) + 1; + size += session_name_len + channel_name_len; + if (buf) { + struct lttng_condition_buffer_usage_comm usage_comm = { + .threshold_set_in_bytes = usage->threshold_bytes.set ? 1 : 0, + .session_name_len = session_name_len, + .channel_name_len = channel_name_len, + .domain_type = (int8_t) usage->domain.type, + }; + + if (usage->threshold_bytes.set) { + usage_comm.threshold = usage->threshold_bytes.value; + } else { + uint64_t val = double_to_fixed( + usage->threshold_ratio.value); + + if (val > UINT32_MAX) { + /* overflow. 
*/ + ret = -1; + goto end; + } + usage_comm.threshold = val; + } + + memcpy(buf, &usage_comm, sizeof(usage_comm)); + buf += sizeof(usage_comm); + memcpy(buf, usage->session_name, session_name_len); + buf += session_name_len; + memcpy(buf, usage->channel_name, channel_name_len); + buf += channel_name_len; + } + ret = size; +end: + return ret; +} + +static +bool lttng_condition_buffer_usage_is_equal(struct lttng_condition *_a, + struct lttng_condition *_b) +{ + bool is_equal = false; + struct lttng_condition_buffer_usage *a, *b; + + a = container_of(_a, struct lttng_condition_buffer_usage, parent); + b = container_of(_b, struct lttng_condition_buffer_usage, parent); + + if ((a->threshold_ratio.set && !b->threshold_ratio.set) || + (a->threshold_bytes.set && !b->threshold_bytes.set)) { + goto end; + } + + if (a->threshold_ratio.set && b->threshold_ratio.set) { + double a_value, b_value, diff; + + a_value = a->threshold_ratio.value; + b_value = b->threshold_ratio.value; + diff = fabs(a_value - b_value); + + if (diff > DBL_EPSILON) { + goto end; + } + } else if (a->threshold_bytes.set && b->threshold_bytes.set) { + uint64_t a_value, b_value; + + a_value = a->threshold_bytes.value; + b_value = b->threshold_bytes.value; + if (a_value != b_value) { + goto end; + } + } + + if ((a->session_name && !b->session_name) || + (!a->session_name && b->session_name)) { + goto end; + } + + if (a->session_name && b->session_name) { /* Both set: session names must match. */ + if (strcmp(a->session_name, b->session_name)) { + goto end; + } + } if ((a->channel_name && !b->channel_name) || + (!a->channel_name && b->channel_name)) { + goto end; + } + + if (a->channel_name && b->channel_name) { + if (strcmp(a->channel_name, b->channel_name)) { + goto end; + } + } + + if ((a->domain.set && !b->domain.set) || + (!a->domain.set && b->domain.set)) { + goto end; + } + + if (a->domain.set && b->domain.set) { + if (a->domain.type != b->domain.type) { + goto end; + } + } + is_equal = true; +end: + return is_equal; +} + +static +struct 
lttng_condition *lttng_condition_buffer_usage_create( + enum lttng_condition_type type) +{ + struct lttng_condition_buffer_usage *condition; + + condition = zmalloc(sizeof(struct lttng_condition_buffer_usage)); + if (!condition) { + return NULL; /* Do not derive &condition->parent from a NULL pointer. */ + } + + lttng_condition_init(&condition->parent, type); + condition->parent.validate = lttng_condition_buffer_usage_validate; + condition->parent.serialize = lttng_condition_buffer_usage_serialize; + condition->parent.equal = lttng_condition_buffer_usage_is_equal; + condition->parent.destroy = lttng_condition_buffer_usage_destroy; + + return &condition->parent; +} + +struct lttng_condition *lttng_condition_buffer_usage_low_create(void) +{ + return lttng_condition_buffer_usage_create( + LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW); +} + +struct lttng_condition *lttng_condition_buffer_usage_high_create(void) +{ + return lttng_condition_buffer_usage_create( + LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH); +} + +static +ssize_t init_condition_from_buffer(struct lttng_condition *condition, + const char *buf) +{ + ssize_t ret, condition_size; + enum lttng_condition_status status; + enum lttng_domain_type domain_type; + struct lttng_condition_buffer_usage_comm *condition_comm = + (struct lttng_condition_buffer_usage_comm *) buf; + const char *session_name, *channel_name; + + if (condition_comm->threshold_set_in_bytes) { + status = lttng_condition_buffer_usage_set_threshold(condition, + condition_comm->threshold); + } else { + status = lttng_condition_buffer_usage_set_threshold_ratio( + condition, + fixed_to_double(condition_comm->threshold)); + } + if (status != LTTNG_CONDITION_STATUS_OK) { + ERR("Failed to initialize buffer usage condition threshold"); + ret = -1; + goto end; + } + + if (condition_comm->domain_type <= LTTNG_DOMAIN_NONE || + condition_comm->domain_type > LTTNG_DOMAIN_PYTHON) { + /* Invalid domain value. 
*/ + ERR("Invalid domain type value (%i) found in condition buffer", + (int) condition_comm->domain_type); + ret = -1; + goto end; + } + + domain_type = (enum lttng_domain_type) condition_comm->domain_type; + status = lttng_condition_buffer_usage_set_domain_type(condition, + domain_type); + if (status != LTTNG_CONDITION_STATUS_OK) { + ERR("Failed to set buffer usage condition domain"); + ret = -1; + goto end; + } + + session_name = buf + sizeof(struct lttng_condition_buffer_usage_comm); + channel_name = session_name + condition_comm->session_name_len; + + status = lttng_condition_buffer_usage_set_session_name(condition, + session_name); + if (status != LTTNG_CONDITION_STATUS_OK) { + ERR("Failed to set buffer usage session name"); + ret = -1; + goto end; + } + + status = lttng_condition_buffer_usage_set_channel_name(condition, + channel_name); + if (status != LTTNG_CONDITION_STATUS_OK) { + ERR("Failed to set buffer usage channel name"); + ret = -1; + goto end; + } + + if (!lttng_condition_validate(condition)) { + ret = -1; + goto end; + } + + condition_size = sizeof(*condition_comm); + condition_size += (ssize_t) condition_comm->session_name_len; + condition_size += (ssize_t) condition_comm->channel_name_len; + ret = condition_size; +end: + return ret; +} + +LTTNG_HIDDEN +ssize_t lttng_condition_buffer_usage_low_create_from_buffer(const char *buf, + struct lttng_condition **_condition) +{ + ssize_t ret; + struct lttng_condition *condition = + lttng_condition_buffer_usage_low_create(); + + if (!_condition || !condition) { + ret = -1; + goto error; + } + + ret = init_condition_from_buffer(condition, buf); + if (ret < 0) { + goto error; + } + + *_condition = condition; + return ret; +error: + lttng_condition_destroy(condition); + return ret; +} + +LTTNG_HIDDEN +ssize_t lttng_condition_buffer_usage_high_create_from_buffer(const char *buf, + struct lttng_condition **_condition) +{ + ssize_t ret; + struct lttng_condition *condition = + 
lttng_condition_buffer_usage_high_create(); + + if (!_condition || !condition) { + ret = -1; + goto error; + } + + ret = init_condition_from_buffer(condition, buf); + if (ret < 0) { + goto error; + } + + *_condition = condition; + return ret; +error: + lttng_condition_destroy(condition); + return ret; +} + +static +struct lttng_evaluation *create_evaluation_from_buffer( + enum lttng_condition_type type, const char *buf) +{ + struct lttng_evaluation_buffer_usage_comm *comm = + (struct lttng_evaluation_buffer_usage_comm *) buf; + struct lttng_evaluation *evaluation; + + evaluation = lttng_evaluation_buffer_usage_create(type, + comm->buffer_use, comm->buffer_capacity); + return evaluation; +} + +LTTNG_HIDDEN +ssize_t lttng_evaluation_buffer_usage_low_create_from_buffer(const char *buf, + struct lttng_evaluation **_evaluation) +{ + ssize_t ret; + struct lttng_evaluation *evaluation = NULL; + + if (!_evaluation) { + ret = -1; + goto error; + } + + evaluation = create_evaluation_from_buffer( + LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW, buf); + if (!evaluation) { + ret = -1; + goto error; + } + + *_evaluation = evaluation; + ret = sizeof(struct lttng_evaluation_buffer_usage_comm); + return ret; +error: + lttng_evaluation_destroy(evaluation); + return ret; +} + +LTTNG_HIDDEN +ssize_t lttng_evaluation_buffer_usage_high_create_from_buffer(const char *buf, + struct lttng_evaluation **_evaluation) +{ + ssize_t ret; + struct lttng_evaluation *evaluation = NULL; + + if (!_evaluation) { + ret = -1; + goto error; + } + + evaluation = create_evaluation_from_buffer( + LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH, buf); + if (!evaluation) { + ret = -1; + goto error; + } + + *_evaluation = evaluation; + ret = sizeof(struct lttng_evaluation_buffer_usage_comm); + return ret; +error: + lttng_evaluation_destroy(evaluation); + return ret; +} + +enum lttng_condition_status +lttng_condition_buffer_usage_get_threshold_ratio( + struct lttng_condition *condition, double *threshold_ratio) +{ + struct 
lttng_condition_buffer_usage *usage; + enum lttng_condition_status status = LTTNG_CONDITION_STATUS_OK; + + if (!condition || !is_usage_condition(condition) || + !threshold_ratio) { + status = LTTNG_CONDITION_STATUS_INVALID; + goto end; + } + + usage = container_of(condition, struct lttng_condition_buffer_usage, + parent); + if (!usage->threshold_ratio.set) { + status = LTTNG_CONDITION_STATUS_UNSET; + goto end; + } + *threshold_ratio = usage->threshold_ratio.value; +end: + return status; +} + +/* threshold_ratio expressed as [0.0, 1.0]. */ +enum lttng_condition_status +lttng_condition_buffer_usage_set_threshold_ratio( + struct lttng_condition *condition, double threshold_ratio) +{ + struct lttng_condition_buffer_usage *usage; + enum lttng_condition_status status = LTTNG_CONDITION_STATUS_OK; + + if (!condition || !is_usage_condition(condition) || + threshold_ratio < 0.0 || + threshold_ratio > 1.0) { + status = LTTNG_CONDITION_STATUS_INVALID; + goto end; + } + + usage = container_of(condition, struct lttng_condition_buffer_usage, + parent); + usage->threshold_ratio.set = true; + usage->threshold_bytes.set = false; + usage->threshold_ratio.value = threshold_ratio; +end: + return status; +} + +enum lttng_condition_status +lttng_condition_buffer_usage_get_threshold( + struct lttng_condition *condition, uint64_t *threshold_bytes) +{ + struct lttng_condition_buffer_usage *usage; + enum lttng_condition_status status = LTTNG_CONDITION_STATUS_OK; + + if (!condition || !is_usage_condition(condition) || !threshold_bytes) { + status = LTTNG_CONDITION_STATUS_INVALID; + goto end; + } + + usage = container_of(condition, struct lttng_condition_buffer_usage, + parent); + if (!usage->threshold_bytes.set) { + status = LTTNG_CONDITION_STATUS_UNSET; + goto end; + } + *threshold_bytes = usage->threshold_bytes.value; +end: + return status; +} + +enum lttng_condition_status +lttng_condition_buffer_usage_set_threshold( + struct lttng_condition *condition, uint64_t threshold_bytes) +{ + 
struct lttng_condition_buffer_usage *usage; + enum lttng_condition_status status = LTTNG_CONDITION_STATUS_OK; + + if (!condition || !is_usage_condition(condition)) { + status = LTTNG_CONDITION_STATUS_INVALID; + goto end; + } + + usage = container_of(condition, struct lttng_condition_buffer_usage, + parent); + usage->threshold_ratio.set = false; + usage->threshold_bytes.set = true; + usage->threshold_bytes.value = threshold_bytes; +end: + return status; +} + +enum lttng_condition_status +lttng_condition_buffer_usage_get_session_name( + struct lttng_condition *condition, const char **session_name) +{ + struct lttng_condition_buffer_usage *usage; + enum lttng_condition_status status = LTTNG_CONDITION_STATUS_OK; + + if (!condition || !is_usage_condition(condition) || !session_name) { + status = LTTNG_CONDITION_STATUS_INVALID; + goto end; + } + + usage = container_of(condition, struct lttng_condition_buffer_usage, + parent); + if (!usage->session_name) { + status = LTTNG_CONDITION_STATUS_UNSET; + goto end; + } + *session_name = usage->session_name; +end: + return status; +} + +enum lttng_condition_status +lttng_condition_buffer_usage_set_session_name( + struct lttng_condition *condition, const char *session_name) +{ + char *session_name_copy; + struct lttng_condition_buffer_usage *usage; + enum lttng_condition_status status = LTTNG_CONDITION_STATUS_OK; + + if (!condition || !is_usage_condition(condition) || !session_name || + strlen(session_name) == 0) { + status = LTTNG_CONDITION_STATUS_INVALID; + goto end; + } + + usage = container_of(condition, struct lttng_condition_buffer_usage, + parent); + session_name_copy = strdup(session_name); + if (!session_name_copy) { + status = LTTNG_CONDITION_STATUS_ERROR; + goto end; + } + + if (usage->session_name) { + free(usage->session_name); + } + usage->session_name = session_name_copy; +end: + return status; +} + +enum lttng_condition_status +lttng_condition_buffer_usage_get_channel_name( + struct lttng_condition *condition, 
const char **channel_name) +{ + struct lttng_condition_buffer_usage *usage; + enum lttng_condition_status status = LTTNG_CONDITION_STATUS_OK; + + if (!condition || !is_usage_condition(condition) || !channel_name) { + status = LTTNG_CONDITION_STATUS_INVALID; + goto end; + } + + usage = container_of(condition, struct lttng_condition_buffer_usage, + parent); + if (!usage->channel_name) { + status = LTTNG_CONDITION_STATUS_UNSET; + goto end; + } + *channel_name = usage->channel_name; +end: + return status; +} + +enum lttng_condition_status +lttng_condition_buffer_usage_set_channel_name( + struct lttng_condition *condition, const char *channel_name) +{ + char *channel_name_copy; + struct lttng_condition_buffer_usage *usage; + enum lttng_condition_status status = LTTNG_CONDITION_STATUS_OK; + + if (!condition || !is_usage_condition(condition) || !channel_name || + strlen(channel_name) == 0) { + status = LTTNG_CONDITION_STATUS_INVALID; + goto end; + } + + usage = container_of(condition, struct lttng_condition_buffer_usage, + parent); + channel_name_copy = strdup(channel_name); + if (!channel_name_copy) { + status = LTTNG_CONDITION_STATUS_ERROR; + goto end; + } + + if (usage->channel_name) { + free(usage->channel_name); + } + usage->channel_name = channel_name_copy; +end: + return status; +} + +enum lttng_condition_status +lttng_condition_buffer_usage_get_domain_type( + struct lttng_condition *condition, enum lttng_domain_type *type) +{ + struct lttng_condition_buffer_usage *usage; + enum lttng_condition_status status = LTTNG_CONDITION_STATUS_OK; + + if (!condition || !is_usage_condition(condition) || !type) { + status = LTTNG_CONDITION_STATUS_INVALID; + goto end; + } + + usage = container_of(condition, struct lttng_condition_buffer_usage, + parent); + if (!usage->domain.set) { + status = LTTNG_CONDITION_STATUS_UNSET; + goto end; + } + *type = usage->domain.type; +end: + return status; +} + +enum lttng_condition_status +lttng_condition_buffer_usage_set_domain_type( + struct 
lttng_condition *condition, enum lttng_domain_type type) +{ + struct lttng_condition_buffer_usage *usage; + enum lttng_condition_status status = LTTNG_CONDITION_STATUS_OK; + + if (!condition || !is_usage_condition(condition) || + type == LTTNG_DOMAIN_NONE) { + status = LTTNG_CONDITION_STATUS_INVALID; + goto end; + } + + usage = container_of(condition, struct lttng_condition_buffer_usage, + parent); + usage->domain.set = true; + usage->domain.type = type; +end: + return status; +} + +static +ssize_t lttng_evaluation_buffer_usage_serialize( + struct lttng_evaluation *evaluation, char *buf) +{ + ssize_t ret; + struct lttng_evaluation_buffer_usage *usage; + + usage = container_of(evaluation, struct lttng_evaluation_buffer_usage, + parent); + if (buf) { + struct lttng_evaluation_buffer_usage_comm comm = { + .buffer_use = usage->buffer_use, + .buffer_capacity = usage->buffer_capacity, + }; + + memcpy(buf, &comm, sizeof(comm)); + } + + ret = sizeof(struct lttng_evaluation_buffer_usage_comm); + return ret; +} + +static +void lttng_evaluation_buffer_usage_destroy( + struct lttng_evaluation *evaluation) +{ + struct lttng_evaluation_buffer_usage *usage; + + usage = container_of(evaluation, struct lttng_evaluation_buffer_usage, + parent); + free(usage); +} + +/* + * Allocate a buffer usage evaluation of the given condition type holding the + * sampled use and capacity. Returns NULL if the allocation fails. + */ +LTTNG_HIDDEN +struct lttng_evaluation *lttng_evaluation_buffer_usage_create( + enum lttng_condition_type type, uint64_t use, uint64_t capacity) +{ + struct lttng_evaluation_buffer_usage *usage; + + usage = zmalloc(sizeof(struct lttng_evaluation_buffer_usage)); + if (!usage) { + goto end; + } + + usage->parent.type = type; + usage->buffer_use = use; + usage->buffer_capacity = capacity; + usage->parent.serialize = lttng_evaluation_buffer_usage_serialize; + usage->parent.destroy = lttng_evaluation_buffer_usage_destroy; +end: + /* Never derive a member address from a failed (NULL) allocation. */ + return usage ? &usage->parent : NULL; +} + +/* + * Get the sampled buffer usage which caused the associated condition to + * evaluate to "true". 
+ */ +enum lttng_evaluation_status +lttng_evaluation_buffer_usage_get_usage_ratio( + struct lttng_evaluation *evaluation, double *usage_ratio) +{ + struct lttng_evaluation_buffer_usage *usage; + enum lttng_evaluation_status status = LTTNG_EVALUATION_STATUS_OK; + + if (!evaluation || !is_usage_evaluation(evaluation) || !usage_ratio) { + status = LTTNG_EVALUATION_STATUS_INVALID; + goto end; + } + + usage = container_of(evaluation, struct lttng_evaluation_buffer_usage, + parent); + *usage_ratio = (double) usage->buffer_use / + (double) usage->buffer_capacity; +end: + return status; +} + +enum lttng_evaluation_status +lttng_evaluation_buffer_usage_get_usage(struct lttng_evaluation *evaluation, + uint64_t *usage_bytes) +{ + struct lttng_evaluation_buffer_usage *usage; + enum lttng_evaluation_status status = LTTNG_EVALUATION_STATUS_OK; + + if (!evaluation || !is_usage_evaluation(evaluation) || !usage_bytes) { + status = LTTNG_EVALUATION_STATUS_INVALID; + goto end; + } + + usage = container_of(evaluation, struct lttng_evaluation_buffer_usage, + parent); + *usage_bytes = usage->buffer_use; +end: + return status; +} diff --git a/src/common/condition.c b/src/common/condition.c new file mode 100644 index 000000000..2f7f41364 --- /dev/null +++ b/src/common/condition.c @@ -0,0 +1,159 @@ +/* + * Copyright (C) 2017 - Jérémie Galarneau + * + * This library is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License, version 2.1 only, + * as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License + * for more details. 
+ * + * You should have received a copy of the GNU Lesser General Public License + * along with this library; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#include +#include +#include +#include +#include +#include + +enum lttng_condition_type lttng_condition_get_type( + struct lttng_condition *condition) +{ + return condition ? condition->type : LTTNG_CONDITION_TYPE_UNKNOWN; +} + +void lttng_condition_destroy(struct lttng_condition *condition) +{ + if (!condition) { + return; + } + + assert(condition->destroy); + condition->destroy(condition); +} + +LTTNG_HIDDEN +bool lttng_condition_validate(struct lttng_condition *condition) +{ + bool valid; + + if (!condition) { + valid = false; + goto end; + } + + if (!condition->validate) { + /* Sub-class guarantees that it can never be invalid. */ + valid = true; + goto end; + } + + valid = condition->validate(condition); +end: + return valid; +} + +LTTNG_HIDDEN +ssize_t lttng_condition_serialize(struct lttng_condition *condition, char *buf) +{ + ssize_t ret, condition_size; + struct lttng_condition_comm condition_comm; + + if (!condition) { + ret = -1; + goto end; + } + + condition_comm.condition_type = (int8_t) condition->type; + ret = sizeof(struct lttng_condition_comm); + if (buf) { + memcpy(buf, &condition_comm, ret); + buf += ret; + } + + condition_size = condition->serialize(condition, buf); + if (condition_size < 0) { + ret = condition_size; + goto end; + } + ret += condition_size; +end: + return ret; +} + +LTTNG_HIDDEN +bool lttng_condition_is_equal(struct lttng_condition *a, + struct lttng_condition *b) +{ + bool is_equal = false; + + if (!a || !b) { + goto end; + } + + if (a->type != b->type) { + goto end; + } + + is_equal = a->equal ? 
a->equal(a, b) : true; +end: + return is_equal; +} + +LTTNG_HIDDEN +ssize_t lttng_condition_create_from_buffer(const char *buf, + struct lttng_condition **condition) +{ + ssize_t ret, condition_size = 0; + struct lttng_condition_comm *condition_comm = + (struct lttng_condition_comm *) buf; + + if (!buf || !condition) { + ret = -1; + goto end; + } + + DBG("Deserializing condition from buffer"); + condition_size += sizeof(*condition_comm); + buf += condition_size; + + switch ((enum lttng_condition_type) condition_comm->condition_type) { + case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW: + ret = lttng_condition_buffer_usage_low_create_from_buffer(buf, + condition); + if (ret < 0) { + goto end; + } + condition_size += ret; + break; + case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH: + ret = lttng_condition_buffer_usage_high_create_from_buffer(buf, + condition); + if (ret < 0) { + goto end; + } + condition_size += ret; + break; + default: + ERR("Attempted to create condition of unknown type (%i)", + (int) condition_comm->condition_type); + ret = -1; + goto end; + } + ret = condition_size; +end: + return ret; +} + +LTTNG_HIDDEN +void lttng_condition_init(struct lttng_condition *condition, + enum lttng_condition_type type) +{ + condition->type = type; +} diff --git a/src/common/consumer/consumer-timer.c b/src/common/consumer/consumer-timer.c index b0a434284..3234e8514 100644 --- a/src/common/consumer/consumer-timer.c +++ b/src/common/consumer/consumer-timer.c @@ -21,6 +21,7 @@ #include #include +#include #include #include #include @@ -61,8 +62,14 @@ static void setmask(sigset_t *mask) if (ret) { PERROR("sigaddset live"); } + ret = sigaddset(mask, LTTNG_CONSUMER_SIG_MONITOR); + if (ret) { + PERROR("sigaddset monitor"); + } } +static int channel_monitor_pipe = -1; + /* * Execute action on a timer switch. * @@ -71,7 +78,7 @@ static void setmask(sigset_t *mask) * deadlocks. 
*/ static void metadata_switch_timer(struct lttng_consumer_local_data *ctx, - int sig, siginfo_t *si, void *uc) + int sig, siginfo_t *si) { int ret; struct lttng_consumer_channel *channel; @@ -304,7 +311,7 @@ end: * Execute action on a live timer */ static void live_timer(struct lttng_consumer_local_data *ctx, - int sig, siginfo_t *si, void *uc) + int sig, siginfo_t *si) { int ret; struct lttng_consumer_channel *channel; @@ -411,44 +418,95 @@ void consumer_timer_signal_thread_qs(unsigned int signr) } /* - * Set the timer for periodical metadata flush. + * Start a timer channel timer which will fire at a given interval + * (timer_interval_us)and fire a given signal (signal). + * + * Returns a negative value on error, 0 if a timer was created, and + * a positive value if no timer was created (not an error). */ -void consumer_timer_switch_start(struct lttng_consumer_channel *channel, - unsigned int switch_timer_interval) +static +int consumer_channel_timer_start(timer_t *timer_id, + struct lttng_consumer_channel *channel, + unsigned int timer_interval_us, int signal) { - int ret; + int ret = 0, delete_ret; struct sigevent sev; struct itimerspec its; assert(channel); assert(channel->key); - if (switch_timer_interval == 0) { - return; + if (timer_interval_us == 0) { + /* No creation needed; not an error. 
*/ + ret = 1; + goto end; } sev.sigev_notify = SIGEV_SIGNAL; - sev.sigev_signo = LTTNG_CONSUMER_SIG_SWITCH; + sev.sigev_signo = signal; sev.sigev_value.sival_ptr = channel; - ret = timer_create(CLOCKID, &sev, &channel->switch_timer); + ret = timer_create(CLOCKID, &sev, timer_id); if (ret == -1) { PERROR("timer_create"); + goto end; } - channel->switch_timer_enabled = 1; - its.it_value.tv_sec = switch_timer_interval / 1000000; - its.it_value.tv_nsec = (switch_timer_interval % 1000000) * 1000; + its.it_value.tv_sec = timer_interval_us / 1000000; + its.it_value.tv_nsec = (timer_interval_us % 1000000) * 1000; its.it_interval.tv_sec = its.it_value.tv_sec; its.it_interval.tv_nsec = its.it_value.tv_nsec; - ret = timer_settime(channel->switch_timer, 0, &its, NULL); + ret = timer_settime(*timer_id, 0, &its, NULL); if (ret == -1) { PERROR("timer_settime"); + goto error_destroy_timer; + } +end: + return ret; +error_destroy_timer: + delete_ret = timer_delete(*timer_id); + if (delete_ret == -1) { + PERROR("timer_delete"); + } + goto end; +} + +static +int consumer_channel_timer_stop(timer_t *timer_id, int signal) +{ + int ret = 0; + + ret = timer_delete(*timer_id); + if (ret == -1) { + PERROR("timer_delete"); + goto end; } + + consumer_timer_signal_thread_qs(signal); + *timer_id = 0; +end: + return ret; +} + +/* + * Set the channel's switch timer. + */ +void consumer_timer_switch_start(struct lttng_consumer_channel *channel, + unsigned int switch_timer_interval_us) +{ + int ret; + + assert(channel); + assert(channel->key); + + ret = consumer_channel_timer_start(&channel->switch_timer, channel, + switch_timer_interval_us, LTTNG_CONSUMER_SIG_SWITCH); + + channel->switch_timer_enabled = !!(ret == 0); } /* - * Stop and delete timer. + * Stop and delete the channel's switch timer. 
*/ void consumer_timer_switch_stop(struct lttng_consumer_channel *channel) { @@ -456,72 +514,91 @@ void consumer_timer_switch_stop(struct lttng_consumer_channel *channel) assert(channel); - ret = timer_delete(channel->switch_timer); + ret = consumer_channel_timer_stop(&channel->switch_timer, + LTTNG_CONSUMER_SIG_SWITCH); if (ret == -1) { - PERROR("timer_delete"); + ERR("Failed to stop switch timer"); } - consumer_timer_signal_thread_qs(LTTNG_CONSUMER_SIG_SWITCH); - - channel->switch_timer = 0; channel->switch_timer_enabled = 0; } /* - * Set the timer for the live mode. + * Set the channel's live timer. */ void consumer_timer_live_start(struct lttng_consumer_channel *channel, - int live_timer_interval) + unsigned int live_timer_interval_us) { int ret; - struct sigevent sev; - struct itimerspec its; assert(channel); assert(channel->key); - if (live_timer_interval <= 0) { - return; - } + ret = consumer_channel_timer_start(&channel->live_timer, channel, + live_timer_interval_us, LTTNG_CONSUMER_SIG_LIVE); - sev.sigev_notify = SIGEV_SIGNAL; - sev.sigev_signo = LTTNG_CONSUMER_SIG_LIVE; - sev.sigev_value.sival_ptr = channel; - ret = timer_create(CLOCKID, &sev, &channel->live_timer); - if (ret == -1) { - PERROR("timer_create"); - } - channel->live_timer_enabled = 1; + channel->live_timer_enabled = !!(ret == 0); +} - its.it_value.tv_sec = live_timer_interval / 1000000; - its.it_value.tv_nsec = (live_timer_interval % 1000000) * 1000; - its.it_interval.tv_sec = its.it_value.tv_sec; - its.it_interval.tv_nsec = its.it_value.tv_nsec; +/* + * Stop and delete the channel's live timer. 
+ */ +void consumer_timer_live_stop(struct lttng_consumer_channel *channel) +{ + int ret; - ret = timer_settime(channel->live_timer, 0, &its, NULL); + assert(channel); + + ret = consumer_channel_timer_stop(&channel->live_timer, + LTTNG_CONSUMER_SIG_LIVE); if (ret == -1) { - PERROR("timer_settime"); + ERR("Failed to stop live timer"); } + + channel->live_timer_enabled = 0; } /* - * Stop and delete timer. + * Set the channel's monitoring timer. + * + * Returns a negative value on error, 0 if a timer was created, and + * a positive value if no timer was created (not an error). */ -void consumer_timer_live_stop(struct lttng_consumer_channel *channel) +int consumer_timer_monitor_start(struct lttng_consumer_channel *channel, + unsigned int monitor_timer_interval_us) { int ret; assert(channel); + assert(channel->key); + assert(!channel->monitor_timer_enabled); - ret = timer_delete(channel->live_timer); + ret = consumer_channel_timer_start(&channel->monitor_timer, channel, + monitor_timer_interval_us, LTTNG_CONSUMER_SIG_MONITOR); + channel->monitor_timer_enabled = !!(ret == 0); + return ret; +} + +/* + * Stop and delete the channel's monitoring timer. 
+ */ +int consumer_timer_monitor_stop(struct lttng_consumer_channel *channel) +{ + int ret; + + assert(channel); + assert(channel->monitor_timer_enabled); + + ret = consumer_channel_timer_stop(&channel->monitor_timer, + LTTNG_CONSUMER_SIG_MONITOR); if (ret == -1) { - PERROR("timer_delete"); + ERR("Failed to stop monitor timer"); + goto end; } - consumer_timer_signal_thread_qs(LTTNG_CONSUMER_SIG_LIVE); - - channel->live_timer = 0; - channel->live_timer_enabled = 0; + channel->monitor_timer_enabled = 0; +end: + return ret; } /* @@ -544,9 +621,159 @@ int consumer_signal_init(void) return 0; } +static +int sample_ust_positions(struct lttng_consumer_channel *channel, + uint64_t *_highest_use, uint64_t *_lowest_use) +{ + /* Initialized: no call assigns ret when every stream is skipped as deleted. */ + int ret = 0; + struct lttng_ht_iter iter; + struct lttng_consumer_stream *stream; + bool empty_channel = true; + uint64_t high = 0, low = UINT64_MAX; + struct lttng_ht *ht = consumer_data.stream_per_chan_id_ht; + + rcu_read_lock(); + + cds_lfht_for_each_entry_duplicate(ht->ht, + ht->hash_fct(&channel->key, lttng_ht_seed), + ht->match_fct, &channel->key, + &iter.iter, stream, node_channel_id.node) { + unsigned long produced, consumed, usage; + + empty_channel = false; + + pthread_mutex_lock(&stream->lock); + if (cds_lfht_is_node_deleted(&stream->node.node)) { + goto next; + } + + ret = ustctl_snapshot_sample_positions(stream->ustream); + if (ret) { + ERR("Failed to take buffer position snapshot in monitor timer (ret = %d)", ret); + pthread_mutex_unlock(&stream->lock); + goto end; + } + ret = ustctl_snapshot_get_consumed(stream->ustream, + &consumed); + if (ret) { + ERR("Failed to get buffer consumed position in monitor timer"); + pthread_mutex_unlock(&stream->lock); + goto end; + } + ret = ustctl_snapshot_get_produced(stream->ustream, + &produced); + if (ret) { + ERR("Failed to get buffer produced position in monitor timer"); + pthread_mutex_unlock(&stream->lock); + goto end; + } + + usage = produced - consumed; + high = (usage > high) ? 
usage : high; + low = (usage < low) ? usage : low; + next: + pthread_mutex_unlock(&stream->lock); + } + + *_highest_use = high; + *_lowest_use = low; +end: + rcu_read_unlock(); + if (empty_channel) { + ret = -1; + } + return ret; +} + +/* + * Execute action on a monitor timer. + */ +static +void monitor_timer(struct lttng_consumer_local_data *ctx, + struct lttng_consumer_channel *channel) +{ + int ret; + int channel_monitor_pipe = + consumer_timer_thread_get_channel_monitor_pipe(); + struct lttcomm_consumer_channel_monitor_msg msg = { + .key = channel->key, + }; + + assert(channel); + pthread_mutex_lock(&consumer_data.lock); + + if (channel_monitor_pipe < 0) { + goto end; + } + + switch (consumer_data.type) { + case LTTNG_CONSUMER_KERNEL: + /* TODO */ + ret = -1; + break; + case LTTNG_CONSUMER32_UST: + case LTTNG_CONSUMER64_UST: + { + ret = sample_ust_positions(channel, &msg.highest, &msg.lowest); + break; + } + default: + abort(); + } + + if (ret) { + goto end; + } + + /* + * Writes performed here are assumed to be atomic which is only + * guaranteed for sizes < than PIPE_BUF. + */ + assert(sizeof(msg) <= PIPE_BUF); + + do { + ret = write(channel_monitor_pipe, &msg, sizeof(msg)); + } while (ret == -1 && errno == EINTR); + if (ret == -1) { + if (errno == EAGAIN) { + /* Not an error, the sample is merely dropped. 
*/ + DBG("Channel monitor pipe is full; dropping sample for channel key = %"PRIu64, + channel->key); + } else { + PERROR("write to the channel monitor pipe"); + } + } else { + DBG("Sent channel monitoring sample for channel key %" PRIu64 + ", (highest = %" PRIu64 ", lowest = %"PRIu64")", + channel->key, msg.highest, msg.lowest); + } +end: + pthread_mutex_unlock(&consumer_data.lock); +} + +int consumer_timer_thread_get_channel_monitor_pipe(void) +{ + return uatomic_read(&channel_monitor_pipe); +} + +int consumer_timer_thread_set_channel_monitor_pipe(int fd) +{ + int ret; + + ret = uatomic_cmpxchg(&channel_monitor_pipe, -1, fd); + if (ret != -1) { + ret = -1; + goto end; + } + ret = 0; +end: + return ret; +} + /* * This thread is the sighandler for signals LTTNG_CONSUMER_SIG_SWITCH, - * LTTNG_CONSUMER_SIG_TEARDOWN and LTTNG_CONSUMER_SIG_LIVE. + * LTTNG_CONSUMER_SIG_TEARDOWN, LTTNG_CONSUMER_SIG_LIVE, and + * LTTNG_CONSUMER_SIG_MONITOR. */ void *consumer_timer_thread(void *data) { @@ -575,20 +802,31 @@ void *consumer_timer_thread(void *data) health_poll_entry(); signr = sigwaitinfo(&mask, &info); health_poll_exit(); + + /* + * NOTE: cascading conditions are used instead of a switch case + * since the use of SIGRTMIN in the definition of the signals' + * values prevents the reduction to an integer constant. 
+ */ if (signr == -1) { if (errno != EINTR) { PERROR("sigwaitinfo"); } continue; } else if (signr == LTTNG_CONSUMER_SIG_SWITCH) { - metadata_switch_timer(ctx, info.si_signo, &info, NULL); + metadata_switch_timer(ctx, info.si_signo, &info); } else if (signr == LTTNG_CONSUMER_SIG_TEARDOWN) { cmm_smp_mb(); CMM_STORE_SHARED(timer_signal.qs_done, 1); cmm_smp_mb(); DBG("Signal timer metadata thread teardown"); } else if (signr == LTTNG_CONSUMER_SIG_LIVE) { - live_timer(ctx, info.si_signo, &info, NULL); + live_timer(ctx, info.si_signo, &info); + } else if (signr == LTTNG_CONSUMER_SIG_MONITOR) { + struct lttng_consumer_channel *channel; + + channel = info.si_value.sival_ptr; + monitor_timer(ctx, channel); } else { ERR("Unexpected signal %d\n", info.si_signo); } diff --git a/src/common/consumer/consumer-timer.h b/src/common/consumer/consumer-timer.h index 22e74574c..851a172aa 100644 --- a/src/common/consumer/consumer-timer.h +++ b/src/common/consumer/consumer-timer.h @@ -27,6 +27,7 @@ #define LTTNG_CONSUMER_SIG_SWITCH SIGRTMIN + 10 #define LTTNG_CONSUMER_SIG_TEARDOWN SIGRTMIN + 11 #define LTTNG_CONSUMER_SIG_LIVE SIGRTMIN + 12 +#define LTTNG_CONSUMER_SIG_MONITOR SIGRTMIN + 13 #define CLOCKID CLOCK_MONOTONIC @@ -44,15 +45,21 @@ struct timer_signal_data { }; void consumer_timer_switch_start(struct lttng_consumer_channel *channel, - unsigned int switch_timer_interval); + unsigned int switch_timer_interval_us); void consumer_timer_switch_stop(struct lttng_consumer_channel *channel); void consumer_timer_live_start(struct lttng_consumer_channel *channel, - int live_timer_interval); + unsigned int live_timer_interval_us); void consumer_timer_live_stop(struct lttng_consumer_channel *channel); +int consumer_timer_monitor_start(struct lttng_consumer_channel *channel, + unsigned int monitor_timer_interval_us); +int consumer_timer_monitor_stop(struct lttng_consumer_channel *channel); void *consumer_timer_thread(void *data); int consumer_signal_init(void); int 
consumer_flush_kernel_index(struct lttng_consumer_stream *stream); int consumer_flush_ust_index(struct lttng_consumer_stream *stream); +int consumer_timer_thread_get_channel_monitor_pipe(void); +int consumer_timer_thread_set_channel_monitor_pipe(int fd); + #endif /* CONSUMER_TIMER_H */ diff --git a/src/common/consumer/consumer.c b/src/common/consumer/consumer.c index e379171ae..3b857dec3 100644 --- a/src/common/consumer/consumer.c +++ b/src/common/consumer/consumer.c @@ -368,6 +368,9 @@ void consumer_del_channel(struct lttng_consumer_channel *channel) if (channel->live_timer_enabled == 1) { consumer_timer_live_stop(channel); } + if (channel->monitor_timer_enabled == 1) { + consumer_timer_monitor_stop(channel); + } switch (consumer_data.type) { case LTTNG_CONSUMER_KERNEL: @@ -1348,6 +1351,8 @@ struct lttng_consumer_local_data *lttng_consumer_create( goto error_metadata_pipe; } + ctx->channel_monitor_pipe = -1; + return ctx; error_metadata_pipe: diff --git a/src/common/consumer/consumer.h b/src/common/consumer/consumer.h index 37adecbfe..883fd3532 100644 --- a/src/common/consumer/consumer.h +++ b/src/common/consumer/consumer.h @@ -61,6 +61,7 @@ enum lttng_consumer_command { LTTNG_CONSUMER_DISCARDED_EVENTS, LTTNG_CONSUMER_LOST_PACKETS, LTTNG_CONSUMER_CLEAR_QUIESCENT_CHANNEL, + LTTNG_CONSUMER_SET_CHANNEL_MONITOR_PIPE, }; /* State of each fd in consumer */ @@ -160,6 +161,7 @@ struct lttng_consumer_channel { /* Metadata cache is metadata channel */ struct consumer_metadata_cache *metadata_cache; + /* For UST metadata periodical flush */ int switch_timer_enabled; timer_t switch_timer; @@ -170,6 +172,10 @@ struct lttng_consumer_channel { timer_t live_timer; int live_timer_error; + /* For channel monitoring timer. */ + int monitor_timer_enabled; + timer_t monitor_timer; + /* On-disk circular buffer */ uint64_t tracefile_size; uint64_t tracefile_count; @@ -539,6 +545,11 @@ struct lttng_consumer_local_data { int consumer_should_quit[2]; /* Metadata poll thread pipe. 
Transfer metadata stream to it */ struct lttng_pipe *consumer_metadata_pipe; + /* + * Pipe used by the channel monitoring timers to provide state samples + * to the session daemon (write-only). + */ + int channel_monitor_pipe; }; /* diff --git a/src/common/defaults.h b/src/common/defaults.h index 37d222a78..4b27fb320 100644 --- a/src/common/defaults.h +++ b/src/common/defaults.h @@ -106,10 +106,12 @@ #define DEFAULT_LTTNG_EXTRA_KMOD_PROBES "LTTNG_EXTRA_KMOD_PROBES" /* Default unix socket path */ -#define DEFAULT_GLOBAL_CLIENT_UNIX_SOCK DEFAULT_LTTNG_RUNDIR "/client-lttng-sessiond" -#define DEFAULT_HOME_CLIENT_UNIX_SOCK DEFAULT_LTTNG_HOME_RUNDIR "/client-lttng-sessiond" -#define DEFAULT_GLOBAL_HEALTH_UNIX_SOCK DEFAULT_LTTNG_RUNDIR "/sessiond-health" -#define DEFAULT_HOME_HEALTH_UNIX_SOCK DEFAULT_LTTNG_HOME_RUNDIR "/sessiond-health" +#define DEFAULT_GLOBAL_CLIENT_UNIX_SOCK DEFAULT_LTTNG_RUNDIR "/client-lttng-sessiond" +#define DEFAULT_HOME_CLIENT_UNIX_SOCK DEFAULT_LTTNG_HOME_RUNDIR "/client-lttng-sessiond" +#define DEFAULT_GLOBAL_HEALTH_UNIX_SOCK DEFAULT_LTTNG_RUNDIR "/sessiond-health" +#define DEFAULT_HOME_HEALTH_UNIX_SOCK DEFAULT_LTTNG_HOME_RUNDIR "/sessiond-health" +#define DEFAULT_GLOBAL_NOTIFICATION_CHANNEL_UNIX_SOCK DEFAULT_LTTNG_RUNDIR "/sessiond-notification" +#define DEFAULT_HOME_NOTIFICATION_CHANNEL_UNIX_SOCK DEFAULT_LTTNG_HOME_RUNDIR "/sessiond-notification" /* Default consumer health unix socket path */ #define DEFAULT_GLOBAL_USTCONSUMER32_HEALTH_UNIX_SOCK DEFAULT_LTTNG_RUNDIR "/ustconsumerd32/health" diff --git a/src/common/dynamic-buffer.c b/src/common/dynamic-buffer.c new file mode 100644 index 000000000..d617b118e --- /dev/null +++ b/src/common/dynamic-buffer.c @@ -0,0 +1,166 @@ +/* + * Copyright (C) 2017 - Jérémie Galarneau + * + * This library is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License, version 2.1 only, + * as published by the Free Software Foundation. 
+ * + * This library is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License + * for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this library; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#include +#include +#include +#include + +static +size_t round_to_power_of_2(size_t val) +{ + int order; + size_t rounded; + + order = utils_get_count_order_u64(val); + assert(order >= 0); + rounded = (1ULL << order); + assert(rounded >= val); + + return rounded; +} + +void lttng_dynamic_buffer_init(struct lttng_dynamic_buffer *buffer) +{ + assert(buffer); + memset(buffer, 0, sizeof(*buffer)); +} + +int lttng_dynamic_buffer_append(struct lttng_dynamic_buffer *buffer, + const void *buf, size_t len) +{ + int ret = 0; + + if (!buffer || (!buf && len)) { + ret = -1; + goto end; + } + + if (len == 0) { + /* Not an error, no-op. */ + goto end; + } + + if ((buffer->capacity - buffer->size) < len) { + ret = lttng_dynamic_buffer_set_capacity(buffer, + buffer->capacity + + (len - (buffer->capacity - buffer->size))); + if (ret) { + goto end; + } + } + + memcpy(buffer->data + buffer->size, buf, len); + buffer->size += len; +end: + return ret; +} + +int lttng_dynamic_buffer_set_size(struct lttng_dynamic_buffer *buffer, + size_t new_size) +{ + int ret = 0; + + if (!buffer) { + goto end; + } + + if (new_size == buffer->size) { + goto end; + } + + if (new_size > buffer->capacity) { + ret = lttng_dynamic_buffer_set_capacity(buffer, new_size); + if (ret) { + goto end; + } + /* + * Bytes between the old size and the old capacity may hold stale + * data left by an earlier shrink; clear everything up to new_size + * so that the whole enlarged area reads as zero. + */ + memset(buffer->data + buffer->size, 0, new_size - buffer->size); + } else if (new_size > buffer->size) { + memset(buffer->data + buffer->size, 0, new_size - buffer->size); + buffer->size = new_size; + } else { + /* + * Shrinking size. 
There is no need to zero-out the newly + * released memory as it will either be: + * - overwritten by lttng_dynamic_buffer_append, + * - expanded later, which will zero-out the memory + * + * Users of external APIs are encouraged to set the buffer's + * size _before_ making such calls. + */ + } + buffer->size = new_size; +end: + return ret; +} + +int lttng_dynamic_buffer_set_capacity(struct lttng_dynamic_buffer *buffer, + size_t new_capacity) +{ + int ret = 0; + size_t rounded_capacity = round_to_power_of_2(new_capacity); + + if (!buffer || new_capacity < buffer->size) { + ret = -1; + goto end; + } + + if (rounded_capacity == buffer->capacity) { + goto end; + } + + if (!buffer->data) { + buffer->data = zmalloc(rounded_capacity); + if (!buffer->data) { + ret = -1; + goto end; + } + } else { + void *new_buf; + + new_buf = realloc(buffer->data, rounded_capacity); + if (new_buf) { + if (rounded_capacity > buffer->capacity) { + memset(new_buf + buffer->capacity, 0, + rounded_capacity - buffer->capacity); + } + } else { + /* Realloc failed, try to acquire a new block. */ + new_buf = zmalloc(rounded_capacity); + if (!new_buf) { + ret = -1; + goto end; + } + memcpy(new_buf, buffer->data, buffer->size); + free(buffer->data); + } + buffer->data = new_buf; + } + buffer->capacity = rounded_capacity; +end: + return ret; +} + +/* Release any memory used by the dynamic buffer. 
*/ +void lttng_dynamic_buffer_reset(struct lttng_dynamic_buffer *buffer) +{ + if (!buffer) { + return; + } + buffer->size = 0; + buffer->capacity = 0; + free(buffer->data); + /* Drop the dangling pointer so a later reset or reuse is safe. */ + buffer->data = NULL; +} diff --git a/src/common/dynamic-buffer.h b/src/common/dynamic-buffer.h new file mode 100644 index 000000000..82efd59a4 --- /dev/null +++ b/src/common/dynamic-buffer.h @@ -0,0 +1,59 @@ +/* + * Copyright (C) 2017 - Jérémie Galarneau + * + * This library is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License, version 2.1 only, + * as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License + * for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this library; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#ifndef LTTNG_DYNAMIC_BUFFER_H +#define LTTNG_DYNAMIC_BUFFER_H + +#include +#include + +struct lttng_dynamic_buffer { + char *data; + size_t size; + size_t capacity; +}; + +void lttng_dynamic_buffer_init(struct lttng_dynamic_buffer *buffer); + +int lttng_dynamic_buffer_append(struct lttng_dynamic_buffer *buffer, + const void *buf, size_t len); + +/* + * Set the buffer's size to new_size. The capacity of the buffer will + * be expanded (if necessary) to accommodate new_size. Areas acquired by + * an enlarging new_size _will be zeroed_. + * + * Be careful to expand the buffer's size _before_ calling out external + * APIs (e.g. read(3)) which may populate the buffer as setting the size + * _after_ will zero-out the result of the operation. 
+ */ +int lttng_dynamic_buffer_set_size(struct lttng_dynamic_buffer *buffer, + size_t new_size); + +/* + * Set the buffer's capacity to accomodate the new_capacity, allocating memory + * as necessary. The buffer's content is preserved. + * + * If the current size > new_capacity, the operation will fail. + */ +int lttng_dynamic_buffer_set_capacity(struct lttng_dynamic_buffer *buffer, + size_t new_capacity); + +/* Release any memory used by the dynamic buffer. */ +void lttng_dynamic_buffer_reset(struct lttng_dynamic_buffer *buffer); + +#endif /* LTTNG_DYNAMIC_BUFFER_H */ diff --git a/src/common/endpoint.c b/src/common/endpoint.c new file mode 100644 index 000000000..89066de55 --- /dev/null +++ b/src/common/endpoint.c @@ -0,0 +1,26 @@ +/* + * Copyright (C) 2017 - Jérémie Galarneau + * + * This library is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License, version 2.1 only, + * as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License + * for more details. 
+ * + * You should have received a copy of the GNU Lesser General Public License + * along with this library; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#include + +static +struct lttng_endpoint lttng_session_daemon_notification_endpoint_instance = { + .type = LTTNG_ENDPOINT_TYPE_DEFAULT_SESSIOND_NOTIFICATION +}; + +struct lttng_endpoint *lttng_session_daemon_notification_endpoint = + &lttng_session_daemon_notification_endpoint_instance; diff --git a/src/common/error.c b/src/common/error.c index 938932cdc..2215886db 100644 --- a/src/common/error.c +++ b/src/common/error.c @@ -186,6 +186,10 @@ static const char *error_string_array[] = { [ ERROR_INDEX(LTTNG_ERR_REGEN_STATEDUMP_FAIL) ] = "Failed to regenerate the state dump", [ ERROR_INDEX(LTTNG_ERR_REGEN_STATEDUMP_NOMEM) ] = "Failed to regenerate the state dump, not enough memory", [ ERROR_INDEX(LTTNG_ERR_NOT_SNAPSHOT_SESSION) ] = "Snapshot command can't be applied to a non-snapshot session", + [ ERROR_INDEX(LTTNG_ERR_INVALID_TRIGGER) ] = "Invalid trigger", + [ ERROR_INDEX(LTTNG_ERR_TRIGGER_EXISTS) ] = "Trigger already registered", + [ ERROR_INDEX(LTTNG_ERR_TRIGGER_NOT_FOUND) ] = "Trigger not found", + [ ERROR_INDEX(LTTNG_ERR_COMMAND_CANCELLED) ] = "Command cancelled", /* Last element */ [ ERROR_INDEX(LTTNG_ERR_NR) ] = "Unknown error code" diff --git a/src/common/evaluation.c b/src/common/evaluation.c new file mode 100644 index 000000000..49cba5dd1 --- /dev/null +++ b/src/common/evaluation.c @@ -0,0 +1,110 @@ +/* + * Copyright (C) 2017 - Jérémie Galarneau + * + * This library is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License, version 2.1 only, + * as published by the Free Software Foundation.
+ * + * This library is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License + * for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this library; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#include +#include +#include +#include +#include +#include + +LTTNG_HIDDEN +ssize_t lttng_evaluation_serialize(struct lttng_evaluation *evaluation, + char *buf) +{ + ssize_t ret, offset = 0; + struct lttng_evaluation_comm evaluation_comm; + + evaluation_comm.type = (int8_t) evaluation->type; + if (buf) { + memcpy(buf, &evaluation_comm, sizeof(evaluation_comm)); + } + offset += sizeof(evaluation_comm); + + if (evaluation->serialize) { + ret = evaluation->serialize(evaluation, + buf ? (buf + offset) : NULL); + if (ret < 0) { + goto end; + } + offset += ret; + } + + ret = offset; +end: + return ret; +} + +LTTNG_HIDDEN +ssize_t lttng_evaluation_create_from_buffer(const char *buf, + struct lttng_evaluation **evaluation) +{ + ssize_t ret, evaluation_size = 0; + struct lttng_evaluation_comm *evaluation_comm = + (struct lttng_evaluation_comm *) buf; + + if (!buf || !evaluation) { + ret = -1; + goto end; + } + + evaluation_size += sizeof(*evaluation_comm); + buf += evaluation_size; + + switch ((enum lttng_condition_type) evaluation_comm->type) { + case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW: + ret = lttng_evaluation_buffer_usage_low_create_from_buffer(buf, + evaluation); + if (ret < 0) { + goto end; + } + evaluation_size += ret; + break; + case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH: + ret = lttng_evaluation_buffer_usage_high_create_from_buffer(buf, + evaluation); + if (ret < 0) { + goto end; + } + evaluation_size += ret; + break; + default: + ERR("Attempted to create evaluation of unknown 
type (%i)", + (int) evaluation_comm->type); + ret = -1; + goto end; + } + ret = evaluation_size; +end: + return ret; +} + +enum lttng_condition_type lttng_evaluation_get_type( + struct lttng_evaluation *evaluation) +{ + return evaluation ? evaluation->type : LTTNG_CONDITION_TYPE_UNKNOWN; +} + +void lttng_evaluation_destroy(struct lttng_evaluation *evaluation) +{ + if (!evaluation) { + return; + } + + assert(evaluation->destroy); + evaluation->destroy(evaluation); +} diff --git a/src/common/macros.h b/src/common/macros.h index 90849ed30..7eaf27cdf 100644 --- a/src/common/macros.h +++ b/src/common/macros.h @@ -20,6 +20,7 @@ #define _MACROS_H #include +#include #include #include @@ -58,6 +59,14 @@ void *zmalloc(size_t len) #define ARRAY_SIZE(array) (sizeof(array) / (sizeof((array)[0]))) #endif +#ifndef container_of +#define container_of(ptr, type, member) \ + ({ \ + const typeof(((type *)NULL)->member) * __ptr = (ptr); \ + (type *)((char *)__ptr - offsetof(type, member)); \ + }) +#endif + #ifndef max #define max(a, b) ((a) > (b) ? (a) : (b)) #endif diff --git a/src/common/notification.c b/src/common/notification.c new file mode 100644 index 000000000..9970974f9 --- /dev/null +++ b/src/common/notification.c @@ -0,0 +1,168 @@ +/* + * Copyright (C) 2017 - Jérémie Galarneau + * + * This library is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License, version 2.1 only, + * as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License + * for more details. 
+ * + * You should have received a copy of the GNU Lesser General Public License + * along with this library; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#include +#include +#include +#include +#include +#include + +LTTNG_HIDDEN +struct lttng_notification *lttng_notification_create( + struct lttng_condition *condition, + struct lttng_evaluation *evaluation) +{ + struct lttng_notification *notification = NULL; + + if (!condition || !evaluation) { + goto end; + } + + notification = zmalloc(sizeof(struct lttng_notification)); + if (!notification) { + goto end; + } + + notification->condition = condition; + notification->evaluation = evaluation; + notification->owns_elements = false; +end: + return notification; +} + +LTTNG_HIDDEN +ssize_t lttng_notification_serialize(struct lttng_notification *notification, + char *buf) +{ + ssize_t ret, condition_size, evaluation_size, offset = 0; + struct lttng_notification_comm notification_comm; + + if (!notification) { + ret = -1; + goto end; + } + + offset += sizeof(notification_comm); + condition_size = lttng_condition_serialize(notification->condition, + buf ? (buf + offset) : NULL); + if (condition_size < 0) { + ret = condition_size; + goto end; + } + offset += condition_size; + + evaluation_size = lttng_evaluation_serialize(notification->evaluation, + buf ? 
(buf + offset) : NULL); + if (evaluation_size < 0) { + ret = evaluation_size; + goto end; + } + offset += evaluation_size; + + if (buf) { + notification_comm.length = + (uint32_t) (condition_size + evaluation_size); + memcpy(buf, &notification_comm, sizeof(notification_comm)); + } + ret = offset; +end: + return ret; + +} + +LTTNG_HIDDEN +ssize_t lttng_notification_create_from_buffer(const char *buf, + struct lttng_notification **notification) +{ + ssize_t ret, offset = 0, condition_size, evaluation_size; + struct lttng_notification_comm *notification_comm; + struct lttng_condition *condition; + struct lttng_evaluation *evaluation; + + if (!buf || !notification) { + ret = -1; + goto end; + } + + notification_comm = (struct lttng_notification_comm *) buf; + offset += sizeof(*notification_comm); + + /* struct lttng_condition */ + condition_size = lttng_condition_create_from_buffer(buf + offset, + &condition); + if (condition_size < 0) { + ret = condition_size; + goto end; + } + offset += condition_size; + + /* struct lttng_evaluation */ + evaluation_size = lttng_evaluation_create_from_buffer(buf + offset, + &evaluation); + if (evaluation_size < 0) { + ret = evaluation_size; + goto end; + } + offset += evaluation_size; + + /* Unexpected size of inner-elements; the buffer is corrupted.
*/ + if ((ssize_t) notification_comm->length != + condition_size + evaluation_size) { + ret = -1; + goto error; + } + + *notification = lttng_notification_create(condition, evaluation); + if (!*notification) { + ret = -1; + goto error; + } + ret = offset; + (*notification)->owns_elements = true; +end: + return ret; +error: + lttng_condition_destroy(condition); + lttng_evaluation_destroy(evaluation); + return ret; +} + +void lttng_notification_destroy(struct lttng_notification *notification) +{ + if (!notification) { + return; + } + + if (notification->owns_elements) { + lttng_condition_destroy(notification->condition); + lttng_evaluation_destroy(notification->evaluation); + } + free(notification); +} + +struct lttng_condition *lttng_notification_get_condition( + struct lttng_notification *notification) +{ + return notification ? notification->condition : NULL; +} + +struct lttng_evaluation *lttng_notification_get_evaluation( + struct lttng_notification *notification) +{ + return notification ? notification->evaluation : NULL; +} diff --git a/src/common/notify.c b/src/common/notify.c new file mode 100644 index 000000000..956c0ecff --- /dev/null +++ b/src/common/notify.c @@ -0,0 +1,49 @@ +/* + * Copyright (C) 2017 - Jérémie Galarneau + * + * This library is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License, version 2.1 only, + * as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License + * for more details. 
+ * + * You should have received a copy of the GNU Lesser General Public License + * along with this library; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#include +#include +#include +#include + +static +void lttng_action_notify_destroy(struct lttng_action *action) +{ + free(action); +} + +static +ssize_t lttng_action_notify_serialize(struct lttng_action *action, char *buf) +{ + return 0; +} + +struct lttng_action *lttng_action_notify_create(void) +{ + struct lttng_action_notify *notify; + + notify = zmalloc(sizeof(struct lttng_action_notify)); + if (!notify) { + goto end; + } + + notify->parent.type = LTTNG_ACTION_TYPE_NOTIFY; + notify->parent.serialize = lttng_action_notify_serialize; + notify->parent.destroy = lttng_action_notify_destroy; +end: + return ¬ify->parent; +} diff --git a/src/common/pipe.c b/src/common/pipe.c index 00211238e..ffdecc0d5 100644 --- a/src/common/pipe.c +++ b/src/common/pipe.c @@ -311,3 +311,69 @@ error: unlock_write_side(pipe); return ret; } + +/* + * Return and release the read end of the pipe. + * + * This call transfers the ownership of the read fd of the underlying pipe + * to the caller if it is still open. + * + * Returns the fd of the read end of the pipe, or -1 if it was already closed or + * released. + */ +LTTNG_HIDDEN +int lttng_pipe_release_readfd(struct lttng_pipe *pipe) +{ + int ret; + + if (!pipe) { + ret = -1; + goto end; + } + + lock_read_side(pipe); + if (!lttng_pipe_is_read_open(pipe)) { + ret = -1; + goto end_unlock; + } + ret = pipe->fd[0]; + pipe->fd[0] = -1; + pipe->r_state = LTTNG_PIPE_STATE_CLOSED; +end_unlock: + unlock_read_side(pipe); +end: + return ret; +} + +/* + * Return and release the write end of the pipe. + * + * This call transfers the ownership of the write fd of the underlying pipe + * to the caller if it is still open. + * + * Returns the fd of the write end of the pipe, or -1 if it was alwritey closed + * or released. 
+ */ +LTTNG_HIDDEN +int lttng_pipe_release_writefd(struct lttng_pipe *pipe) +{ + int ret; + + if (!pipe) { + ret = -1; + goto end; + } + + lock_write_side(pipe); + if (!lttng_pipe_is_write_open(pipe)) { + ret = -1; + goto end_unlock; + } + ret = pipe->fd[1]; + pipe->fd[1] = -1; + pipe->w_state = LTTNG_PIPE_STATE_CLOSED; +end_unlock: + unlock_write_side(pipe); +end: + return ret; +} diff --git a/src/common/pipe.h b/src/common/pipe.h index 0bc2db325..b9e0904a2 100644 --- a/src/common/pipe.h +++ b/src/common/pipe.h @@ -90,5 +90,11 @@ ssize_t lttng_pipe_read(struct lttng_pipe *pipe, void *buf, size_t count); LTTNG_HIDDEN ssize_t lttng_pipe_write(struct lttng_pipe *pipe, const void *buf, size_t count); +/* Returns and releases the read end of the pipe. */ +LTTNG_HIDDEN +int lttng_pipe_release_readfd(struct lttng_pipe *pipe); +/* Returns and releases the write end of the pipe. */ +LTTNG_HIDDEN +int lttng_pipe_release_writefd(struct lttng_pipe *pipe); #endif /* LTTNG_PIPE_H */ diff --git a/src/common/sessiond-comm/sessiond-comm.h b/src/common/sessiond-comm/sessiond-comm.h index 0c2667031..3e85b9519 100644 --- a/src/common/sessiond-comm/sessiond-comm.h +++ b/src/common/sessiond-comm/sessiond-comm.h @@ -29,6 +29,7 @@ #include #include #include +#include #include #include #include @@ -96,6 +97,8 @@ enum lttcomm_sessiond_command { LTTNG_SET_SESSION_SHM_PATH = 40, LTTNG_REGENERATE_METADATA = 41, LTTNG_REGENERATE_STATEDUMP = 42, + LTTNG_REGISTER_TRIGGER = 43, + LTTNG_UNREGISTER_TRIGGER = 44, }; enum lttcomm_relayd_command { @@ -146,6 +149,7 @@ enum lttcomm_return_code { LTTCOMM_CONSUMERD_RELAYD_FAIL, /* Error on remote relayd */ LTTCOMM_CONSUMERD_CHANNEL_FAIL, /* Channel creation failed. */ LTTCOMM_CONSUMERD_CHAN_NOT_FOUND, /* Channel not found. */ + LTTCOMM_CONSUMERD_ALREADY_SET, /* Resource already set. 
*/ /* MUST be last element */ LTTCOMM_NR, /* Last element */ @@ -311,6 +315,9 @@ struct lttcomm_session_msg { struct { uint32_t pid; } LTTNG_PACKED pid_tracker; + struct { + uint32_t length; + } LTTNG_PACKED trigger; } u; } LTTNG_PACKED; @@ -454,6 +461,7 @@ struct lttcomm_consumer_msg { uint32_t switch_timer_interval; /* usec */ uint32_t read_timer_interval; /* usec */ unsigned int live_timer_interval; /* usec */ + uint32_t monitor_timer_interval; /* usec */ int32_t output; /* splice, mmap */ int32_t type; /* metadata or per_cpu */ uint64_t session_id; /* Tracing session id */ @@ -531,6 +539,19 @@ struct lttcomm_consumer_msg { } u; } LTTNG_PACKED; +/* + * Channel monitoring message returned to the session daemon on every + * monitor timer expiration. + */ +struct lttcomm_consumer_channel_monitor_msg { + /* Key of the sampled channel. */ + uint64_t key; + /* + * Lowest and highest usage (bytes) at the moment the sample was taken. + */ + uint64_t lowest, highest; +} LTTNG_PACKED; + /* * Status message returned to the sessiond after a received command. */ diff --git a/src/common/trigger.c b/src/common/trigger.c new file mode 100644 index 000000000..59c33669c --- /dev/null +++ b/src/common/trigger.c @@ -0,0 +1,179 @@ +/* + * Copyright (C) 2017 - Jérémie Galarneau + * + * This library is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License, version 2.1 only, + * as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License + * for more details. 
+ * + * You should have received a copy of the GNU Lesser General Public License + * along with this library; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#include +#include +#include +#include +#include + +LTTNG_HIDDEN +bool lttng_trigger_validate(struct lttng_trigger *trigger) +{ + bool valid; + + if (!trigger) { + valid = false; + goto end; + } + + valid = lttng_condition_validate(trigger->condition) && + lttng_action_validate(trigger->action); +end: + return valid; +} + +struct lttng_trigger *lttng_trigger_create( + struct lttng_condition *condition, + struct lttng_action *action) +{ + struct lttng_trigger *trigger = NULL; + + if (!condition || !action) { + goto end; + } + + trigger = zmalloc(sizeof(struct lttng_trigger)); + if (!trigger) { + goto end; + } + + trigger->condition = condition; + trigger->action = action; +end: + return trigger; +} + +struct lttng_condition *lttng_trigger_get_condition( + struct lttng_trigger *trigger) +{ + return trigger ? trigger->condition : NULL; +} + +extern struct lttng_action *lttng_trigger_get_action( + struct lttng_trigger *trigger) +{ + return trigger ? 
trigger->action : NULL; +} + +void lttng_trigger_destroy(struct lttng_trigger *trigger) +{ + if (!trigger) { + return; + } + + lttng_condition_destroy(trigger->condition); + lttng_action_destroy(trigger->action); + free(trigger); +} + +LTTNG_HIDDEN +ssize_t lttng_trigger_create_from_buffer(const char *buf, + struct lttng_trigger **trigger) +{ + ssize_t ret, offset = 0, condition_size, action_size; + struct lttng_condition *condition = NULL; + struct lttng_action *action = NULL; + struct lttng_trigger_comm *trigger_comm; + + if (!buf || !trigger) { + ret = -1; + goto end; + } + + /* lttng_trigger_comm header */ + trigger_comm = (struct lttng_trigger_comm *) buf; + offset += sizeof(*trigger_comm); + + /* struct lttng_condition */ + condition_size = lttng_condition_create_from_buffer(buf + offset, + &condition); + if (condition_size < 0) { + ret = condition_size; + goto end; + } + offset += condition_size; + + /* struct lttng_action */ + action_size = lttng_action_create_from_buffer(buf + offset, &action); + if (action_size < 0) { + ret = action_size; + goto end; + } + offset += action_size; + + /* Unexpected size of inner-elements; the buffer is corrupted. */ + if ((ssize_t) trigger_comm->length != condition_size + action_size) { + ret = -1; + goto error; + } + + *trigger = lttng_trigger_create(condition, action); + if (!*trigger) { + ret = -1; + goto error; + } + ret = offset; +end: + return ret; +error: + lttng_condition_destroy(condition); + lttng_action_destroy(action); + return ret; +} + +/* + * Returns the size of a trigger (header + condition + action). + * Both elements are stored contiguously, see their "*_comm" structure + * for the detailed format. 
+ */ +LTTNG_HIDDEN +ssize_t lttng_trigger_serialize(struct lttng_trigger *trigger, char *buf) +{ + struct lttng_trigger_comm trigger_comm; + ssize_t action_size, condition_size, offset = 0, ret; + + if (!trigger) { + ret = -1; + goto end; + } + + offset += sizeof(trigger_comm); + condition_size = lttng_condition_serialize(trigger->condition, + buf ? (buf + offset) : NULL); + if (condition_size < 0) { + ret = -1; + goto end; + } + offset += condition_size; + + action_size = lttng_action_serialize(trigger->action, + buf ? (buf + offset) : NULL); + if (action_size < 0) { + ret = -1; + goto end; + } + offset += action_size; + + if (buf) { + trigger_comm.length = (uint32_t) (condition_size + action_size); + memcpy(buf, &trigger_comm, sizeof(trigger_comm)); + } + ret = offset; +end: + return ret; +} diff --git a/src/common/unix.c b/src/common/unix.c index 11c30781b..1d4ee9b36 100644 --- a/src/common/unix.c +++ b/src/common/unix.c @@ -226,7 +226,7 @@ ssize_t lttcomm_send_unix_sock(int sock, const void *buf, size_t len) { struct msghdr msg; struct iovec iov[1]; - ssize_t ret = -1; + ssize_t ret; memset(&msg, 0, sizeof(msg)); @@ -235,17 +235,28 @@ ssize_t lttcomm_send_unix_sock(int sock, const void *buf, size_t len) msg.msg_iov = iov; msg.msg_iovlen = 1; - ret = sendmsg(sock, &msg, 0); - if (ret < 0) { - /* - * Only warn about EPIPE when quiet mode is deactivated. - * We consider EPIPE as expected. - */ - if (errno != EPIPE || !lttng_opt_quiet) { - PERROR("sendmsg"); + while (iov[0].iov_len) { + ret = sendmsg(sock, &msg, 0); + if (ret < 0) { + if (errno == EINTR) { + continue; + } else { + /* + * Only warn about EPIPE when quiet mode is + * deactivated. + * We consider EPIPE as expected. 
+ */ + if (errno != EPIPE || !lttng_opt_quiet) { + PERROR("sendmsg"); + } + goto end; + } } + iov[0].iov_len -= ret; + iov[0].iov_base += ret; } - + ret = len; +end: return ret; } diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c index 97f0497cb..af9ed5acd 100644 --- a/src/common/ust-consumer/ust-consumer.c +++ b/src/common/ust-consumer/ust-consumer.c @@ -1501,8 +1501,17 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, consumer_timer_switch_start(channel, attr.switch_timer_interval); attr.switch_timer_interval = 0; } else { + int monitor_start_ret; + consumer_timer_live_start(channel, msg.u.ask_channel.live_timer_interval); + monitor_start_ret = consumer_timer_monitor_start( + channel, + msg.u.ask_channel.monitor_timer_interval); + if (monitor_start_ret < 0) { + ERR("Starting channel monitoring timer failed"); + goto end_channel_error; + } } health_code_update(); @@ -1525,6 +1534,9 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, if (channel->live_timer_enabled == 1) { consumer_timer_live_stop(channel); } + if (channel->monitor_timer_enabled == 1) { + consumer_timer_monitor_stop(channel); + } goto end_channel_error; } @@ -1857,6 +1869,51 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, break; } + case LTTNG_CONSUMER_SET_CHANNEL_MONITOR_PIPE: + { + int channel_monitor_pipe; + + ret_code = LTTCOMM_CONSUMERD_SUCCESS; + /* Successfully received the command's type. 
*/ + ret = consumer_send_status_msg(sock, ret_code); + if (ret < 0) { + goto error_fatal; + } + + ret = lttcomm_recv_fds_unix_sock(sock, &channel_monitor_pipe, + 1); + if (ret != sizeof(channel_monitor_pipe)) { + ERR("Failed to receive channel monitor pipe"); + goto error_fatal; + } + + DBG("Received channel monitor pipe (%d)", channel_monitor_pipe); + ret = consumer_timer_thread_set_channel_monitor_pipe( + channel_monitor_pipe); + if (!ret) { + int flags; + + ret_code = LTTCOMM_CONSUMERD_SUCCESS; + /* Set the pipe as non-blocking. */ + ret = fcntl(channel_monitor_pipe, F_GETFL, 0); + if (ret == -1) { + PERROR("fcntl get flags of the channel monitoring pipe"); + goto error_fatal; + } + flags = ret; + + ret = fcntl(channel_monitor_pipe, F_SETFL, + flags | O_NONBLOCK); + if (ret == -1) { + PERROR("fcntl set O_NONBLOCK flag of the channel monitoring pipe"); + goto error_fatal; + } + DBG("Channel monitor pipe set as non-blocking"); + } else { + ret_code = LTTCOMM_CONSUMERD_ALREADY_SET; + } + goto end_msg_sessiond; + } default: break; } diff --git a/src/common/utils.c b/src/common/utils.c index 16d2f817a..4d49728c0 100644 --- a/src/common/utils.c +++ b/src/common/utils.c @@ -1017,6 +1017,59 @@ static inline unsigned int fls_u32(uint32_t x) #define HAS_FLS_U32 #endif +#if defined(__x86_64) +static inline +unsigned int fls_u64(uint64_t x) +{ + long r; + + asm("bsrq %1,%0\n\t" + "jnz 1f\n\t" + "movq $-1,%0\n\t" + "1:\n\t" + : "=r" (r) : "rm" (x)); + return r + 1; +} +#define HAS_FLS_U64 +#endif + +#ifndef HAS_FLS_U64 +static __attribute__((unused)) +unsigned int fls_u64(uint64_t x) +{ + unsigned int r = 64; + + if (!x) + return 0; + + if (!(x & 0xFFFFFFFF00000000ULL)) { + x <<= 32; + r -= 32; + } + if (!(x & 0xFFFF000000000000ULL)) { + x <<= 16; + r -= 16; + } + if (!(x & 0xFF00000000000000ULL)) { + x <<= 8; + r -= 8; + } + if (!(x & 0xF000000000000000ULL)) { + x <<= 4; + r -= 4; + } + if (!(x & 0xC000000000000000ULL)) { + x <<= 2; + r -= 2; + } + if (!(x & 
0x8000000000000000ULL)) { + x <<= 1; + r -= 1; + } + return r; +} +#endif + #ifndef HAS_FLS_U32 static __attribute__((unused)) unsigned int fls_u32(uint32_t x) { @@ -1063,6 +1116,20 @@ int utils_get_count_order_u32(uint32_t x) return fls_u32(x - 1); } +/* + * Return the minimum order for which x <= (1UL << order). + * Return -1 if x is 0. + */ +LTTNG_HIDDEN +int utils_get_count_order_u64(uint64_t x) +{ + if (!x) { + return -1; + } + + return fls_u64(x - 1); +} + /** * Obtain the value of LTTNG_HOME environment variable, if exists. * Otherwise returns the value of HOME. diff --git a/src/common/utils.h b/src/common/utils.h index 7285f5c30..0daf5b98d 100644 --- a/src/common/utils.h +++ b/src/common/utils.h @@ -48,6 +48,7 @@ int utils_rotate_stream_file(char *path_name, char *file_name, uint64_t size, int *stream_fd); int utils_parse_size_suffix(char const * const str, uint64_t * const size); int utils_get_count_order_u32(uint32_t x); +int utils_get_count_order_u64(uint64_t x); char *utils_get_home_dir(void); char *utils_get_user_home_dir(uid_t uid); char *utils_get_kmod_probes_list(void); diff --git a/src/lib/lttng-ctl/Makefile.am b/src/lib/lttng-ctl/Makefile.am index 6b6a6eed1..b5156caf9 100644 --- a/src/lib/lttng-ctl/Makefile.am +++ b/src/lib/lttng-ctl/Makefile.am @@ -5,7 +5,8 @@ SUBDIRS = filter lib_LTLIBRARIES = liblttng-ctl.la liblttng_ctl_la_SOURCES = lttng-ctl.c snapshot.c lttng-ctl-helper.h \ - lttng-ctl-health.c save.c load.c deprecated-symbols.c + lttng-ctl-health.c save.c load.c deprecated-symbols.c \ + channel.c liblttng_ctl_la_LDFLAGS = \ $(LT_NO_UNDEFINED) diff --git a/src/lib/lttng-ctl/channel.c b/src/lib/lttng-ctl/channel.c new file mode 100644 index 000000000..c42829951 --- /dev/null +++ b/src/lib/lttng-ctl/channel.c @@ -0,0 +1,285 @@ +/* + * Copyright (C) 2017 - Jérémie Galarneau + * + * This library is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License, version 2.1 only, + * as 
published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License + * for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this library; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "lttng-ctl-helper.h" + +struct lttng_notification_channel *lttng_notification_channel_create( + struct lttng_endpoint *endpoint) +{ + int fd, ret; + bool is_in_tracing_group = false, is_root = false; + char *sock_path = NULL; + struct lttng_notification_channel *channel = NULL; + + if (!endpoint || + endpoint != lttng_session_daemon_notification_endpoint) { + goto end; + } + + sock_path = zmalloc(LTTNG_PATH_MAX); + if (!sock_path) { + goto end; + } + + channel = zmalloc(sizeof(struct lttng_notification_channel)); + if (!channel) { + goto end; + } + channel->socket = -1; + + is_root = (getuid() == 0); + if (!is_root) { + is_in_tracing_group = lttng_check_tracing_group(); + } + + if (is_root || is_in_tracing_group) { + lttng_ctl_copy_string(sock_path, + DEFAULT_GLOBAL_NOTIFICATION_CHANNEL_UNIX_SOCK, + LTTNG_PATH_MAX); + ret = lttcomm_connect_unix_sock(sock_path); + if (ret >= 0) { + fd = ret; + goto set_fd; + } + } + + /* Fallback to local session daemon. 
*/ + ret = snprintf(sock_path, LTTNG_PATH_MAX, + DEFAULT_HOME_NOTIFICATION_CHANNEL_UNIX_SOCK, + utils_get_home_dir()); + if (ret < 0 || ret >= LTTNG_PATH_MAX) { + goto error; + } + + ret = lttcomm_connect_unix_sock(sock_path); + if (ret < 0) { + goto error; + } + fd = ret; + +set_fd: + channel->socket = fd; + + /* FIXME send creds */ +end: + free(sock_path); + return channel; +error: + lttng_notification_channel_destroy(channel); + channel = NULL; + goto end; +} + +enum lttng_notification_channel_status +lttng_notification_channel_get_next_notification( + struct lttng_notification_channel *channel, + struct lttng_notification **_notification) +{ + ssize_t ret; + struct lttng_notification_comm comm; + struct lttng_notification *notification = NULL; + struct lttng_dynamic_buffer reception_buffer; + struct lttng_notification_channel_message msg; + enum lttng_notification_channel_status status = + LTTNG_NOTIFICATION_CHANNEL_STATUS_OK; + + lttng_dynamic_buffer_init(&reception_buffer); + + if (!channel || !_notification) { + status = LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID; + goto end; + } + + ret = lttcomm_recv_unix_sock(channel->socket, &msg, sizeof(msg)); + if (ret <= 0) { + status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR; + goto end; + } + if (msg.type != LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION) { + status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR; + goto end; + } + + ret = lttcomm_recv_unix_sock(channel->socket, &comm, sizeof(comm)); + if (ret < sizeof(comm)) { + status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR; + goto end; + } + + ret = lttng_dynamic_buffer_set_size(&reception_buffer, + comm.length + sizeof(comm)); + if (ret) { + status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR; + goto end; + } + + memcpy(reception_buffer.data, &comm, sizeof(comm)); + ret = lttcomm_recv_unix_sock(channel->socket, + reception_buffer.data + sizeof(comm), + comm.length); + if (ret < (ssize_t) comm.length) { + status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR; + goto 
end; + } + + ret = lttng_notification_create_from_buffer(reception_buffer.data, + ¬ification); + if (ret != (sizeof(comm) + (ssize_t) comm.length)) { + status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR; + goto error; + } + *_notification = notification; +end: + lttng_dynamic_buffer_reset(&reception_buffer); + return status; +error: + lttng_notification_destroy(notification); + goto end; +} + +static +enum lttng_notification_channel_status send_command( + struct lttng_notification_channel *channel, + enum lttng_notification_channel_message_type type, + struct lttng_condition *condition) +{ + int socket; + ssize_t command_size, ret; + size_t received = 0; + enum lttng_notification_channel_status status = + LTTNG_NOTIFICATION_CHANNEL_STATUS_OK; + char *command_buffer = NULL; + struct lttng_notification_channel_message cmd_message = { + .type = type, + }; + struct lttng_notification_channel_message reply_message; + struct lttng_notification_channel_command_reply reply; + + if (!channel) { + status = LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID; + goto end; + } + + socket = channel->socket; + if (!lttng_condition_validate(condition)) { + status = LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID; + goto end; + } + + ret = lttng_condition_serialize(condition, NULL); + if (ret < 0) { + status = LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID; + goto end; + } + assert(ret < UINT32_MAX); + cmd_message.size = (uint32_t) ret; + command_size = ret + sizeof( + struct lttng_notification_channel_message); + command_buffer = zmalloc(command_size); + if (!command_buffer) { + goto end; + } + + memcpy(command_buffer, &cmd_message, sizeof(cmd_message)); + ret = lttng_condition_serialize(condition, + command_buffer + sizeof(cmd_message)); + if (ret < 0) { + goto end; + } + + ret = lttcomm_send_unix_sock(socket, command_buffer, command_size); + if (ret < 0) { + status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR; + goto end; + } + + /* Receive command reply header. 
*/ + do { + ret = lttcomm_recv_unix_sock(socket, + ((char *) &reply_message) + received, + sizeof(reply_message) - received); + if (ret <= 0) { + status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR; + goto end; + } + received += ret; + } while (received < sizeof(reply_message)); + if (reply_message.type != + LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_COMMAND_REPLY || + reply_message.size != sizeof(reply)) { + status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR; + goto end; + } + + /* Receive command reply payload. */ + received = 0; + do { + ret = lttcomm_recv_unix_sock(socket, + ((char *) &reply) + received, + sizeof(reply) - received); + if (ret <= 0) { + status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR; + goto end; + } + received += ret; + } while (received < sizeof(reply)); + status = (enum lttng_notification_channel_status) reply.status; +end: + free(command_buffer); + return status; +} + +enum lttng_notification_channel_status lttng_notification_channel_subscribe( + struct lttng_notification_channel *channel, + struct lttng_condition *condition) +{ + return send_command(channel, + LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE, + condition); +} + +enum lttng_notification_channel_status lttng_notification_channel_unsubscribe( + struct lttng_notification_channel *channel, + struct lttng_condition *condition) +{ + return send_command(channel, + LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE, + condition); +} + +void lttng_notification_channel_destroy( + struct lttng_notification_channel *channel) +{ + if (!channel) { + return; + } + + if (channel->socket >= 0) { + (void) lttcomm_close_unix_sock(channel->socket); + } + free(channel); +} + diff --git a/src/lib/lttng-ctl/lttng-ctl.c b/src/lib/lttng-ctl/lttng-ctl.c index f0b211c7b..e44986ee7 100644 --- a/src/lib/lttng-ctl/lttng-ctl.c +++ b/src/lib/lttng-ctl/lttng-ctl.c @@ -36,6 +36,8 @@ #include #include #include +#include +#include #include "filter/filter-ast.h" #include "filter/filter-parser.h" @@ -2435,6 +2437,94 
@@ end: return ret; } +int lttng_register_trigger(struct lttng_trigger *trigger) +{ + int ret; + struct lttcomm_session_msg lsm; + char *trigger_buf = NULL; + ssize_t trigger_size; + + if (!trigger) { + ret = -LTTNG_ERR_INVALID; + goto end; + } + + if (!lttng_trigger_validate(trigger)) { + ret = -LTTNG_ERR_INVALID; + goto end; + } + + trigger_size = lttng_trigger_serialize(trigger, NULL); + if (trigger_size < 0) { + ret = -LTTNG_ERR_UNK; + goto end; + } + + trigger_buf = zmalloc(trigger_size); + if (!trigger_buf) { + ret = -LTTNG_ERR_NOMEM; + goto end; + } + + memset(&lsm, 0, sizeof(lsm)); + lsm.cmd_type = LTTNG_REGISTER_TRIGGER; + if (lttng_trigger_serialize(trigger, trigger_buf) < 0) { + ret = -LTTNG_ERR_UNK; + goto end; + } + + lsm.u.trigger.length = (uint32_t) trigger_size; + ret = lttng_ctl_ask_sessiond_varlen_no_cmd_header(&lsm, trigger_buf, + trigger_size, NULL); +end: + free(trigger_buf); + return ret; +} + +int lttng_unregister_trigger(struct lttng_trigger *trigger) +{ + int ret; + struct lttcomm_session_msg lsm; + char *trigger_buf = NULL; + ssize_t trigger_size; + + if (!trigger) { + ret = -LTTNG_ERR_INVALID; + goto end; + } + + if (!lttng_trigger_validate(trigger)) { + ret = -LTTNG_ERR_INVALID; + goto end; + } + + trigger_size = lttng_trigger_serialize(trigger, NULL); + if (trigger_size < 0) { + ret = -LTTNG_ERR_UNK; + goto end; + } + + trigger_buf = zmalloc(trigger_size); + if (!trigger_buf) { + ret = -LTTNG_ERR_NOMEM; + goto end; + } + + memset(&lsm, 0, sizeof(lsm)); + lsm.cmd_type = LTTNG_UNREGISTER_TRIGGER; + if (lttng_trigger_serialize(trigger, trigger_buf) < 0) { + ret = -LTTNG_ERR_UNK; + goto end; + } + + lsm.u.trigger.length = (uint32_t) trigger_size; + ret = lttng_ctl_ask_sessiond_varlen_no_cmd_header(&lsm, trigger_buf, + trigger_size, NULL); +end: + free(trigger_buf); + return ret; +} + /* * lib constructor. 
*/ diff --git a/tests/unit/Makefile.am b/tests/unit/Makefile.am index f9dd53bef..ddae048ca 100644 --- a/tests/unit/Makefile.am +++ b/tests/unit/Makefile.am @@ -73,6 +73,7 @@ UST_DATA_TRACE=$(top_builddir)/src/bin/lttng-sessiond/trace-ust.$(OBJEXT) \ $(top_builddir)/src/bin/lttng-sessiond/session.$(OBJEXT) \ $(top_builddir)/src/bin/lttng-sessiond/snapshot.$(OBJEXT) \ $(top_builddir)/src/bin/lttng-sessiond/agent.$(OBJEXT) \ + $(top_builddir)/src/bin/lttng-sessiond/notification-thread-commands.$(OBJEXT) \ $(top_builddir)/src/common/libcommon.la \ $(top_builddir)/src/common/health/libhealth.la \ $(top_builddir)/src/common/sessiond-comm/libsessiond-comm.la diff --git a/tests/unit/test_ust_data.c b/tests/unit/test_ust_data.c index 7996b8e60..86c1e23bf 100644 --- a/tests/unit/test_ust_data.c +++ b/tests/unit/test_ust_data.c @@ -30,6 +30,7 @@ #include #include #include +#include #include @@ -49,8 +50,9 @@ int lttng_opt_mi; int ust_consumerd32_fd; int ust_consumerd64_fd; -/* Global variable required by sessiond objects being linked-in */ +/* Global variables required by sessiond objects being linked-in */ struct lttng_ht *agent_apps_ht_by_sock; +struct notification_thread_handle *notification_thread_handle; static const char alphanum[] = "0123456789" -- 2.34.1