From b7bce76449cfa7ca3db9fd900239eb25962349d7 Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=A9mie=20Galarneau?= Date: Fri, 15 May 2020 16:04:11 -0400 Subject: [PATCH 01/16] Add lttng_dynamic_buffer_append_view util MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Add lttng_dynamic_buffer_append_view() which appends the contents of a buffer view to a dynamic buffer. Signed-off-by: Jérémie Galarneau Change-Id: I4082ba2c848b79aa2116847987067453638de441 --- src/common/dynamic-buffer.c | 18 ++++++++++++++++++ src/common/dynamic-buffer.h | 11 +++++++++++ 2 files changed, 29 insertions(+) diff --git a/src/common/dynamic-buffer.c b/src/common/dynamic-buffer.c index fd39f813b..0a4683b27 100644 --- a/src/common/dynamic-buffer.c +++ b/src/common/dynamic-buffer.c @@ -16,6 +16,7 @@ */ #include +#include #include #include @@ -93,6 +94,23 @@ end: return ret; } +LTTNG_HIDDEN +int lttng_dynamic_buffer_append_view(struct lttng_dynamic_buffer *buffer, + const struct lttng_buffer_view *src) +{ + int ret; + + if (!buffer || !src) { + ret = -1; + goto end; + } + + ret = lttng_dynamic_buffer_append(buffer, src->data, + src->size); +end: + return ret; +} + LTTNG_HIDDEN int lttng_dynamic_buffer_set_size(struct lttng_dynamic_buffer *buffer, size_t new_size) diff --git a/src/common/dynamic-buffer.h b/src/common/dynamic-buffer.h index d42a6a61d..b0239b1d0 100644 --- a/src/common/dynamic-buffer.h +++ b/src/common/dynamic-buffer.h @@ -22,6 +22,8 @@ #include #include +struct lttng_buffer_view; + struct lttng_dynamic_buffer { char *data; /* size is the buffer's currently used capacity. */ @@ -58,6 +60,15 @@ LTTNG_HIDDEN int lttng_dynamic_buffer_append_buffer(struct lttng_dynamic_buffer *dst_buffer, struct lttng_dynamic_buffer *src_buffer); +/* + * Performs the same action as lttng_dynamic_buffer_append(), but using a + * buffer view as the source buffer. The source buffer's size is used in lieu + * of "len". 
+ */ +LTTNG_HIDDEN +int lttng_dynamic_buffer_append_view(struct lttng_dynamic_buffer *buffer, + const struct lttng_buffer_view *view); + /* * Set the buffer's size to new_size. The capacity of the buffer will * be expanded (if necessary) to accomodate new_size. Areas acquired by -- 2.34.1 From 1fdb9a7834064be4e1efbefd2b77a0c6a3f2d89b Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=A9mie=20Galarneau?= Date: Mon, 4 May 2020 18:21:48 -0400 Subject: [PATCH 02/16] consumerd: move address computation from on_read_subbuffer_mmap MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit The computation of the subbuffer's address is moved outside of lttng_consumer_on_read_subbuffer_mmap to make it usable with a regular buffer. This facilitates an upcoming change. Moreover this has the benefit of isolating domain-specific logic from this function which is supposed to be domain-agnostic. Signed-off-by: Jérémie Galarneau Change-Id: I16f8ccaa73804f98fa03e69136548e6d6b7782e5 --- src/common/consumer/consumer.c | 38 +--------- src/common/consumer/consumer.h | 4 +- src/common/kernel-consumer/kernel-consumer.c | 44 ++++++++++- src/common/ust-consumer/ust-consumer.c | 78 ++++++++++++++------ src/common/ust-consumer/ust-consumer.h | 5 +- 5 files changed, 105 insertions(+), 64 deletions(-) diff --git a/src/common/consumer/consumer.c b/src/common/consumer/consumer.c index 0ed22ac31..739a6ab81 100644 --- a/src/common/consumer/consumer.c +++ b/src/common/consumer/consumer.c @@ -1507,12 +1507,12 @@ end: */ ssize_t lttng_consumer_on_read_subbuffer_mmap( struct lttng_consumer_local_data *ctx, - struct lttng_consumer_stream *stream, unsigned long len, + struct lttng_consumer_stream *stream, + const char *buffer, + unsigned long len, unsigned long padding, struct ctf_packet_index *index) { - unsigned long mmap_offset; - void *mmap_base; ssize_t ret = 0; off_t orig_offset = stream->out_fd_offset; /* Default is on the disk */ @@ -1532,36 +1532,6 @@ ssize_t 
lttng_consumer_on_read_subbuffer_mmap( } } - /* get the offset inside the fd to mmap */ - switch (consumer_data.type) { - case LTTNG_CONSUMER_KERNEL: - mmap_base = stream->mmap_base; - ret = kernctl_get_mmap_read_offset(stream->wait_fd, &mmap_offset); - if (ret < 0) { - PERROR("tracer ctl get_mmap_read_offset"); - goto end; - } - break; - case LTTNG_CONSUMER32_UST: - case LTTNG_CONSUMER64_UST: - mmap_base = lttng_ustctl_get_mmap_base(stream); - if (!mmap_base) { - ERR("read mmap get mmap base for stream %s", stream->name); - ret = -EPERM; - goto end; - } - ret = lttng_ustctl_get_mmap_read_offset(stream, &mmap_offset); - if (ret != 0) { - PERROR("tracer ctl get_mmap_read_offset"); - ret = -EINVAL; - goto end; - } - break; - default: - ERR("Unknown consumer_data type"); - assert(0); - } - /* Handle stream on the relayd if the output is on the network */ if (relayd) { unsigned long netlen = len; @@ -1659,7 +1629,7 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap( * This call guarantee that len or less is returned. It's impossible to * receive a ret value that is bigger than len. 
*/ - ret = lttng_write(outfd, mmap_base + mmap_offset, len); + ret = lttng_write(outfd, buffer, len); DBG("Consumer mmap write() ret %zd (len %lu)", ret, len); if (ret < 0 || ((size_t) ret != len)) { /* diff --git a/src/common/consumer/consumer.h b/src/common/consumer/consumer.h index ab59e6e75..c9d515e00 100644 --- a/src/common/consumer/consumer.h +++ b/src/common/consumer/consumer.h @@ -699,7 +699,9 @@ struct lttng_consumer_local_data *lttng_consumer_create( void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx); ssize_t lttng_consumer_on_read_subbuffer_mmap( struct lttng_consumer_local_data *ctx, - struct lttng_consumer_stream *stream, unsigned long len, + struct lttng_consumer_stream *stream, + const char *buffer, + unsigned long len, unsigned long padding, struct ctf_packet_index *index); ssize_t lttng_consumer_on_read_subbuffer_splice( diff --git a/src/common/kernel-consumer/kernel-consumer.c b/src/common/kernel-consumer/kernel-consumer.c index 0a232dcfc..4c454da00 100644 --- a/src/common/kernel-consumer/kernel-consumer.c +++ b/src/common/kernel-consumer/kernel-consumer.c @@ -16,6 +16,7 @@ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. 
*/ +#include #define _LGPL_SOURCE #include #include @@ -109,6 +110,25 @@ int lttng_kconsumer_get_consumed_snapshot(struct lttng_consumer_stream *stream, return ret; } +static +int get_current_subbuf_addr(struct lttng_consumer_stream *stream, + const char **addr) +{ + int ret; + unsigned long mmap_offset; + const char *mmap_base = stream->mmap_base; + + ret = kernctl_get_mmap_read_offset(stream->wait_fd, &mmap_offset); + if (ret < 0) { + PERROR("Failed to get mmap read offset"); + goto error; + } + + *addr = mmap_base + mmap_offset; +error: + return ret; +} + /* * Take a snapshot of all the stream of a channel * @@ -238,6 +258,7 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path, while (consumed_pos < produced_pos) { ssize_t read_len; unsigned long len, padded_len; + const char *subbuf_addr; health_code_update(); @@ -267,7 +288,13 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path, goto error_put_subbuf; } - read_len = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, len, + ret = get_current_subbuf_addr(stream, &subbuf_addr); + if (ret) { + goto error_put_subbuf; + } + + read_len = lttng_consumer_on_read_subbuffer_mmap(ctx, + stream, subbuf_addr, len, padded_len - len, NULL); /* * We write the padded len in local tracefiles but the data len @@ -1387,6 +1414,9 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, } break; case CONSUMER_CHANNEL_MMAP: + { + const char *subbuf_addr; + /* Get subbuffer size without padding */ err = kernctl_get_subbuf_size(infd, &subbuf_size); if (err != 0) { @@ -1406,13 +1436,20 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, goto end; } + ret = get_current_subbuf_addr(stream, &subbuf_addr); + if (ret) { + goto error_put_subbuf; + } + /* Make sure the tracer is not gone mad on us! 
*/ assert(len >= subbuf_size); padding = len - subbuf_size; /* write the subbuffer to the tracefile */ - ret = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, subbuf_size, + ret = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, + subbuf_addr, + subbuf_size, padding, &index); /* * The mmap operation should write subbuf_size amount of data when @@ -1432,11 +1469,12 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, write_index = 0; } break; + } default: ERR("Unknown output method"); ret = -EPERM; } - +error_put_subbuf: err = kernctl_put_next_subbuf(infd); if (err != 0) { if (err == -EFAULT) { diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c index 6009ef1f8..4e93faf64 100644 --- a/src/common/ust-consumer/ust-consumer.c +++ b/src/common/ust-consumer/ust-consumer.c @@ -16,6 +16,7 @@ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ +#include #define _LGPL_SOURCE #include #include @@ -1088,6 +1089,35 @@ error: return ret; } +static +int get_current_subbuf_addr(struct lttng_consumer_stream *stream, + const char **addr) +{ + int ret; + unsigned long mmap_offset; + const char *mmap_base; + + mmap_base = ustctl_get_mmap_base(stream->ustream); + if (!mmap_base) { + ERR("Failed to get mmap base for stream `%s`", + stream->name); + ret = -EPERM; + goto error; + } + + ret = ustctl_get_mmap_read_offset(stream->ustream, &mmap_offset); + if (ret != 0) { + ERR("Failed to get mmap offset for stream `%s`", stream->name); + ret = -EINVAL; + goto error; + } + + *addr = mmap_base + mmap_offset; +error: + return ret; + +} + /* * Take a snapshot of all the stream of a channel. 
* @@ -1192,6 +1222,7 @@ static int snapshot_channel(uint64_t key, char *path, uint64_t relayd_id, while (consumed_pos < produced_pos) { ssize_t read_len; unsigned long len, padded_len; + const char *subbuf_addr; health_code_update(); @@ -1221,7 +1252,13 @@ static int snapshot_channel(uint64_t key, char *path, uint64_t relayd_id, goto error_put_subbuf; } - read_len = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, len, + ret = get_current_subbuf_addr(stream, &subbuf_addr); + if (ret) { + goto error_put_subbuf; + } + + read_len = lttng_consumer_on_read_subbuffer_mmap(ctx, + stream, subbuf_addr, len, padded_len - len, NULL); if (use_relayd) { if (read_len != len) { @@ -1935,29 +1972,13 @@ error_fatal: return -1; } -/* - * Wrapper over the mmap() read offset from ust-ctl library. Since this can be - * compiled out, we isolate it in this library. - */ -int lttng_ustctl_get_mmap_read_offset(struct lttng_consumer_stream *stream, - unsigned long *off) +void lttng_ustctl_flush_buffer(struct lttng_consumer_stream *stream, + int producer_active) { assert(stream); assert(stream->ustream); - return ustctl_get_mmap_read_offset(stream->ustream, off); -} - -/* - * Wrapper over the mmap() read offset from ust-ctl library. Since this can be - * compiled out, we isolate it in this library. 
- */ -void *lttng_ustctl_get_mmap_base(struct lttng_consumer_stream *stream) -{ - assert(stream); - assert(stream->ustream); - - return ustctl_get_mmap_base(stream->ustream); + ustctl_flush_buffer(stream->ustream, producer_active); } /* @@ -2453,6 +2474,7 @@ int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream, long ret = 0; struct ustctl_consumer_stream *ustream; struct ctf_packet_index index; + const char *subbuf_addr; assert(stream); assert(stream->ustream); @@ -2548,11 +2570,20 @@ retry: assert(len >= subbuf_size); padding = len - subbuf_size; + + ret = get_current_subbuf_addr(stream, &subbuf_addr); + if (ret) { + write_index = 0; + goto error_put_subbuf; + } + /* write the subbuffer to the tracefile */ - ret = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, subbuf_size, padding, &index); + ret = lttng_consumer_on_read_subbuffer_mmap( + ctx, stream, subbuf_addr, subbuf_size, padding, &index); /* - * The mmap operation should write subbuf_size amount of data when network - * streaming or the full padding (len) size when we are _not_ streaming. + * The mmap operation should write subbuf_size amount of data when + * network streaming or the full padding (len) size when we are _not_ + * streaming. 
*/ if ((ret != subbuf_size && stream->relayd_id != (uint64_t) -1ULL) || (ret != len && stream->relayd_id == (uint64_t) -1ULL)) { @@ -2569,6 +2600,7 @@ retry: ret, len, subbuf_size); write_index = 0; } +error_put_subbuf: err = ustctl_put_next_subbuf(ustream); assert(err == 0); diff --git a/src/common/ust-consumer/ust-consumer.h b/src/common/ust-consumer/ust-consumer.h index 67b5bb511..966ba3c86 100644 --- a/src/common/ust-consumer/ust-consumer.h +++ b/src/common/ust-consumer/ust-consumer.h @@ -47,9 +47,8 @@ int lttng_ustconsumer_on_recv_stream(struct lttng_consumer_stream *stream); void lttng_ustconsumer_on_stream_hangup(struct lttng_consumer_stream *stream); -int lttng_ustctl_get_mmap_read_offset(struct lttng_consumer_stream *stream, - unsigned long *off); -void *lttng_ustctl_get_mmap_base(struct lttng_consumer_stream *stream); +void lttng_ustctl_flush_buffer(struct lttng_consumer_stream *stream, + int producer_active); int lttng_ustconsumer_get_stream_id(struct lttng_consumer_stream *stream, uint64_t *stream_id); int lttng_ustconsumer_data_pending(struct lttng_consumer_stream *stream); -- 2.34.1 From ace0e591600a76d76d54edef87ffb24afff6d209 Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=A9mie=20Galarneau?= Date: Mon, 4 May 2020 19:04:02 -0400 Subject: [PATCH 03/16] consumerd: cleanup: use buffer view interface for mmap read subbuf MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Replace explicit pointer + size parameters by an lttng_buffer_view in lttng_consumer_on_read_subbuffer_mmap(). 
Signed-off-by: Jérémie Galarneau Change-Id: I76f35b3e295c596cdf4bbb8a6d01168a850a721a --- src/common/consumer/consumer.c | 32 +++++++++++--------- src/common/consumer/consumer.h | 4 +-- src/common/kernel-consumer/kernel-consumer.c | 22 ++++++++------ src/common/ust-consumer/ust-consumer.c | 12 ++++++-- 4 files changed, 42 insertions(+), 28 deletions(-) diff --git a/src/common/consumer/consumer.c b/src/common/consumer/consumer.c index 739a6ab81..f06ea9d4d 100644 --- a/src/common/consumer/consumer.c +++ b/src/common/consumer/consumer.c @@ -1508,8 +1508,7 @@ end: ssize_t lttng_consumer_on_read_subbuffer_mmap( struct lttng_consumer_local_data *ctx, struct lttng_consumer_stream *stream, - const char *buffer, - unsigned long len, + const struct lttng_buffer_view *buffer, unsigned long padding, struct ctf_packet_index *index) { @@ -1519,6 +1518,8 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap( int outfd = stream->out_fd; struct consumer_relayd_sock_pair *relayd = NULL; unsigned int relayd_hang_up = 0; + const size_t subbuf_content_size = buffer->size - padding; + size_t write_len; /* RCU lock for the relayd pointer */ rcu_read_lock(); @@ -1534,7 +1535,7 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap( /* Handle stream on the relayd if the output is on the network */ if (relayd) { - unsigned long netlen = len; + unsigned long netlen = subbuf_content_size; /* * Lock the control socket for the complete duration of the function @@ -1572,10 +1573,10 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap( goto write_error; } } - } else { - /* No streaming, we have to set the len with the full padding */ - len += padding; + write_len = subbuf_content_size; + } else { + /* No streaming; we have to write the full padding. 
*/ if (stream->metadata_flag && stream->reset_metadata_flag) { ret = utils_truncate_stream_file(stream->out_fd, 0); if (ret < 0) { @@ -1589,7 +1590,7 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap( * Check if we need to change the tracefile before writing the packet. */ if (stream->chan->tracefile_size > 0 && - (stream->tracefile_size_current + len) > + (stream->tracefile_size_current + buffer->size) > stream->chan->tracefile_size) { ret = utils_rotate_stream_file(stream->chan->pathname, stream->name, stream->chan->tracefile_size, @@ -1619,19 +1620,21 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap( stream->out_fd_offset = 0; orig_offset = 0; } - stream->tracefile_size_current += len; + stream->tracefile_size_current += buffer->size; if (index) { index->offset = htobe64(stream->out_fd_offset); } + + write_len = buffer->size; } /* * This call guarantee that len or less is returned. It's impossible to * receive a ret value that is bigger than len. */ - ret = lttng_write(outfd, buffer, len); - DBG("Consumer mmap write() ret %zd (len %lu)", ret, len); - if (ret < 0 || ((size_t) ret != len)) { + ret = lttng_write(outfd, buffer->data, write_len); + DBG("Consumer mmap write() ret %zd (len %lu)", ret, write_len); + if (ret < 0 || ((size_t) ret != write_len)) { /* * Report error to caller if nothing was written else at least send the * amount written. @@ -1652,7 +1655,8 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap( DBG("Consumer mmap write detected relayd hang up"); } else { /* Unhandled error, print it and stop function right now. */ - PERROR("Error in write mmap (ret %zd != len %lu)", ret, len); + PERROR("Error in write mmap (ret %zd != write_len %zu)", ret, + write_len); } goto write_error; } @@ -1661,9 +1665,9 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap( /* This call is useless on a socket so better save a syscall. 
*/ if (!relayd) { /* This won't block, but will start writeout asynchronously */ - lttng_sync_file_range(outfd, stream->out_fd_offset, len, + lttng_sync_file_range(outfd, stream->out_fd_offset, write_len, SYNC_FILE_RANGE_WRITE); - stream->out_fd_offset += len; + stream->out_fd_offset += write_len; lttng_consumer_sync_trace_file(stream, orig_offset); } diff --git a/src/common/consumer/consumer.h b/src/common/consumer/consumer.h index c9d515e00..aebcf4bac 100644 --- a/src/common/consumer/consumer.h +++ b/src/common/consumer/consumer.h @@ -33,6 +33,7 @@ #include #include #include +#include /* Commands for consumer */ enum lttng_consumer_command { @@ -700,8 +701,7 @@ void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx); ssize_t lttng_consumer_on_read_subbuffer_mmap( struct lttng_consumer_local_data *ctx, struct lttng_consumer_stream *stream, - const char *buffer, - unsigned long len, + const struct lttng_buffer_view *buffer, unsigned long padding, struct ctf_packet_index *index); ssize_t lttng_consumer_on_read_subbuffer_splice( diff --git a/src/common/kernel-consumer/kernel-consumer.c b/src/common/kernel-consumer/kernel-consumer.c index 4c454da00..49b5070f7 100644 --- a/src/common/kernel-consumer/kernel-consumer.c +++ b/src/common/kernel-consumer/kernel-consumer.c @@ -16,6 +16,7 @@ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. 
*/ +#include "common/buffer-view.h" #include #define _LGPL_SOURCE #include @@ -259,9 +260,9 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path, ssize_t read_len; unsigned long len, padded_len; const char *subbuf_addr; + struct lttng_buffer_view subbuf_view; health_code_update(); - DBG("Kernel consumer taking snapshot at pos %lu", consumed_pos); ret = kernctl_get_subbuf(stream->wait_fd, &consumed_pos); @@ -293,8 +294,10 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path, goto error_put_subbuf; } + subbuf_view = lttng_buffer_view_init( + subbuf_addr, 0, padded_len); read_len = lttng_consumer_on_read_subbuffer_mmap(ctx, - stream, subbuf_addr, len, + stream, &subbuf_view, padded_len - len, NULL); /* * We write the padded len in local tracefiles but the data len @@ -1416,6 +1419,7 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, case CONSUMER_CHANNEL_MMAP: { const char *subbuf_addr; + struct lttng_buffer_view subbuf_view; /* Get subbuffer size without padding */ err = kernctl_get_subbuf_size(infd, &subbuf_size); @@ -1446,15 +1450,15 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, padding = len - subbuf_size; + subbuf_view = lttng_buffer_view_init(subbuf_addr, 0, len); + /* write the subbuffer to the tracefile */ - ret = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, - subbuf_addr, - subbuf_size, - padding, &index); + ret = lttng_consumer_on_read_subbuffer_mmap( + ctx, stream, &subbuf_view, padding, &index); /* - * The mmap operation should write subbuf_size amount of data when - * network streaming or the full padding (len) size when we are _not_ - * streaming. + * The mmap operation should write subbuf_size amount of data + * when network streaming or the full padding (len) size when we + * are _not_ streaming. 
*/ if ((ret != subbuf_size && stream->relayd_id != (uint64_t) -1ULL) || (ret != len && stream->relayd_id == (uint64_t) -1ULL)) { diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c index 4e93faf64..0318eecea 100644 --- a/src/common/ust-consumer/ust-consumer.c +++ b/src/common/ust-consumer/ust-consumer.c @@ -1223,6 +1223,7 @@ static int snapshot_channel(uint64_t key, char *path, uint64_t relayd_id, ssize_t read_len; unsigned long len, padded_len; const char *subbuf_addr; + struct lttng_buffer_view subbuf_view; health_code_update(); @@ -1257,9 +1258,11 @@ static int snapshot_channel(uint64_t key, char *path, uint64_t relayd_id, goto error_put_subbuf; } + subbuf_view = lttng_buffer_view_init( + subbuf_addr, 0, padded_len); read_len = lttng_consumer_on_read_subbuffer_mmap(ctx, - stream, subbuf_addr, len, - padded_len - len, NULL); + stream, &subbuf_view, padded_len - len, + NULL); if (use_relayd) { if (read_len != len) { ret = -EPERM; @@ -2475,6 +2478,7 @@ int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream, struct ustctl_consumer_stream *ustream; struct ctf_packet_index index; const char *subbuf_addr; + struct lttng_buffer_view subbuf_view; assert(stream); assert(stream->ustream); @@ -2577,9 +2581,11 @@ retry: goto error_put_subbuf; } + subbuf_view = lttng_buffer_view_init(subbuf_addr, 0, len); + /* write the subbuffer to the tracefile */ ret = lttng_consumer_on_read_subbuffer_mmap( - ctx, stream, subbuf_addr, subbuf_size, padding, &index); + ctx, stream, &subbuf_view, padding, &index); /* * The mmap operation should write subbuf_size amount of data when * network streaming or the full padding (len) size when we are _not_ -- 2.34.1 From 59db0d42017ebea7422e31b3f8b137ba835e5e8d Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=A9mie=20Galarneau?= Date: Tue, 5 May 2020 13:13:03 -0400 Subject: [PATCH 04/16] consumerd: pass channel instance to stream creation function MIME-Version: 1.0 Content-Type: 
text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Both callsites of consumer_allocate_stream() set the stream's "chan" pointer after the creation. Pass the channel directly to the stream creation function so it can initialize the stream according to the channel's settings. Signed-off-by: Jérémie Galarneau Change-Id: Icea7088e7695e310585bf398e14e6443d67a30bb --- src/common/consumer/consumer.c | 6 ++++-- src/common/consumer/consumer.h | 4 +++- src/common/kernel-consumer/kernel-consumer.c | 11 +++++++++-- src/common/ust-consumer/ust-consumer.c | 6 +++--- 4 files changed, 19 insertions(+), 8 deletions(-) diff --git a/src/common/consumer/consumer.c b/src/common/consumer/consumer.c index f06ea9d4d..e1ee5f2fc 100644 --- a/src/common/consumer/consumer.c +++ b/src/common/consumer/consumer.c @@ -532,7 +532,9 @@ void consumer_del_stream_for_metadata(struct lttng_consumer_stream *stream) consumer_stream_destroy(stream, metadata_ht); } -struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key, +struct lttng_consumer_stream *consumer_allocate_stream( + struct lttng_consumer_channel *channel, + uint64_t channel_key, uint64_t stream_key, enum lttng_consumer_stream_state state, const char *channel_name, @@ -556,7 +558,7 @@ struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key, } rcu_read_lock(); - + stream->chan = channel; stream->key = stream_key; stream->out_fd = -1; stream->out_fd_offset = 0; diff --git a/src/common/consumer/consumer.h b/src/common/consumer/consumer.h index aebcf4bac..9ba1dacbc 100644 --- a/src/common/consumer/consumer.h +++ b/src/common/consumer/consumer.h @@ -645,7 +645,9 @@ void lttng_consumer_cleanup(void); */ int lttng_consumer_poll_socket(struct pollfd *kconsumer_sockpoll); -struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key, +struct lttng_consumer_stream *consumer_allocate_stream( + struct lttng_consumer_channel *channel, + uint64_t channel_key, uint64_t stream_key, enum 
lttng_consumer_stream_state state, const char *channel_name, diff --git a/src/common/kernel-consumer/kernel-consumer.c b/src/common/kernel-consumer/kernel-consumer.c index 49b5070f7..29b75d68d 100644 --- a/src/common/kernel-consumer/kernel-consumer.c +++ b/src/common/kernel-consumer/kernel-consumer.c @@ -650,7 +650,10 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, health_code_update(); - new_stream = consumer_allocate_stream(channel->key, + pthread_mutex_lock(&channel->lock); + new_stream = consumer_allocate_stream( + channel, + channel->key, fd, LTTNG_CONSUMER_ACTIVE_STREAM, channel->name, @@ -670,10 +673,10 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR); break; } + pthread_mutex_unlock(&channel->lock); goto end_nosignal; } - new_stream->chan = channel; new_stream->wait_fd = fd; switch (channel->output) { case CONSUMER_CHANNEL_SPLICE: @@ -733,6 +736,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, "relayd id %" PRIu64, new_stream->name, new_stream->relayd_id); cds_list_add(&new_stream->send_node, &channel->streams.head); + pthread_mutex_unlock(&channel->lock); break; } @@ -741,6 +745,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, ret = consumer_send_relayd_stream(new_stream, new_stream->chan->pathname); if (ret < 0) { + pthread_mutex_unlock(&channel->lock); consumer_stream_free(new_stream); goto end_nosignal; } @@ -754,10 +759,12 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, ret = consumer_send_relayd_streams_sent( new_stream->relayd_id); if (ret < 0) { + pthread_mutex_unlock(&channel->lock); goto end_nosignal; } } } + pthread_mutex_unlock(&channel->lock); /* Get the right pipe where the stream will be sent. 
*/ if (new_stream->metadata_flag) { diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c index 0318eecea..c37be8fb8 100644 --- a/src/common/ust-consumer/ust-consumer.c +++ b/src/common/ust-consumer/ust-consumer.c @@ -155,7 +155,9 @@ static struct lttng_consumer_stream *allocate_stream(int cpu, int key, assert(channel); assert(ctx); - stream = consumer_allocate_stream(channel->key, + stream = consumer_allocate_stream( + channel, + channel->key, key, LTTNG_CONSUMER_ACTIVE_STREAM, channel->name, @@ -185,8 +187,6 @@ static struct lttng_consumer_stream *allocate_stream(int cpu, int key, goto error; } - stream->chan = channel; - error: if (_alloc_ret) { *_alloc_ret = alloc_ret; -- 2.34.1 From 11785f65c9582c7e8cda10fa9ac8da2d7911db01 Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=A9mie=20Galarneau?= Date: Tue, 5 May 2020 15:48:05 -0400 Subject: [PATCH 05/16] consumerd: tag metadata channel as being part of a live session MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Metadata channels that are part of a live session must be handled differently than those that are part of non-live sessions, since complete "metadata units" must be accumulated before they are forwarded to a relay daemon. This allows a follow-up fix to use this information: the live_timer_interval of a metadata channel is always 0, so it cannot be used to tell whether the channel belongs to a live session.
Signed-off-by: Jérémie Galarneau Change-Id: I53db4bc717b149ed20e0309531db6f0241e873e1 --- src/bin/lttng-sessiond/consumer.c | 6 +++- src/bin/lttng-sessiond/consumer.h | 4 ++- src/bin/lttng-sessiond/kernel-consumer.c | 5 +-- src/bin/lttng-sessiond/trace-kernel.h | 2 +- src/bin/lttng-sessiond/ust-consumer.c | 1 + src/common/consumer/consumer.c | 2 ++ src/common/consumer/consumer.h | 3 ++ src/common/kernel-consumer/kernel-consumer.c | 1 + src/common/sessiond-comm/sessiond-comm.h | 3 ++ src/common/ust-consumer/ust-consumer.c | 33 ++++++-------------- 10 files changed, 31 insertions(+), 29 deletions(-) diff --git a/src/bin/lttng-sessiond/consumer.c b/src/bin/lttng-sessiond/consumer.c index 960fe8e46..717dcdbf3 100644 --- a/src/bin/lttng-sessiond/consumer.c +++ b/src/bin/lttng-sessiond/consumer.c @@ -814,6 +814,7 @@ void consumer_init_ask_channel_comm_msg(struct lttcomm_consumer_msg *msg, unsigned int switch_timer_interval, unsigned int read_timer_interval, unsigned int live_timer_interval, + bool is_in_live_session, int output, int type, uint64_t session_id, @@ -845,6 +846,7 @@ void consumer_init_ask_channel_comm_msg(struct lttcomm_consumer_msg *msg, msg->u.ask_channel.switch_timer_interval = switch_timer_interval; msg->u.ask_channel.read_timer_interval = read_timer_interval; msg->u.ask_channel.live_timer_interval = live_timer_interval; + msg->u.ask_channel.is_live = is_in_live_session; msg->u.ask_channel.output = output; msg->u.ask_channel.type = type; msg->u.ask_channel.session_id = session_id; @@ -900,7 +902,8 @@ void consumer_init_channel_comm_msg(struct lttcomm_consumer_msg *msg, uint64_t tracefile_size, uint64_t tracefile_count, unsigned int monitor, - unsigned int live_timer_interval) + unsigned int live_timer_interval, + bool is_in_live_session) { assert(msg); @@ -921,6 +924,7 @@ void consumer_init_channel_comm_msg(struct lttcomm_consumer_msg *msg, msg->u.channel.tracefile_count = tracefile_count; msg->u.channel.monitor = monitor; 
msg->u.channel.live_timer_interval = live_timer_interval; + msg->u.channel.is_live = is_in_live_session; strncpy(msg->u.channel.pathname, pathname, sizeof(msg->u.channel.pathname)); diff --git a/src/bin/lttng-sessiond/consumer.h b/src/bin/lttng-sessiond/consumer.h index 08b57eb73..42e4ec892 100644 --- a/src/bin/lttng-sessiond/consumer.h +++ b/src/bin/lttng-sessiond/consumer.h @@ -232,6 +232,7 @@ void consumer_init_ask_channel_comm_msg(struct lttcomm_consumer_msg *msg, unsigned int switch_timer_interval, unsigned int read_timer_interval, unsigned int live_timer_interval, + bool is_in_live_session, int output, int type, uint64_t session_id, @@ -273,7 +274,8 @@ void consumer_init_channel_comm_msg(struct lttcomm_consumer_msg *msg, uint64_t tracefile_size, uint64_t tracefile_count, unsigned int monitor, - unsigned int live_timer_interval); + unsigned int live_timer_interval, + bool is_in_live_session); int consumer_is_data_pending(uint64_t session_id, struct consumer_output *consumer); int consumer_close_metadata(struct consumer_socket *socket, diff --git a/src/bin/lttng-sessiond/kernel-consumer.c b/src/bin/lttng-sessiond/kernel-consumer.c index d21b1868a..a086384ac 100644 --- a/src/bin/lttng-sessiond/kernel-consumer.c +++ b/src/bin/lttng-sessiond/kernel-consumer.c @@ -132,7 +132,8 @@ int kernel_consumer_add_channel(struct consumer_socket *sock, channel->channel->attr.tracefile_size, channel->channel->attr.tracefile_count, monitor, - channel->channel->attr.live_timer_interval); + channel->channel->attr.live_timer_interval, + session->is_live_session); health_code_update(); @@ -196,7 +197,7 @@ int kernel_consumer_add_metadata(struct consumer_socket *sock, DEFAULT_KERNEL_CHANNEL_OUTPUT, CONSUMER_CHANNEL_TYPE_METADATA, 0, 0, - monitor, 0); + monitor, 0, session->is_live_session); health_code_update(); diff --git a/src/bin/lttng-sessiond/trace-kernel.h b/src/bin/lttng-sessiond/trace-kernel.h index fbaa5039c..7cb5c43f4 100644 --- a/src/bin/lttng-sessiond/trace-kernel.h +++ 
b/src/bin/lttng-sessiond/trace-kernel.h @@ -117,13 +117,13 @@ struct ltt_kernel_session { unsigned int output_traces; unsigned int snapshot_mode; unsigned int has_non_default_channel; - struct lttng_tracker_list *tracker_list_pid; struct lttng_tracker_list *tracker_list_vpid; struct lttng_tracker_list *tracker_list_uid; struct lttng_tracker_list *tracker_list_vuid; struct lttng_tracker_list *tracker_list_gid; struct lttng_tracker_list *tracker_list_vgid; + bool is_live_session; }; /* diff --git a/src/bin/lttng-sessiond/ust-consumer.c b/src/bin/lttng-sessiond/ust-consumer.c index 11318dbf1..8bf342293 100644 --- a/src/bin/lttng-sessiond/ust-consumer.c +++ b/src/bin/lttng-sessiond/ust-consumer.c @@ -174,6 +174,7 @@ static int ask_channel_creation(struct ust_app_session *ua_sess, ua_chan->attr.switch_timer_interval, ua_chan->attr.read_timer_interval, ua_sess->live_timer_interval, + ua_sess->live_timer_interval != 0, output, (int) ua_chan->attr.type, ua_sess->tracing_id, diff --git a/src/common/consumer/consumer.c b/src/common/consumer/consumer.c index e1ee5f2fc..8eebce334 100644 --- a/src/common/consumer/consumer.c +++ b/src/common/consumer/consumer.c @@ -952,6 +952,7 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key, uint64_t session_id_per_pid, unsigned int monitor, unsigned int live_timer_interval, + bool is_in_live_session, const char *root_shm_path, const char *shm_path) { @@ -974,6 +975,7 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key, channel->tracefile_count = tracefile_count; channel->monitor = monitor; channel->live_timer_interval = live_timer_interval; + channel->is_live = is_in_live_session; pthread_mutex_init(&channel->lock, NULL); pthread_mutex_init(&channel->timer_lock, NULL); diff --git a/src/common/consumer/consumer.h b/src/common/consumer/consumer.h index 9ba1dacbc..23166175f 100644 --- a/src/common/consumer/consumer.h +++ b/src/common/consumer/consumer.h @@ -170,6 +170,8 @@ struct 
lttng_consumer_channel { int live_timer_enabled; timer_t live_timer; int live_timer_error; + /* Channel is part of a live session ? */ + bool is_live; /* On-disk circular buffer */ uint64_t tracefile_size; @@ -672,6 +674,7 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key, uint64_t session_id_per_pid, unsigned int monitor, unsigned int live_timer_interval, + bool is_in_live_session, const char *root_shm_path, const char *shm_path); void consumer_del_stream(struct lttng_consumer_stream *stream, diff --git a/src/common/kernel-consumer/kernel-consumer.c b/src/common/kernel-consumer/kernel-consumer.c index 29b75d68d..f6a5ba7f5 100644 --- a/src/common/kernel-consumer/kernel-consumer.c +++ b/src/common/kernel-consumer/kernel-consumer.c @@ -520,6 +520,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, msg.u.channel.tracefile_count, 0, msg.u.channel.monitor, msg.u.channel.live_timer_interval, + msg.u.channel.is_live, NULL, NULL); if (new_channel == NULL) { lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR); diff --git a/src/common/sessiond-comm/sessiond-comm.h b/src/common/sessiond-comm/sessiond-comm.h index 608e83e95..06af0b48c 100644 --- a/src/common/sessiond-comm/sessiond-comm.h +++ b/src/common/sessiond-comm/sessiond-comm.h @@ -452,6 +452,8 @@ struct lttcomm_consumer_msg { uint32_t monitor; /* timer to check the streams usage in live mode (usec). */ unsigned int live_timer_interval; + /* is part of a live session */ + uint8_t is_live; } LTTNG_PACKED channel; /* Only used by Kernel. 
*/ struct { uint64_t stream_key; @@ -483,6 +485,7 @@ struct lttcomm_consumer_msg { uint32_t switch_timer_interval; /* usec */ uint32_t read_timer_interval; /* usec */ unsigned int live_timer_interval; /* usec */ + uint8_t is_live; /* is part of a live session */ int32_t output; /* splice, mmap */ int32_t type; /* metadata or per_cpu */ uint64_t session_id; /* Tracing session id */ diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c index c37be8fb8..225f80e90 100644 --- a/src/common/ust-consumer/ust-consumer.c +++ b/src/common/ust-consumer/ust-consumer.c @@ -119,26 +119,6 @@ error: return ret; } -/* - * Allocate and return a consumer channel object. - */ -static struct lttng_consumer_channel *allocate_channel(uint64_t session_id, - const char *pathname, const char *name, uid_t uid, gid_t gid, - uint64_t relayd_id, uint64_t key, enum lttng_event_output output, - uint64_t tracefile_size, uint64_t tracefile_count, - uint64_t session_id_per_pid, unsigned int monitor, - unsigned int live_timer_interval, - const char *root_shm_path, const char *shm_path) -{ - assert(pathname); - assert(name); - - return consumer_allocate_channel(key, session_id, pathname, name, uid, - gid, relayd_id, output, tracefile_size, - tracefile_count, session_id_per_pid, monitor, - live_timer_interval, root_shm_path, shm_path); -} - /* * Allocate and return a consumer stream object. If _alloc_ret is not NULL, the * error value if applicable is set in it else it is kept untouched. @@ -1487,16 +1467,21 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, struct ustctl_consumer_channel_attr attr; /* Create a plain object and reserve a channel key. 
*/ - channel = allocate_channel(msg.u.ask_channel.session_id, - msg.u.ask_channel.pathname, msg.u.ask_channel.name, - msg.u.ask_channel.uid, msg.u.ask_channel.gid, - msg.u.ask_channel.relayd_id, msg.u.ask_channel.key, + channel = consumer_allocate_channel( + msg.u.ask_channel.key, + msg.u.ask_channel.session_id, + msg.u.ask_channel.pathname, + msg.u.ask_channel.name, + msg.u.ask_channel.uid, + msg.u.ask_channel.gid, + msg.u.ask_channel.relayd_id, (enum lttng_event_output) msg.u.ask_channel.output, msg.u.ask_channel.tracefile_size, msg.u.ask_channel.tracefile_count, msg.u.ask_channel.session_id_per_pid, msg.u.ask_channel.monitor, msg.u.ask_channel.live_timer_interval, + msg.u.ask_channel.is_live, msg.u.ask_channel.root_shm_path, msg.u.ask_channel.shm_path); if (!channel) { -- 2.34.1 From 038abe40887e7e9021c1369b250d6f647936cd73 Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=A9mie=20Galarneau?= Date: Tue, 5 May 2020 18:54:32 -0400 Subject: [PATCH 06/16] sessiond: enforce mmap output type for kernel metadata channel MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit A follow-up fix causes the consumer daemon to accumulate metadata packets into a complete "unit" that can be parsed before sending it to the relay daemon. The consumer daemon will also need to extract the contents of the metadata cache when computing a rotation position (follow-up fix too). Hence, it is not possible to rely on the splice back-end as the consumer daemon may need to accumulate more content than can be backed by the ring buffer's underlying pages. In both cases, the splice output mode could still be used when combined with a memfd, but I see no tangible benefit. Moreover, it would require a 3.17 kernel. Curiously the kernel metadata channel configuration appears to be hard-coded twice; once in the ltt_kernel_session's ltt_kernel_metadata, and once again in kernel_consumer_add_metadata(). 
kernel_consumer_add_metadata is modified to use the kernel session's metadata configuration. Signed-off-by: Jérémie Galarneau Change-Id: Ia4cad82f595d3eee50d081851c234d4c2ef7ee5f --- src/bin/lttng-sessiond/kernel-consumer.c | 11 +++++--- src/bin/lttng-sessiond/trace-kernel.c | 34 +++++++++++++++++++++--- src/common/defaults.h | 7 ++--- tests/unit/test_kernel_data.c | 8 +++--- 4 files changed, 45 insertions(+), 15 deletions(-) diff --git a/src/bin/lttng-sessiond/kernel-consumer.c b/src/bin/lttng-sessiond/kernel-consumer.c index a086384ac..17bf47d20 100644 --- a/src/bin/lttng-sessiond/kernel-consumer.c +++ b/src/bin/lttng-sessiond/kernel-consumer.c @@ -192,12 +192,15 @@ int kernel_consumer_add_metadata(struct consumer_socket *sock, session->uid, session->gid, consumer->net_seq_index, - DEFAULT_METADATA_NAME, + session->metadata->conf->name, 1, - DEFAULT_KERNEL_CHANNEL_OUTPUT, + session->metadata->conf->attr.output, CONSUMER_CHANNEL_TYPE_METADATA, - 0, 0, - monitor, 0, session->is_live_session); + session->metadata->conf->attr.tracefile_size, + session->metadata->conf->attr.tracefile_count, + monitor, + session->metadata->conf->attr.live_timer_interval, + session->is_live_session); health_code_update(); diff --git a/src/bin/lttng-sessiond/trace-kernel.c b/src/bin/lttng-sessiond/trace-kernel.c index d1f826883..d1c74a1ad 100644 --- a/src/bin/lttng-sessiond/trace-kernel.c +++ b/src/bin/lttng-sessiond/trace-kernel.c @@ -385,6 +385,7 @@ error: */ struct ltt_kernel_metadata *trace_kernel_create_metadata(void) { + int ret; struct ltt_kernel_metadata *lkm; struct lttng_channel *chan; @@ -395,13 +396,38 @@ struct ltt_kernel_metadata *trace_kernel_create_metadata(void) goto error; } + ret = lttng_strncpy( + chan->name, DEFAULT_METADATA_NAME, sizeof(chan->name)); + if (ret) { + ERR("Failed to initialize metadata channel name to `%s`", + DEFAULT_METADATA_NAME); + goto error; + } + /* Set default attributes */ - chan->attr.overwrite = DEFAULT_CHANNEL_OVERWRITE; + 
chan->attr.overwrite = DEFAULT_METADATA_OVERWRITE; chan->attr.subbuf_size = default_get_metadata_subbuf_size(); chan->attr.num_subbuf = DEFAULT_METADATA_SUBBUF_NUM; - chan->attr.switch_timer_interval = DEFAULT_KERNEL_CHANNEL_SWITCH_TIMER; - chan->attr.read_timer_interval = DEFAULT_KERNEL_CHANNEL_READ_TIMER; - chan->attr.output = DEFAULT_KERNEL_CHANNEL_OUTPUT; + chan->attr.switch_timer_interval = DEFAULT_METADATA_SWITCH_TIMER; + chan->attr.read_timer_interval = DEFAULT_METADATA_READ_TIMER; + + + /* + * The metadata channel of kernel sessions must use the "mmap" + * back-end since the consumer daemon accumulates complete + * metadata units before sending them to the relay daemon in + * live mode. The consumer daemon also needs to extract the contents + * of the metadata cache when computing a rotation position. + * + * In both cases, it is not possible to rely on the splice + * back-end as the consumer daemon may need to accumulate more + * content than can be backed by the ring buffer's underlying + * pages.
+ */ + chan->attr.output = LTTNG_EVENT_MMAP; + chan->attr.tracefile_size = 0; + chan->attr.tracefile_count = 0; + chan->attr.live_timer_interval = 0; /* Init metadata */ lkm->fd = -1; diff --git a/src/common/defaults.h b/src/common/defaults.h index f504f4d78..08dea0213 100644 --- a/src/common/defaults.h +++ b/src/common/defaults.h @@ -215,9 +215,10 @@ #define DEFAULT_METADATA_SUBBUF_SIZE CONFIG_DEFAULT_METADATA_SUBBUF_SIZE #define DEFAULT_METADATA_SUBBUF_NUM CONFIG_DEFAULT_METADATA_SUBBUF_NUM #define DEFAULT_METADATA_CACHE_SIZE CONFIG_DEFAULT_METADATA_CACHE_SIZE -#define DEFAULT_METADATA_SWITCH_TIMER CONFIG_DEFAULT_METADATA_SWITCH_TIMER -#define DEFAULT_METADATA_READ_TIMER CONFIG_DEFAULT_METADATA_READ_TIMER -#define DEFAULT_METADATA_OUTPUT _DEFAULT_CHANNEL_OUTPUT +#define DEFAULT_METADATA_SWITCH_TIMER 0 +#define DEFAULT_METADATA_READ_TIMER 0 +#define DEFAULT_METADATA_OVERWRITE 0 +#define DEFAULT_METADATA_OUTPUT LTTNG_EVENT_MMAP /* Kernel has different defaults */ diff --git a/tests/unit/test_kernel_data.c b/tests/unit/test_kernel_data.c index 6a35ea54a..857005923 100644 --- a/tests/unit/test_kernel_data.c +++ b/tests/unit/test_kernel_data.c @@ -95,17 +95,17 @@ static void test_create_kernel_metadata(void) ok(kern->metadata->fd == -1 && kern->metadata->conf != NULL && kern->metadata->conf->attr.overwrite - == DEFAULT_CHANNEL_OVERWRITE && + == DEFAULT_METADATA_OVERWRITE && kern->metadata->conf->attr.subbuf_size == default_get_metadata_subbuf_size() && kern->metadata->conf->attr.num_subbuf == DEFAULT_METADATA_SUBBUF_NUM && kern->metadata->conf->attr.switch_timer_interval - == DEFAULT_KERNEL_CHANNEL_SWITCH_TIMER && + == DEFAULT_METADATA_SWITCH_TIMER && kern->metadata->conf->attr.read_timer_interval - == DEFAULT_KERNEL_CHANNEL_READ_TIMER && + == DEFAULT_METADATA_READ_TIMER && kern->metadata->conf->attr.output - == DEFAULT_KERNEL_CHANNEL_OUTPUT, + == LTTNG_EVENT_MMAP, "Validate kernel session metadata"); trace_kernel_destroy_metadata(kern->metadata); -- 2.34.1 From 
f6dff2c3bdbcb392d629ee851eac70b054536fbd Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=A9mie=20Galarneau?= Date: Fri, 22 May 2020 22:04:31 +0000 Subject: [PATCH 07/16] Backport LTTNG_OPTIONAL util MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit A follow-up fix makes use of the LTTNG_OPTIONAL utility. optional.h is identical to the version found in 3e778ab02. Change-Id: I98a3efefe1ab6193e0d3c9c16d00f797551f36d9 Signed-off-by: Jérémie Galarneau --- src/common/Makefile.am | 3 +- src/common/optional.h | 91 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 93 insertions(+), 1 deletion(-) create mode 100644 src/common/optional.h diff --git a/src/common/Makefile.am b/src/common/Makefile.am index 0feb0a0d1..e3feaa38f 100644 --- a/src/common/Makefile.am +++ b/src/common/Makefile.am @@ -72,7 +72,8 @@ libcommon_la_SOURCES = error.h error.c utils.c utils.h runas.c runas.h \ dynamic-buffer.h dynamic-buffer.c \ buffer-view.h buffer-view.c \ unix.c unix.h \ - filter.c filter.h context.c context.h + filter.c filter.h context.c context.h \ + optional.h libcommon_la_LIBADD = \ $(top_builddir)/src/common/config/libconfig.la diff --git a/src/common/optional.h b/src/common/optional.h new file mode 100644 index 000000000..05f6054da --- /dev/null +++ b/src/common/optional.h @@ -0,0 +1,91 @@ +/* + * Copyright (C) 2019 Jérémie Galarneau + * + * SPDX-License-Identifier: GPL-2.0-only + * + */ + +#ifndef LTTNG_OPTIONAL_H +#define LTTNG_OPTIONAL_H + +#include +#include + +/* + * Define wrapper structure representing an optional value. + * + * This macro defines an "is_set" boolean field that must be checked + * when accessing the optional field. This "is_set" field provides + * the semantics that would be expected of a typical "raw pointer" field + * which would be checked for NULL. + * + * Prefer using this macro where "special" values would be used, e.g. + * -1ULL for uint64_t types. 
+ * + * Declaration example: + * struct my_struct { + * int a; + * LTTNG_OPTIONAL(int) b; + * }; + * + * Usage example: + * struct my_struct foo = { .b = LTTNG_OPTIONAL_INIT }; + * + * LTTNG_OPTIONAL_SET(&foo.b, 42); + * if (foo.b.is_set) { + * printf("%d", foo.b.value); + * } + * + * LTTNG_OPTIONAL_UNSET(&foo.b); + */ +#define LTTNG_OPTIONAL(type) \ + struct { \ + uint8_t is_set; \ + type value; \ + } + +/* + * Alias used for communication structures. If the layout of an LTTNG_OPTIONAL + * is changed, the original layout should still be used for communication + * purposes. + * + * LTTNG_OPTIONAL_COMM should be combined with the LTTNG_PACKED macro when + * used for IPC / network communication. + */ +#define LTTNG_OPTIONAL_COMM LTTNG_OPTIONAL + +/* + * This macro is available as a 'convenience' to allow sites that assume + * an optional value is set to assert() that it is set when accessing it. + * + * Since this yields a copy of the wrapped value, it is not suitable for all + * wrapped optional types. It is meant to be used with PODs. + */ +#define LTTNG_OPTIONAL_GET(optional) \ + ({ \ + assert((optional).is_set); \ + (optional).value; \ + }) + +/* + * Initialize an optional field. + * + * The wrapped field is set to the value it would have if it had static storage + * duration. + */ +#define LTTNG_OPTIONAL_INIT { .is_set = 0 } + +/* Set the value of an optional field. */ +#define LTTNG_OPTIONAL_SET(field_ptr, val) \ + do { \ + (field_ptr)->value = (val); \ + (field_ptr)->is_set = 1; \ + } while (0) + +/* Put an optional field in the "unset" (NULL-ed) state.
*/ +#define LTTNG_OPTIONAL_UNSET(field_ptr) \ + do { \ + (field_ptr)->is_set = 0; \ + } while (0) + +#endif /* LTTNG_OPTIONAL_H */ -- 2.34.1 From 29d1a7ae5d5b46002e5883366a7b06f1d9dcc115 Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=A9mie=20Galarneau?= Date: Sun, 10 May 2020 18:00:26 -0400 Subject: [PATCH 08/16] consumerd: refactor: split read_subbuf into sub-operations MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit The read_subbuf code paths intertwine domain-specific operations and metadata/data-specific logic which makes modifications error prone and introduces a fair amount of code duplication. lttng_consumer_read_subbuffer is effectively turned into a template method invoking overridable callbacks making most of the consumption logic domain and data/metadata agnostic. The goal is not to extensively clean-up that code path. A follow-up fix introduces metadata buffering logic which would not reasonably fit in the current scheme. This clean-up makes it easier to safely introduce those changes. No changes in behaviour are intended by this change. 
Signed-off-by: Jérémie Galarneau Change-Id: I9366f2e2a38018ca9b617b93ad9259340180c55d --- src/common/consumer/consumer-stream.c | 661 ++++++++++++++----- src/common/consumer/consumer-stream.h | 20 + src/common/consumer/consumer.c | 204 +++--- src/common/consumer/consumer.h | 172 ++++- src/common/kernel-consumer/kernel-consumer.c | 546 +++++---------- src/common/kernel-consumer/kernel-consumer.h | 2 - src/common/ust-consumer/ust-consumer.c | 520 ++++++--------- 7 files changed, 1157 insertions(+), 968 deletions(-) diff --git a/src/common/consumer/consumer-stream.c b/src/common/consumer/consumer-stream.c index 97ba627dc..a705f1b18 100644 --- a/src/common/consumer/consumer-stream.c +++ b/src/common/consumer/consumer-stream.c @@ -29,6 +29,8 @@ #include #include #include +#include +#include #include "consumer-stream.h" @@ -46,6 +48,504 @@ static void free_stream_rcu(struct rcu_head *head) free(stream); } +static void consumer_stream_data_lock_all(struct lttng_consumer_stream *stream) +{ + pthread_mutex_lock(&stream->chan->lock); + pthread_mutex_lock(&stream->lock); +} + +static void consumer_stream_data_unlock_all(struct lttng_consumer_stream *stream) +{ + pthread_mutex_unlock(&stream->lock); + pthread_mutex_unlock(&stream->chan->lock); +} + +static void consumer_stream_metadata_lock_all(struct lttng_consumer_stream *stream) +{ + consumer_stream_data_lock_all(stream); + pthread_mutex_lock(&stream->metadata_rdv_lock); +} + +static void consumer_stream_metadata_unlock_all(struct lttng_consumer_stream *stream) +{ + pthread_mutex_unlock(&stream->metadata_rdv_lock); + consumer_stream_data_unlock_all(stream); +} + +/* Only used for data streams. 
*/ +static int consumer_stream_update_stats(struct lttng_consumer_stream *stream, + const struct stream_subbuffer *subbuf) +{ + int ret = 0; + uint64_t sequence_number; + const uint64_t discarded_events = + subbuf->info.data.events_discarded; + + if (!subbuf->info.data.sequence_number.is_set) { + /* Command not supported by the tracer. */ + sequence_number = -1ULL; + } else { + sequence_number = subbuf->info.data.sequence_number.value; + } + + /* + * Start the sequence when we extract the first packet in case we don't + * start at 0 (for example if a consumer is not connected to the + * session immediately after the beginning). + */ + if (stream->last_sequence_number == -1ULL) { + stream->last_sequence_number = sequence_number; + } else if (sequence_number > stream->last_sequence_number) { + stream->chan->lost_packets += sequence_number - + stream->last_sequence_number - 1; + } else { + /* seq <= last_sequence_number */ + ERR("Sequence number inconsistent : prev = %" PRIu64 + ", current = %" PRIu64, + stream->last_sequence_number, sequence_number); + ret = -1; + goto end; + } + stream->last_sequence_number = sequence_number; + + if (discarded_events < stream->last_discarded_events) { + /* + * Overflow has occurred. We assume only one wrap-around + * has occurred.
+ */ + stream->chan->discarded_events += + (1ULL << (CAA_BITS_PER_LONG - 1)) - + stream->last_discarded_events + + discarded_events; + } else { + stream->chan->discarded_events += discarded_events - + stream->last_discarded_events; + } + stream->last_discarded_events = discarded_events; + ret = 0; + +end: + return ret; +} + +static +void ctf_packet_index_populate(struct ctf_packet_index *index, + off_t offset, const struct stream_subbuffer *subbuffer) +{ + *index = (typeof(*index)){ + .offset = htobe64(offset), + .packet_size = htobe64(subbuffer->info.data.packet_size), + .content_size = htobe64(subbuffer->info.data.content_size), + .timestamp_begin = htobe64( + subbuffer->info.data.timestamp_begin), + .timestamp_end = htobe64( + subbuffer->info.data.timestamp_end), + .events_discarded = htobe64( + subbuffer->info.data.events_discarded), + .stream_id = htobe64(subbuffer->info.data.stream_id), + .stream_instance_id = htobe64( + subbuffer->info.data.stream_instance_id.is_set ? + subbuffer->info.data.stream_instance_id.value : -1ULL), + .packet_seq_num = htobe64( + subbuffer->info.data.sequence_number.is_set ? 
+ subbuffer->info.data.sequence_number.value : -1ULL), + }; +} + +static ssize_t consumer_stream_consume_mmap( + struct lttng_consumer_local_data *ctx, + struct lttng_consumer_stream *stream, + const struct stream_subbuffer *subbuffer) +{ + const unsigned long padding_size = + subbuffer->info.data.padded_subbuf_size - + subbuffer->info.data.subbuf_size; + + return lttng_consumer_on_read_subbuffer_mmap( + ctx, stream, &subbuffer->buffer.buffer, padding_size); +} + +static ssize_t consumer_stream_consume_splice( + struct lttng_consumer_local_data *ctx, + struct lttng_consumer_stream *stream, + const struct stream_subbuffer *subbuffer) +{ + return lttng_consumer_on_read_subbuffer_splice(ctx, stream, + subbuffer->info.data.padded_subbuf_size, 0); +} + +static int consumer_stream_send_index( + struct lttng_consumer_stream *stream, + const struct stream_subbuffer *subbuffer, + struct lttng_consumer_local_data *ctx) +{ + off_t packet_offset = 0; + struct ctf_packet_index index = {}; + + /* + * This is called after consuming the sub-buffer; substract the + * effect this sub-buffer from the offset. + */ + if (stream->relayd_id == (uint64_t) -1ULL) { + packet_offset = stream->out_fd_offset - + subbuffer->info.data.padded_subbuf_size; + } + + ctf_packet_index_populate(&index, packet_offset, subbuffer); + return consumer_stream_write_index(stream, &index); +} + +/* + * Actually do the metadata sync using the given metadata stream. + * + * Return 0 on success else a negative value. ENODATA can be returned also + * indicating that there is no metadata available for that stream. + */ +static int do_sync_metadata(struct lttng_consumer_stream *metadata, + struct lttng_consumer_local_data *ctx) +{ + int ret; + + assert(metadata); + assert(metadata->metadata_flag); + assert(ctx); + + /* + * In UST, since we have to write the metadata from the cache packet + * by packet, we might need to start this procedure multiple times + * until all the metadata from the cache has been extracted. 
+ */ + do { + /* + * Steps : + * - Lock the metadata stream + * - Check if metadata stream node was deleted before locking. + * - if yes, release and return success + * - Check if new metadata is ready (flush + snapshot pos) + * - If nothing : release and return. + * - Lock the metadata_rdv_lock + * - Unlock the metadata stream + * - cond_wait on metadata_rdv to wait the wakeup from the + * metadata thread + * - Unlock the metadata_rdv_lock + */ + pthread_mutex_lock(&metadata->lock); + + /* + * There is a possibility that we were able to acquire a reference on the + * stream from the RCU hash table but between then and now, the node might + * have been deleted just before the lock is acquired. Thus, after locking, + * we make sure the metadata node has not been deleted which means that the + * buffers are closed. + * + * In that case, there is no need to sync the metadata hence returning a + * success return code. + */ + ret = cds_lfht_is_node_deleted(&metadata->node.node); + if (ret) { + ret = 0; + goto end_unlock_mutex; + } + + switch (ctx->type) { + case LTTNG_CONSUMER_KERNEL: + /* + * Empty the metadata cache and flush the current stream. + */ + ret = lttng_kconsumer_sync_metadata(metadata); + break; + case LTTNG_CONSUMER32_UST: + case LTTNG_CONSUMER64_UST: + /* + * Ask the sessiond if we have new metadata waiting and update the + * consumer metadata cache. + */ + ret = lttng_ustconsumer_sync_metadata(ctx, metadata); + break; + default: + assert(0); + ret = -1; + break; + } + /* + * Error or no new metadata, we exit here. + */ + if (ret <= 0 || ret == ENODATA) { + goto end_unlock_mutex; + } + + /* + * At this point, new metadata have been flushed, so we wait on the + * rendez-vous point for the metadata thread to wake us up when it + * finishes consuming the metadata and continue execution. + */ + + pthread_mutex_lock(&metadata->metadata_rdv_lock); + + /* + * Release metadata stream lock so the metadata thread can process it. 
+ */ + pthread_mutex_unlock(&metadata->lock); + + /* + * Wait on the rendez-vous point. Once woken up, it means the metadata was + * consumed and thus synchronization is achieved. + */ + pthread_cond_wait(&metadata->metadata_rdv, &metadata->metadata_rdv_lock); + pthread_mutex_unlock(&metadata->metadata_rdv_lock); + } while (ret == EAGAIN); + + /* Success */ + return 0; + +end_unlock_mutex: + pthread_mutex_unlock(&metadata->lock); + return ret; +} + +/* + * Synchronize the metadata using a given session ID. A successful acquisition + * of a metadata stream will trigger a request to the session daemon and a + * snapshot so the metadata thread can consume it. + * + * This function call is a rendez-vous point between the metadata thread and + * the data thread. + * + * Return 0 on success or else a negative value. + */ +int consumer_stream_sync_metadata(struct lttng_consumer_local_data *ctx, + uint64_t session_id) +{ + int ret; + struct lttng_consumer_stream *stream = NULL; + struct lttng_ht_iter iter; + struct lttng_ht *ht; + + assert(ctx); + + /* Ease our life a bit. */ + ht = consumer_data.stream_list_ht; + + rcu_read_lock(); + + /* Search the metadata associated with the session id of the given stream. */ + + cds_lfht_for_each_entry_duplicate(ht->ht, + ht->hash_fct(&session_id, lttng_ht_seed), ht->match_fct, + &session_id, &iter.iter, stream, node_session_id.node) { + if (!stream->metadata_flag) { + continue; + } + + ret = do_sync_metadata(stream, ctx); + if (ret < 0) { + goto end; + } + } + + /* + * Force return code to 0 (success) since ret might be ENODATA for instance + * which is not an error but rather that we should come back. + */ + ret = 0; + +end: + rcu_read_unlock(); + return ret; +} + +static int consumer_stream_sync_metadata_index( + struct lttng_consumer_stream *stream, + const struct stream_subbuffer *subbuffer, + struct lttng_consumer_local_data *ctx) +{ + int ret; + + /* Block until all the metadata is sent. 
*/ + pthread_mutex_lock(&stream->metadata_timer_lock); + assert(!stream->missed_metadata_flush); + stream->waiting_on_metadata = true; + pthread_mutex_unlock(&stream->metadata_timer_lock); + + ret = consumer_stream_sync_metadata(ctx, stream->session_id); + + pthread_mutex_lock(&stream->metadata_timer_lock); + stream->waiting_on_metadata = false; + if (stream->missed_metadata_flush) { + stream->missed_metadata_flush = false; + pthread_mutex_unlock(&stream->metadata_timer_lock); + (void) stream->read_subbuffer_ops.send_live_beacon(stream); + } else { + pthread_mutex_unlock(&stream->metadata_timer_lock); + } + if (ret < 0) { + goto end; + } + + ret = consumer_stream_send_index(stream, subbuffer, ctx); +end: + return ret; +} + +/* + * Check if the local version of the metadata stream matches with the version + * of the metadata stream in the kernel. If it was updated, set the reset flag + * on the stream. + */ +static +int metadata_stream_check_version(struct lttng_consumer_stream *stream, + const struct stream_subbuffer *subbuffer) +{ + if (stream->metadata_version == subbuffer->info.metadata.version) { + goto end; + } + + DBG("New metadata version detected"); + stream->metadata_version = subbuffer->info.metadata.version; + stream->reset_metadata_flag = 1; + + if (stream->read_subbuffer_ops.reset_metadata) { + stream->read_subbuffer_ops.reset_metadata(stream); + } + +end: + return 0; +} + +struct lttng_consumer_stream *consumer_stream_create( + struct lttng_consumer_channel *channel, + uint64_t channel_key, + uint64_t stream_key, + enum lttng_consumer_stream_state state, + const char *channel_name, + uid_t uid, + gid_t gid, + uint64_t relayd_id, + uint64_t session_id, + int cpu, + int *alloc_ret, + enum consumer_channel_type type, + unsigned int monitor) +{ + int ret; + struct lttng_consumer_stream *stream; + + stream = zmalloc(sizeof(*stream)); + if (stream == NULL) { + PERROR("malloc struct lttng_consumer_stream"); + ret = -ENOMEM; + goto end; + } + + 
rcu_read_lock(); + stream->chan = channel; + stream->key = stream_key; + stream->out_fd = -1; + stream->out_fd_offset = 0; + stream->output_written = 0; + stream->state = state; + stream->uid = uid; + stream->gid = gid; + stream->relayd_id = relayd_id; + stream->session_id = session_id; + stream->monitor = monitor; + stream->endpoint_status = CONSUMER_ENDPOINT_ACTIVE; + stream->index_file = NULL; + stream->last_sequence_number = -1ULL; + pthread_mutex_init(&stream->lock, NULL); + pthread_mutex_init(&stream->metadata_timer_lock, NULL); + + /* If channel is the metadata, flag this stream as metadata. */ + if (type == CONSUMER_CHANNEL_TYPE_METADATA) { + stream->metadata_flag = 1; + /* Metadata is flat out. */ + strncpy(stream->name, DEFAULT_METADATA_NAME, sizeof(stream->name)); + /* Live rendez-vous point. */ + pthread_cond_init(&stream->metadata_rdv, NULL); + pthread_mutex_init(&stream->metadata_rdv_lock, NULL); + } else { + /* Format stream name to _ */ + ret = snprintf(stream->name, sizeof(stream->name), "%s_%d", + channel_name, cpu); + if (ret < 0) { + PERROR("snprintf stream name"); + goto error; + } + } + + /* Key is always the wait_fd for streams. 
*/ + lttng_ht_node_init_u64(&stream->node, stream->key); + + /* Init node per channel id key */ + lttng_ht_node_init_u64(&stream->node_channel_id, channel_key); + + /* Init session id node with the stream session id */ + lttng_ht_node_init_u64(&stream->node_session_id, stream->session_id); + + DBG3("Allocated stream %s (key %" PRIu64 ", chan_key %" PRIu64 + " relayd_id %" PRIu64 ", session_id %" PRIu64, + stream->name, stream->key, channel_key, + stream->relayd_id, stream->session_id); + + rcu_read_unlock(); + + switch (channel->output) { + case CONSUMER_CHANNEL_SPLICE: + stream->output = LTTNG_EVENT_SPLICE; + ret = utils_create_pipe(stream->splice_pipe); + if (ret < 0) { + goto error; + } + break; + case CONSUMER_CHANNEL_MMAP: + stream->output = LTTNG_EVENT_MMAP; + break; + default: + abort(); + } + + if (type == CONSUMER_CHANNEL_TYPE_METADATA) { + stream->read_subbuffer_ops.lock = + consumer_stream_metadata_lock_all; + stream->read_subbuffer_ops.unlock = + consumer_stream_metadata_unlock_all; + stream->read_subbuffer_ops.pre_consume_subbuffer = + metadata_stream_check_version; + } else { + stream->read_subbuffer_ops.lock = consumer_stream_data_lock_all; + stream->read_subbuffer_ops.unlock = + consumer_stream_data_unlock_all; + stream->read_subbuffer_ops.pre_consume_subbuffer = + consumer_stream_update_stats; + if (channel->is_live) { + stream->read_subbuffer_ops.post_consume = + consumer_stream_sync_metadata_index; + } else { + stream->read_subbuffer_ops.post_consume = + consumer_stream_send_index; + } + } + + if (channel->output == CONSUMER_CHANNEL_MMAP) { + stream->read_subbuffer_ops.consume_subbuffer = + consumer_stream_consume_mmap; + } else { + stream->read_subbuffer_ops.consume_subbuffer = + consumer_stream_consume_splice; + } + + return stream; + +error: + rcu_read_unlock(); + free(stream); +end: + if (alloc_ret) { + *alloc_ret = ret; + } + return NULL; +} + /* * Close stream on the relayd side. This call can destroy a relayd if the * conditions are met. 
@@ -397,163 +897,4 @@ int consumer_stream_write_index(struct lttng_consumer_stream *stream, error: rcu_read_unlock(); return ret; -} - -/* - * Actually do the metadata sync using the given metadata stream. - * - * Return 0 on success else a negative value. ENODATA can be returned also - * indicating that there is no metadata available for that stream. - */ -static int do_sync_metadata(struct lttng_consumer_stream *metadata, - struct lttng_consumer_local_data *ctx) -{ - int ret; - - assert(metadata); - assert(metadata->metadata_flag); - assert(ctx); - - /* - * In UST, since we have to write the metadata from the cache packet - * by packet, we might need to start this procedure multiple times - * until all the metadata from the cache has been extracted. - */ - do { - /* - * Steps : - * - Lock the metadata stream - * - Check if metadata stream node was deleted before locking. - * - if yes, release and return success - * - Check if new metadata is ready (flush + snapshot pos) - * - If nothing : release and return. - * - Lock the metadata_rdv_lock - * - Unlock the metadata stream - * - cond_wait on metadata_rdv to wait the wakeup from the - * metadata thread - * - Unlock the metadata_rdv_lock - */ - pthread_mutex_lock(&metadata->lock); - - /* - * There is a possibility that we were able to acquire a reference on the - * stream from the RCU hash table but between then and now, the node might - * have been deleted just before the lock is acquired. Thus, after locking, - * we make sure the metadata node has not been deleted which means that the - * buffers are closed. - * - * In that case, there is no need to sync the metadata hence returning a - * success return code. - */ - ret = cds_lfht_is_node_deleted(&metadata->node.node); - if (ret) { - ret = 0; - goto end_unlock_mutex; - } - - switch (ctx->type) { - case LTTNG_CONSUMER_KERNEL: - /* - * Empty the metadata cache and flush the current stream. 
- */ - ret = lttng_kconsumer_sync_metadata(metadata); - break; - case LTTNG_CONSUMER32_UST: - case LTTNG_CONSUMER64_UST: - /* - * Ask the sessiond if we have new metadata waiting and update the - * consumer metadata cache. - */ - ret = lttng_ustconsumer_sync_metadata(ctx, metadata); - break; - default: - assert(0); - ret = -1; - break; - } - /* - * Error or no new metadata, we exit here. - */ - if (ret <= 0 || ret == ENODATA) { - goto end_unlock_mutex; - } - - /* - * At this point, new metadata have been flushed, so we wait on the - * rendez-vous point for the metadata thread to wake us up when it - * finishes consuming the metadata and continue execution. - */ - - pthread_mutex_lock(&metadata->metadata_rdv_lock); - - /* - * Release metadata stream lock so the metadata thread can process it. - */ - pthread_mutex_unlock(&metadata->lock); - - /* - * Wait on the rendez-vous point. Once woken up, it means the metadata was - * consumed and thus synchronization is achieved. - */ - pthread_cond_wait(&metadata->metadata_rdv, &metadata->metadata_rdv_lock); - pthread_mutex_unlock(&metadata->metadata_rdv_lock); - } while (ret == EAGAIN); - - /* Success */ - return 0; - -end_unlock_mutex: - pthread_mutex_unlock(&metadata->lock); - return ret; -} - -/* - * Synchronize the metadata using a given session ID. A successful acquisition - * of a metadata stream will trigger a request to the session daemon and a - * snapshot so the metadata thread can consume it. - * - * This function call is a rendez-vous point between the metadata thread and - * the data thread. - * - * Return 0 on success or else a negative value. - */ -int consumer_stream_sync_metadata(struct lttng_consumer_local_data *ctx, - uint64_t session_id) -{ - int ret; - struct lttng_consumer_stream *stream = NULL; - struct lttng_ht_iter iter; - struct lttng_ht *ht; - - assert(ctx); - - /* Ease our life a bit. 
*/ - ht = consumer_data.stream_list_ht; - - rcu_read_lock(); - - /* Search the metadata associated with the session id of the given stream. */ - - cds_lfht_for_each_entry_duplicate(ht->ht, - ht->hash_fct(&session_id, lttng_ht_seed), ht->match_fct, - &session_id, &iter.iter, stream, node_session_id.node) { - if (!stream->metadata_flag) { - continue; - } - - ret = do_sync_metadata(stream, ctx); - if (ret < 0) { - goto end; - } - } - - /* - * Force return code to 0 (success) since ret might be ENODATA for instance - * which is not an error but rather that we should come back. - */ - ret = 0; - -end: - rcu_read_unlock(); - return ret; -} +} \ No newline at end of file diff --git a/src/common/consumer/consumer-stream.h b/src/common/consumer/consumer-stream.h index c5fb09732..52df0c255 100644 --- a/src/common/consumer/consumer-stream.h +++ b/src/common/consumer/consumer-stream.h @@ -20,6 +20,26 @@ #include "consumer.h" +/* + * Create a consumer stream. + * + * The channel lock MUST be acquired. + */ +struct lttng_consumer_stream *consumer_stream_create( + struct lttng_consumer_channel *channel, + uint64_t channel_key, + uint64_t stream_key, + enum lttng_consumer_stream_state state, + const char *channel_name, + uid_t uid, + gid_t gid, + uint64_t relayd_id, + uint64_t session_id, + int cpu, + int *alloc_ret, + enum consumer_channel_type type, + unsigned int monitor); + /* * Close stream's file descriptors and, if needed, close stream also on the * relayd side. diff --git a/src/common/consumer/consumer.c b/src/common/consumer/consumer.c index 8eebce334..eac37654f 100644 --- a/src/common/consumer/consumer.c +++ b/src/common/consumer/consumer.c @@ -17,6 +17,7 @@ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. 
*/ +#include "common/index/ctf-index.h" #define _LGPL_SOURCE #include #include @@ -532,94 +533,6 @@ void consumer_del_stream_for_metadata(struct lttng_consumer_stream *stream) consumer_stream_destroy(stream, metadata_ht); } -struct lttng_consumer_stream *consumer_allocate_stream( - struct lttng_consumer_channel *channel, - uint64_t channel_key, - uint64_t stream_key, - enum lttng_consumer_stream_state state, - const char *channel_name, - uid_t uid, - gid_t gid, - uint64_t relayd_id, - uint64_t session_id, - int cpu, - int *alloc_ret, - enum consumer_channel_type type, - unsigned int monitor) -{ - int ret; - struct lttng_consumer_stream *stream; - - stream = zmalloc(sizeof(*stream)); - if (stream == NULL) { - PERROR("malloc struct lttng_consumer_stream"); - ret = -ENOMEM; - goto end; - } - - rcu_read_lock(); - stream->chan = channel; - stream->key = stream_key; - stream->out_fd = -1; - stream->out_fd_offset = 0; - stream->output_written = 0; - stream->state = state; - stream->uid = uid; - stream->gid = gid; - stream->relayd_id = relayd_id; - stream->session_id = session_id; - stream->monitor = monitor; - stream->endpoint_status = CONSUMER_ENDPOINT_ACTIVE; - stream->index_file = NULL; - stream->last_sequence_number = -1ULL; - pthread_mutex_init(&stream->lock, NULL); - pthread_mutex_init(&stream->metadata_timer_lock, NULL); - - /* If channel is the metadata, flag this stream as metadata. */ - if (type == CONSUMER_CHANNEL_TYPE_METADATA) { - stream->metadata_flag = 1; - /* Metadata is flat out. */ - strncpy(stream->name, DEFAULT_METADATA_NAME, sizeof(stream->name)); - /* Live rendez-vous point. */ - pthread_cond_init(&stream->metadata_rdv, NULL); - pthread_mutex_init(&stream->metadata_rdv_lock, NULL); - } else { - /* Format stream name to _ */ - ret = snprintf(stream->name, sizeof(stream->name), "%s_%d", - channel_name, cpu); - if (ret < 0) { - PERROR("snprintf stream name"); - goto error; - } - } - - /* Key is always the wait_fd for streams. 
*/ - lttng_ht_node_init_u64(&stream->node, stream->key); - - /* Init node per channel id key */ - lttng_ht_node_init_u64(&stream->node_channel_id, channel_key); - - /* Init session id node with the stream session id */ - lttng_ht_node_init_u64(&stream->node_session_id, stream->session_id); - - DBG3("Allocated stream %s (key %" PRIu64 ", chan_key %" PRIu64 - " relayd_id %" PRIu64 ", session_id %" PRIu64, - stream->name, stream->key, channel_key, - stream->relayd_id, stream->session_id); - - rcu_read_unlock(); - return stream; - -error: - rcu_read_unlock(); - free(stream); -end: - if (alloc_ret) { - *alloc_ret = ret; - } - return NULL; -} - /* * Add a stream to the global list protected by a mutex. */ @@ -1305,7 +1218,7 @@ void lttng_consumer_sync_trace_file(struct lttng_consumer_stream *stream, struct lttng_consumer_local_data *lttng_consumer_create( enum lttng_consumer_type type, ssize_t (*buffer_ready)(struct lttng_consumer_stream *stream, - struct lttng_consumer_local_data *ctx), + struct lttng_consumer_local_data *ctx, bool locked_by_caller), int (*recv_channel)(struct lttng_consumer_channel *channel), int (*recv_stream)(struct lttng_consumer_stream *stream), int (*update_stream)(uint64_t stream_key, uint32_t state)) @@ -1513,8 +1426,7 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap( struct lttng_consumer_local_data *ctx, struct lttng_consumer_stream *stream, const struct lttng_buffer_view *buffer, - unsigned long padding, - struct ctf_packet_index *index) + unsigned long padding) { ssize_t ret = 0; off_t orig_offset = stream->out_fd_offset; @@ -1625,10 +1537,6 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap( orig_offset = 0; } stream->tracefile_size_current += buffer->size; - if (index) { - index->offset = htobe64(stream->out_fd_offset); - } - write_len = buffer->size; } @@ -1705,8 +1613,7 @@ end: ssize_t lttng_consumer_on_read_subbuffer_splice( struct lttng_consumer_local_data *ctx, struct lttng_consumer_stream *stream, unsigned long len, - unsigned long 
padding, - struct ctf_packet_index *index) + unsigned long padding) { ssize_t ret = 0, written = 0, ret_splice = 0; loff_t offset = 0; @@ -1831,7 +1738,6 @@ ssize_t lttng_consumer_on_read_subbuffer_splice( orig_offset = 0; } stream->tracefile_size_current += len; - index->offset = htobe64(stream->out_fd_offset); } while (len > 0) { @@ -2354,7 +2260,7 @@ restart: do { health_code_update(); - len = ctx->on_buffer_ready(stream, ctx); + len = ctx->on_buffer_ready(stream, ctx, false); /* * We don't check the return value here since if we get * a negative len, it means an error occurred thus we @@ -2381,7 +2287,7 @@ restart: do { health_code_update(); - len = ctx->on_buffer_ready(stream, ctx); + len = ctx->on_buffer_ready(stream, ctx, false); /* * We don't check the return value here since if we get * a negative len, it means an error occurred thus we @@ -2588,7 +2494,7 @@ void *consumer_thread_data_poll(void *data) if (pollfd[i].revents & POLLPRI) { DBG("Urgent read on fd %d", pollfd[i].fd); high_prio = 1; - len = ctx->on_buffer_ready(local_stream[i], ctx); + len = ctx->on_buffer_ready(local_stream[i], ctx, false); /* it's ok to have an unavailable sub-buffer */ if (len < 0 && len != -EAGAIN && len != -ENODATA) { /* Clean the stream and free it. */ @@ -2619,7 +2525,7 @@ void *consumer_thread_data_poll(void *data) local_stream[i]->hangup_flush_done || local_stream[i]->has_data) { DBG("Normal read on fd %d", pollfd[i].fd); - len = ctx->on_buffer_ready(local_stream[i], ctx); + len = ctx->on_buffer_ready(local_stream[i], ctx, false); /* it's ok to have an unavailable sub-buffer */ if (len < 0 && len != -EAGAIN && len != -ENODATA) { /* Clean the stream and free it. 
*/ @@ -3238,36 +3144,86 @@ error_testpoint: } ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream, - struct lttng_consumer_local_data *ctx) + struct lttng_consumer_local_data *ctx, + bool locked_by_caller) { - ssize_t ret; + ssize_t ret, written_bytes; + struct stream_subbuffer subbuffer = {}; - pthread_mutex_lock(&stream->lock); - if (stream->metadata_flag) { - pthread_mutex_lock(&stream->metadata_rdv_lock); + if (!locked_by_caller) { + stream->read_subbuffer_ops.lock(stream); } - switch (consumer_data.type) { - case LTTNG_CONSUMER_KERNEL: - ret = lttng_kconsumer_read_subbuffer(stream, ctx); - break; - case LTTNG_CONSUMER32_UST: - case LTTNG_CONSUMER64_UST: - ret = lttng_ustconsumer_read_subbuffer(stream, ctx); - break; - default: - ERR("Unknown consumer_data type"); - assert(0); - ret = -ENOSYS; - break; + if (stream->read_subbuffer_ops.on_wake_up) { + ret = stream->read_subbuffer_ops.on_wake_up(stream); + if (ret) { + goto end; + } } - if (stream->metadata_flag) { - pthread_cond_broadcast(&stream->metadata_rdv); - pthread_mutex_unlock(&stream->metadata_rdv_lock); + ret = stream->read_subbuffer_ops.get_next_subbuffer(stream, &subbuffer); + if (ret) { + if (ret == -ENODATA) { + /* Not an error. */ + ret = 0; + } + goto end; } - pthread_mutex_unlock(&stream->lock); + + ret = stream->read_subbuffer_ops.pre_consume_subbuffer( + stream, &subbuffer); + if (ret) { + goto error_put_subbuf; + } + + written_bytes = stream->read_subbuffer_ops.consume_subbuffer( + ctx, stream, &subbuffer); + /* + * Should write subbuf_size amount of data when network streaming or + * the full padded size when we are not streaming. + */ + if ((written_bytes != subbuffer.info.data.subbuf_size && + stream->relayd_id != (uint64_t) -1ULL) || + (written_bytes != subbuffer.info.data.padded_subbuf_size && + stream->relayd_id == + (uint64_t) -1ULL)) { + /* + * Display the error but continue processing to try to + * release the subbuffer.
This is a DBG statement + * since this can happen without being a critical + * error. + */ + DBG("Failed to write to tracefile (written_bytes: %zd != padded subbuffer size: %lu, subbuffer size: %lu)", + written_bytes, subbuffer.info.data.padded_subbuf_size, + subbuffer.info.data.subbuf_size); + } + + ret = stream->read_subbuffer_ops.put_next_subbuffer(stream, &subbuffer); + if (ret) { + goto end; + } + + if (stream->read_subbuffer_ops.post_consume) { + ret = stream->read_subbuffer_ops.post_consume(stream, &subbuffer, ctx); + if (ret) { + goto end; + } + } + + if (stream->read_subbuffer_ops.on_sleep) { + stream->read_subbuffer_ops.on_sleep(stream, ctx); + } + + ret = written_bytes; +end: + if (!locked_by_caller) { + stream->read_subbuffer_ops.unlock(stream); + } + return ret; +error_put_subbuf: + (void) stream->read_subbuffer_ops.put_next_subbuffer(stream, &subbuffer); + goto end; } int lttng_consumer_on_recv_stream(struct lttng_consumer_stream *stream) diff --git a/src/common/consumer/consumer.h b/src/common/consumer/consumer.h index 23166175f..e06e163bf 100644 --- a/src/common/consumer/consumer.h +++ b/src/common/consumer/consumer.h @@ -34,6 +34,9 @@ #include #include #include +#include + +struct lttng_consumer_local_data; /* Commands for consumer */ enum lttng_consumer_command { @@ -225,6 +228,142 @@ struct lttng_consumer_channel { bool streams_sent_to_relayd; }; +struct stream_subbuffer { + union { + /* + * CONSUMER_CHANNEL_SPLICE + * No ownership assumed. + */ + int fd; + /* CONSUMER_CHANNEL_MMAP */ + struct lttng_buffer_view buffer; + } buffer; + union { + /* + * Common members are fine to access through either + * union entries (as per C11, Common Initial Sequence).
+ */ + struct { + unsigned long subbuf_size; + unsigned long padded_subbuf_size; + uint64_t version; + } metadata; + struct { + unsigned long subbuf_size; + unsigned long padded_subbuf_size; + uint64_t packet_size; + uint64_t content_size; + uint64_t timestamp_begin; + uint64_t timestamp_end; + uint64_t events_discarded; + /* Left unset when unsupported. */ + LTTNG_OPTIONAL(uint64_t) sequence_number; + uint64_t stream_id; + /* Left unset when unsupported. */ + LTTNG_OPTIONAL(uint64_t) stream_instance_id; + } data; + } info; +}; + +/* + * Perform any operation required to acknowledge + * the wake-up of a consumer stream (e.g. consume a byte on a wake-up pipe). + * + * Stream and channel locks are acquired during this call. + */ +typedef int (*on_wake_up_cb)(struct lttng_consumer_stream *); + +/* + * Perform any operation required before a consumer stream is put + * to sleep before awaiting a data availability notification. + * + * Stream and channel locks are acquired during this call. + */ +typedef int (*on_sleep_cb)(struct lttng_consumer_stream *, + struct lttng_consumer_local_data *); + +/* + * Acquire the subbuffer at the current 'consumed' position. + * + * Stream and channel locks are acquired during this call. + */ +typedef int (*get_next_subbuffer_cb)(struct lttng_consumer_stream *, + struct stream_subbuffer *); + +/* + * Populate the stream_subbuffer's info member. The info to populate + * depends on the type (metadata/data) of the stream. + * + * Stream and channel locks are acquired during this call. + */ +typedef int (*extract_subbuffer_info_cb)( + struct lttng_consumer_stream *, struct stream_subbuffer *); + +/* + * Invoked after a subbuffer's info has been filled. + * + * Stream and channel locks are acquired during this call. + */ +typedef int (*pre_consume_subbuffer_cb)(struct lttng_consumer_stream *, + const struct stream_subbuffer *); + +/* + * Consume subbuffer contents. + * + * Stream and channel locks are acquired during this call. 
+ */ +typedef ssize_t (*consume_subbuffer_cb)(struct lttng_consumer_local_data *, + struct lttng_consumer_stream *, + const struct stream_subbuffer *); + +/* + * Release the current subbuffer and advance the 'consumed' position by + * one subbuffer. + * + * Stream and channel locks are acquired during this call. + */ +typedef int (*put_next_subbuffer_cb)(struct lttng_consumer_stream *, + struct stream_subbuffer *); + +/* + * Invoked after consuming a subbuffer. + * + * Stream and channel locks are acquired during this call. + */ +typedef int (*post_consume_cb)(struct lttng_consumer_stream *, + const struct stream_subbuffer *, + struct lttng_consumer_local_data *); + +/* + * Send a live beacon if no data is available. + * + * Stream and channel locks are acquired during this call. + */ +typedef int (*send_live_beacon_cb)(struct lttng_consumer_stream *); + +/* + * Lock the stream and channel locks and any other stream-type specific + * lock that need to be acquired during the processing of an + * availability notification. + */ +typedef void (*lock_cb)(struct lttng_consumer_stream *); + +/* + * Unlock the stream and channel locks and any other stream-type specific + * lock before sleeping until the next availability notification. + * + * Stream and channel locks are acquired during this call. + */ +typedef void (*unlock_cb)(struct lttng_consumer_stream *); + +/* + * Invoked when a subbuffer's metadata version does not match the last + * known metadata version. + * + * Stream and channel locks are acquired during this call. + */ +typedef void (*reset_metadata_cb)(struct lttng_consumer_stream *); + /* * Internal representation of the streams, sessiond_key is used to identify * uniquely a stream. @@ -420,6 +559,24 @@ struct lttng_consumer_stream { * file before writing in it (regeneration). */ unsigned int reset_metadata_flag:1; + struct { + /* + * Invoked in the order of declaration. + * See callback type definitions. 
+ */ + lock_cb lock; + on_wake_up_cb on_wake_up; + get_next_subbuffer_cb get_next_subbuffer; + extract_subbuffer_info_cb extract_subbuffer_info; + pre_consume_subbuffer_cb pre_consume_subbuffer; + reset_metadata_cb reset_metadata; + consume_subbuffer_cb consume_subbuffer; + put_next_subbuffer_cb put_next_subbuffer; + post_consume_cb post_consume; + send_live_beacon_cb send_live_beacon; + on_sleep_cb on_sleep; + unlock_cb unlock; + } read_subbuffer_ops; }; /* @@ -476,7 +633,8 @@ struct lttng_consumer_local_data { * Returns the number of bytes read, or negative error value. */ ssize_t (*on_buffer_ready)(struct lttng_consumer_stream *stream, - struct lttng_consumer_local_data *ctx); + struct lttng_consumer_local_data *ctx, + bool locked_by_caller); /* * function to call when we receive a new channel, it receives a * newly allocated channel, depending on the return code of this @@ -698,7 +856,8 @@ void consumer_steal_stream_key(int key, struct lttng_ht *ht); struct lttng_consumer_local_data *lttng_consumer_create( enum lttng_consumer_type type, ssize_t (*buffer_ready)(struct lttng_consumer_stream *stream, - struct lttng_consumer_local_data *ctx), + struct lttng_consumer_local_data *ctx, + bool locked_by_caller), int (*recv_channel)(struct lttng_consumer_channel *channel), int (*recv_stream)(struct lttng_consumer_stream *stream), int (*update_stream)(uint64_t sessiond_key, uint32_t state)); @@ -707,13 +866,11 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap( struct lttng_consumer_local_data *ctx, struct lttng_consumer_stream *stream, const struct lttng_buffer_view *buffer, - unsigned long padding, - struct ctf_packet_index *index); + unsigned long padding); ssize_t lttng_consumer_on_read_subbuffer_splice( struct lttng_consumer_local_data *ctx, struct lttng_consumer_stream *stream, unsigned long len, - unsigned long padding, - struct ctf_packet_index *index); + unsigned long padding); int lttng_consumer_take_snapshot(struct lttng_consumer_stream *stream); int 
lttng_consumer_get_produced_snapshot(struct lttng_consumer_stream *stream, unsigned long *pos); @@ -727,7 +884,8 @@ int lttng_consumer_recv_cmd(struct lttng_consumer_local_data *ctx, int sock, struct pollfd *consumer_sockpoll); ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream, - struct lttng_consumer_local_data *ctx); + struct lttng_consumer_local_data *ctx, + bool locked_by_caller); int lttng_consumer_on_recv_stream(struct lttng_consumer_stream *stream); void consumer_add_relayd_socket(uint64_t relayd_id, int sock_type, struct lttng_consumer_local_data *ctx, int sock, diff --git a/src/common/kernel-consumer/kernel-consumer.c b/src/common/kernel-consumer/kernel-consumer.c index f6a5ba7f5..2281f5d8e 100644 --- a/src/common/kernel-consumer/kernel-consumer.c +++ b/src/common/kernel-consumer/kernel-consumer.c @@ -16,8 +16,6 @@ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ -#include "common/buffer-view.h" -#include #define _LGPL_SOURCE #include #include @@ -44,6 +42,10 @@ #include #include #include +#include +#include +#include +#include #include "kernel-consumer.h" @@ -298,7 +300,7 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path, subbuf_addr, 0, padded_len); read_len = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, &subbuf_view, - padded_len - len, NULL); + padded_len - len); /* * We write the padded len in local tracefiles but the data len * when using a relay. 
Display the error but continue processing @@ -411,7 +413,7 @@ static int lttng_kconsumer_snapshot_metadata(uint64_t key, char *path, do { health_code_update(); - ret_read = lttng_kconsumer_read_subbuffer(metadata_stream, ctx); + ret_read = lttng_consumer_read_subbuffer(metadata_stream, ctx, true); if (ret_read < 0) { if (ret_read != -EAGAIN) { ERR("Kernel snapshot reading metadata subbuffer (ret: %zd)", @@ -652,7 +654,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, health_code_update(); pthread_mutex_lock(&channel->lock); - new_stream = consumer_allocate_stream( + new_stream = consumer_stream_create( channel, channel->key, fd, @@ -679,19 +681,11 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, } new_stream->wait_fd = fd; - switch (channel->output) { - case CONSUMER_CHANNEL_SPLICE: - new_stream->output = LTTNG_EVENT_SPLICE; - ret = utils_create_pipe(new_stream->splice_pipe); - if (ret < 0) { - goto end_nosignal; - } - break; - case CONSUMER_CHANNEL_MMAP: - new_stream->output = LTTNG_EVENT_MMAP; - break; - default: - ERR("Stream output unknown %d", channel->output); + ret = kernctl_get_max_subbuf_size(new_stream->wait_fd, + &new_stream->max_sb_size); + if (ret < 0) { + pthread_mutex_unlock(&channel->lock); + ERR("Failed to get kernel maximal subbuffer size"); goto end_nosignal; } @@ -1082,86 +1076,6 @@ error_fatal: return -1; } -/* - * Populate index values of a kernel stream. Values are set in big endian order. - * - * Return 0 on success or else a negative value. 
- */ -static int get_index_values(struct ctf_packet_index *index, int infd) -{ - int ret; - - ret = kernctl_get_timestamp_begin(infd, &index->timestamp_begin); - if (ret < 0) { - PERROR("kernctl_get_timestamp_begin"); - goto error; - } - index->timestamp_begin = htobe64(index->timestamp_begin); - - ret = kernctl_get_timestamp_end(infd, &index->timestamp_end); - if (ret < 0) { - PERROR("kernctl_get_timestamp_end"); - goto error; - } - index->timestamp_end = htobe64(index->timestamp_end); - - ret = kernctl_get_events_discarded(infd, &index->events_discarded); - if (ret < 0) { - PERROR("kernctl_get_events_discarded"); - goto error; - } - index->events_discarded = htobe64(index->events_discarded); - - ret = kernctl_get_content_size(infd, &index->content_size); - if (ret < 0) { - PERROR("kernctl_get_content_size"); - goto error; - } - index->content_size = htobe64(index->content_size); - - ret = kernctl_get_packet_size(infd, &index->packet_size); - if (ret < 0) { - PERROR("kernctl_get_packet_size"); - goto error; - } - index->packet_size = htobe64(index->packet_size); - - ret = kernctl_get_stream_id(infd, &index->stream_id); - if (ret < 0) { - PERROR("kernctl_get_stream_id"); - goto error; - } - index->stream_id = htobe64(index->stream_id); - - ret = kernctl_get_instance_id(infd, &index->stream_instance_id); - if (ret < 0) { - if (ret == -ENOTTY) { - /* Command not implemented by lttng-modules. */ - index->stream_instance_id = -1ULL; - ret = 0; - } else { - PERROR("kernctl_get_instance_id"); - goto error; - } - } - index->stream_instance_id = htobe64(index->stream_instance_id); - - ret = kernctl_get_sequence_number(infd, &index->packet_seq_num); - if (ret < 0) { - if (ret == -ENOTTY) { - /* Command not implemented by lttng-modules. 
*/ - index->packet_seq_num = -1ULL; - ret = 0; - } else { - PERROR("kernctl_get_sequence_number"); - goto error; - } - } - index->packet_seq_num = htobe64(index->packet_seq_num); - -error: - return ret; -} /* * Sync metadata meaning request them to the session daemon and snapshot to the * metadata thread can consumer them. @@ -1200,342 +1114,231 @@ end: } static -int update_stream_stats(struct lttng_consumer_stream *stream) +int extract_common_subbuffer_info(struct lttng_consumer_stream *stream, + struct stream_subbuffer *subbuf) { int ret; - uint64_t seq, discarded; - - ret = kernctl_get_sequence_number(stream->wait_fd, &seq); - if (ret < 0) { - if (ret == -ENOTTY) { - /* Command not implemented by lttng-modules.
*/ - seq = -1ULL; - ret = 0; - } else { - PERROR("kernctl_get_sequence_number"); - goto end; - } - } - /* - * Start the sequence when we extract the first packet in case we don't - * start at 0 (for example if a consumer is not connected to the - * session immediately after the beginning). - */ - if (stream->last_sequence_number == -1ULL) { - stream->last_sequence_number = seq; - } else if (seq > stream->last_sequence_number) { - stream->chan->lost_packets += seq - - stream->last_sequence_number - 1; - } else { - /* seq <= last_sequence_number */ - ERR("Sequence number inconsistent : prev = %" PRIu64 - ", current = %" PRIu64, - stream->last_sequence_number, seq); - ret = -1; + ret = kernctl_get_subbuf_size( + stream->wait_fd, &subbuf->info.data.subbuf_size); + if (ret) { goto end; } - stream->last_sequence_number = seq; - ret = kernctl_get_events_discarded(stream->wait_fd, &discarded); - if (ret < 0) { - PERROR("kernctl_get_events_discarded"); + ret = kernctl_get_padded_subbuf_size( + stream->wait_fd, &subbuf->info.data.padded_subbuf_size); + if (ret) { goto end; } - if (discarded < stream->last_discarded_events) { - /* - * Overflow has occurred. We assume only one wrap-around - * has occurred.
*/ - stream->chan->discarded_events += (1ULL << (CAA_BITS_PER_LONG - 1)) - - stream->last_discarded_events + discarded; - } else { - stream->chan->discarded_events += discarded - - stream->last_discarded_events; - } - stream->last_discarded_events = discarded; - ret = 0; end: return ret; } -/* - * Check if the local version of the metadata stream matches with the version - * of the metadata stream in the kernel. If it was updated, set the reset flag - * on the stream. - */ static -int metadata_stream_check_version(int infd, struct lttng_consumer_stream *stream) +int extract_metadata_subbuffer_info(struct lttng_consumer_stream *stream, + struct stream_subbuffer *subbuf) { int ret; - uint64_t cur_version; - ret = kernctl_get_metadata_version(infd, &cur_version); - if (ret < 0) { - if (ret == -ENOTTY) { - /* - * LTTng-modules does not implement this - * command. - */ - ret = 0; - goto end; - } - ERR("Failed to get the metadata version"); + ret = extract_common_subbuffer_info(stream, subbuf); + if (ret) { goto end; } - if (stream->metadata_version == cur_version) { - ret = 0; + ret = kernctl_get_metadata_version( + stream->wait_fd, &subbuf->info.metadata.version); + if (ret == -ENOTTY) { + /* Not implemented by older LTTng-modules; not an error, version left untouched. */ + ret = 0; + } else if (ret) { goto end; } - DBG("New metadata version detected"); - stream->metadata_version = cur_version; - stream->reset_metadata_flag = 1; - ret = 0; - end: return ret; } -/* - * Consume data on a file descriptor and write it on a trace file.
*/ -ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, - struct lttng_consumer_local_data *ctx) +static +int extract_data_subbuffer_info(struct lttng_consumer_stream *stream, + struct stream_subbuffer *subbuf) { - unsigned long len, subbuf_size, padding; - int err, write_index = 1; - ssize_t ret = 0; - int infd = stream->wait_fd; - struct ctf_packet_index index; + int ret; - DBG("In read_subbuffer (infd : %d)", infd); + ret = extract_common_subbuffer_info(stream, subbuf); + if (ret) { + goto end; + } - /* Get the next subbuffer */ - err = kernctl_get_next_subbuf(infd); - if (err != 0) { - /* - * This is a debug message even for single-threaded consumer, - * because poll() have more relaxed criterions than get subbuf, - * so get_subbuf may fail for short race windows where poll() - * would issue wakeups. - */ - DBG("Reserving sub buffer failed (everything is normal, " - "it is due to concurrency)"); - ret = err; + ret = kernctl_get_packet_size( + stream->wait_fd, &subbuf->info.data.packet_size); + if (ret < 0) { + PERROR("Failed to get sub-buffer packet size"); goto end; } - /* Get the full subbuffer size including padding */ - err = kernctl_get_padded_subbuf_size(infd, &len); - if (err != 0) { - PERROR("Getting sub-buffer len failed."); - err = kernctl_put_subbuf(infd); - if (err != 0) { - if (err == -EFAULT) { - PERROR("Error in unreserving sub buffer\n"); - } else if (err == -EIO) { - /* Should never happen with newer LTTng versions */ - PERROR("Reader has been pushed by the writer, last sub-buffer corrupted."); - } - ret = err; - goto end; - } - ret = err; + ret = kernctl_get_content_size( + stream->wait_fd, &subbuf->info.data.content_size); + if (ret < 0) { + PERROR("Failed to get sub-buffer content size"); goto end; } - if (!stream->metadata_flag) { - ret = get_index_values(&index, infd); - if (ret < 0) { - err = kernctl_put_subbuf(infd); - if (err != 0) { - if (err == -EFAULT) { - PERROR("Error in unreserving sub buffer\n"); - } else if (err == -EIO) { - /* Should never happen with newer LTTng versions */ - PERROR("Reader has been pushed by the writer, last sub-buffer corrupted."); - } - ret = err; - goto end; - } - goto end; - } - ret = update_stream_stats(stream); - if (ret < 0) { - err = kernctl_put_subbuf(infd); - if (err != 0) { - if (err == -EFAULT) { - PERROR("Error in unreserving sub buffer\n"); - } else if (err == -EIO) { - /* Should never happen with newer LTTng versions */ - PERROR("Reader has been pushed by the writer, last sub-buffer corrupted."); - } - ret = err; - goto end; - } + ret = kernctl_get_timestamp_begin( + stream->wait_fd, &subbuf->info.data.timestamp_begin); + if (ret < 0) { + PERROR("Failed to get sub-buffer begin timestamp"); + goto end; + } + + ret = kernctl_get_timestamp_end( + stream->wait_fd, &subbuf->info.data.timestamp_end); + if (ret < 0) { + PERROR("Failed to get sub-buffer end timestamp"); + goto end; + } + + ret = kernctl_get_events_discarded( + stream->wait_fd, &subbuf->info.data.events_discarded); + if (ret) { + PERROR("Failed to get sub-buffer events discarded count"); + goto end; + } + + ret = kernctl_get_sequence_number(stream->wait_fd, + &subbuf->info.data.sequence_number.value); + if (ret) { + /* May not be supported by older LTTng-modules.
*/ + if (ret != -ENOTTY) { + PERROR("Failed to get sub-buffer sequence number"); goto end; } + ret = 0; } else { - write_index = 0; - ret = metadata_stream_check_version(infd, stream); - if (ret < 0) { - err = kernctl_put_subbuf(infd); - if (err != 0) { - if (err == -EFAULT) { - PERROR("Error in unreserving sub buffer\n"); - } else if (err == -EIO) { - /* Should never happen with newer LTTng versions */ - PERROR("Reader has been pushed by the writer, last sub-buffer corrupted."); - } - ret = err; - goto end; - } - goto end; - } + subbuf->info.data.sequence_number.is_set = true; } - switch (stream->chan->output) { - case CONSUMER_CHANNEL_SPLICE: - /* - * XXX: The lttng-modules splice "actor" does not handle copying - * partial pages hence only using the subbuffer size without the - * padding makes the splice fail. - */ - subbuf_size = len; - padding = 0; + ret = kernctl_get_stream_id( + stream->wait_fd, &subbuf->info.data.stream_id); + if (ret < 0) { + PERROR("Failed to get stream id"); + goto end; + } - /* splice the subbuffer to the tracefile */ - ret = lttng_consumer_on_read_subbuffer_splice(ctx, stream, subbuf_size, - padding, &index); - /* - * XXX: Splice does not support network streaming so the return value - * is simply checked against subbuf_size and not like the mmap() op.
*/ - if (ret != subbuf_size) { - /* - * display the error but continue processing to try - * to release the subbuffer - */ - ERR("Error splicing to tracefile (ret: %zd != len: %lu)", - ret, subbuf_size); - write_index = 0; - } - break; - case CONSUMER_CHANNEL_MMAP: - { - const char *subbuf_addr; - struct lttng_buffer_view subbuf_view; - - /* Get subbuffer size without padding */ - err = kernctl_get_subbuf_size(infd, &subbuf_size); - if (err != 0) { - PERROR("Getting sub-buffer len failed."); - err = kernctl_put_subbuf(infd); - if (err != 0) { - if (err == -EFAULT) { - PERROR("Error in unreserving sub buffer\n"); - } else if (err == -EIO) { - /* Should never happen with newer LTTng versions */ - PERROR("Reader has been pushed by the writer, last sub-buffer corrupted."); - } - ret = err; - goto end; - } - ret = err; + ret = kernctl_get_instance_id(stream->wait_fd, + &subbuf->info.data.stream_instance_id.value); + if (ret) { + /* May not be supported by older LTTng-modules. */ + if (ret != -ENOTTY) { + PERROR("Failed to get stream instance id"); goto end; } + ret = 0; + } else { + subbuf->info.data.stream_instance_id.is_set = true; + } +end: + return ret; +} - ret = get_current_subbuf_addr(stream, &subbuf_addr); - if (ret) { - goto error_put_subbuf; - } +static +int get_subbuffer_common(struct lttng_consumer_stream *stream, + struct stream_subbuffer *subbuffer) +{ + int ret; - /* Make sure the tracer is not gone mad on us!
*/ - assert(len >= subbuf_size); + ret = kernctl_get_next_subbuf(stream->wait_fd); + if (ret) { + goto end; + } - padding = len - subbuf_size; + ret = stream->read_subbuffer_ops.extract_subbuffer_info( + stream, subbuffer); +end: + return ret; +} - subbuf_view = lttng_buffer_view_init(subbuf_addr, 0, len); +static +int get_next_subbuffer_splice(struct lttng_consumer_stream *stream, + struct stream_subbuffer *subbuffer) +{ + int ret; - /* write the subbuffer to the tracefile */ - ret = lttng_consumer_on_read_subbuffer_mmap( - ctx, stream, &subbuf_view, padding, &index); - /* - * The mmap operation should write subbuf_size amount of data - * when network streaming or the full padding (len) size when we - * are _not_ streaming. - */ - if ((ret != subbuf_size && stream->relayd_id != (uint64_t) -1ULL) || - (ret != len && stream->relayd_id == (uint64_t) -1ULL)) { - /* - * Display the error but continue processing to try to release the - * subbuffer. This is a DBG statement since this is possible to - * happen without being a critical error. - */ - DBG("Error writing to tracefile " - "(ret: %zd != len: %lu != subbuf_size: %lu)", - ret, len, subbuf_size); - write_index = 0; - } - break; - } - default: - ERR("Unknown output method"); - ret = -EPERM; + ret = get_subbuffer_common(stream, subbuffer); + if (ret) { + goto end; } -error_put_subbuf: - err = kernctl_put_next_subbuf(infd); - if (err != 0) { - if (err == -EFAULT) { - PERROR("Error in unreserving sub buffer\n"); - } else if (err == -EIO) { - /* Should never happen with newer LTTng versions */ - PERROR("Reader has been pushed by the writer, last sub-buffer corrupted."); - } - ret = err; + + subbuffer->buffer.fd = stream->wait_fd; +end: + return ret; +} + +static +int get_next_subbuffer_mmap(struct lttng_consumer_stream *stream, + struct stream_subbuffer *subbuffer) +{ + int ret; + const char *addr; + + ret = get_subbuffer_common(stream, subbuffer); + if (ret) { goto end; } - /* Write index if needed. 
*/ - if (!write_index) { + ret = get_current_subbuf_addr(stream, &addr); + if (ret) { goto end; } - if (stream->chan->live_timer_interval && !stream->metadata_flag) { - /* - * In live, block until all the metadata is sent. - */ - pthread_mutex_lock(&stream->metadata_timer_lock); - assert(!stream->missed_metadata_flush); - stream->waiting_on_metadata = true; - pthread_mutex_unlock(&stream->metadata_timer_lock); - - err = consumer_stream_sync_metadata(ctx, stream->session_id); - - pthread_mutex_lock(&stream->metadata_timer_lock); - stream->waiting_on_metadata = false; - if (stream->missed_metadata_flush) { - stream->missed_metadata_flush = false; - pthread_mutex_unlock(&stream->metadata_timer_lock); - (void) consumer_flush_kernel_index(stream); - } else { - pthread_mutex_unlock(&stream->metadata_timer_lock); - } - if (err < 0) { - goto end; + subbuffer->buffer.buffer = lttng_buffer_view_init( + addr, 0, subbuffer->info.data.padded_subbuf_size); +end: + return ret; +} + +static +int put_next_subbuffer(struct lttng_consumer_stream *stream, + struct stream_subbuffer *subbuffer) +{ + const int ret = kernctl_put_next_subbuf(stream->wait_fd); + + if (ret) { + if (ret == -EFAULT) { + PERROR("Error in unreserving sub buffer"); + } else if (ret == -EIO) { + /* Should never happen with newer LTTng versions */ + PERROR("Reader has been pushed by the writer, last sub-buffer corrupted"); } } - err = consumer_stream_write_index(stream, &index); - if (err < 0) { - goto end; + return ret; +} + +static void lttng_kconsumer_set_stream_ops( + struct lttng_consumer_stream *stream) +{ + if (stream->chan->output == CONSUMER_CHANNEL_MMAP) { + stream->read_subbuffer_ops.get_next_subbuffer = + get_next_subbuffer_mmap; + } else { + stream->read_subbuffer_ops.get_next_subbuffer = + get_next_subbuffer_splice; } -end: - return ret; + if (stream->metadata_flag) { + stream->read_subbuffer_ops.extract_subbuffer_info = + extract_metadata_subbuffer_info; + } else { + 
stream->read_subbuffer_ops.extract_subbuffer_info = + extract_data_subbuffer_info; + if (stream->chan->is_live) { + stream->read_subbuffer_ops.send_live_beacon = + consumer_flush_kernel_index; + } + } + + stream->read_subbuffer_ops.put_next_subbuffer = put_next_subbuffer; } int lttng_kconsumer_on_recv_stream(struct lttng_consumer_stream *stream) @@ -1593,6 +1391,8 @@ int lttng_kconsumer_on_recv_stream(struct lttng_consumer_stream *stream) } } + lttng_kconsumer_set_stream_ops(stream); + /* we return 0 to let the library handle the FD internally */ return 0; diff --git a/src/common/kernel-consumer/kernel-consumer.h b/src/common/kernel-consumer/kernel-consumer.h index a07f52188..54d512536 100644 --- a/src/common/kernel-consumer/kernel-consumer.h +++ b/src/common/kernel-consumer/kernel-consumer.h @@ -28,8 +28,6 @@ int lttng_kconsumer_get_consumed_snapshot(struct lttng_consumer_stream *stream, unsigned long *pos); int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, int sock, struct pollfd *consumer_sockpoll); -ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, - struct lttng_consumer_local_data *ctx); int lttng_kconsumer_on_recv_stream(struct lttng_consumer_stream *stream); int lttng_kconsumer_data_pending(struct lttng_consumer_stream *stream); int lttng_kconsumer_sync_metadata(struct lttng_consumer_stream *metadata); diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c index 225f80e90..1cb56f283 100644 --- a/src/common/ust-consumer/ust-consumer.c +++ b/src/common/ust-consumer/ust-consumer.c @@ -16,7 +16,6 @@ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. 
*/ -#include #define _LGPL_SOURCE #include #include @@ -32,6 +31,8 @@ #include #include #include +#include +#include #include #include @@ -44,6 +45,7 @@ #include #include #include +#include #include "ust-consumer.h" @@ -135,7 +137,7 @@ static struct lttng_consumer_stream *allocate_stream(int cpu, int key, assert(channel); assert(ctx); - stream = consumer_allocate_stream( + stream = consumer_stream_create( channel, channel->key, key, @@ -1049,7 +1051,7 @@ static int snapshot_metadata(uint64_t key, char *path, uint64_t relayd_id, do { health_code_update(); - ret = lttng_consumer_read_subbuffer(metadata_stream, ctx); + ret = lttng_consumer_read_subbuffer(metadata_stream, ctx, true); if (ret < 0) { goto error_stream; } @@ -1241,8 +1243,7 @@ static int snapshot_channel(uint64_t key, char *path, uint64_t relayd_id, subbuf_view = lttng_buffer_view_init( subbuf_addr, 0, padded_len); read_len = lttng_consumer_on_read_subbuffer_mmap(ctx, - stream, &subbuf_view, padded_len - len, - NULL); + stream, &subbuf_view, padded_len - len); if (use_relayd) { if (read_len != len) { ret = -EPERM; @@ -2132,108 +2133,16 @@ int lttng_ustconsumer_close_wakeup_fd(struct lttng_consumer_stream *stream) return ustctl_stream_close_wakeup_fd(stream->ustream); } -/* - * Populate index values of a UST stream. Values are set in big endian order. - * - * Return 0 on success or else a negative value. 
- */ -static int get_index_values(struct ctf_packet_index *index, - struct ustctl_consumer_stream *ustream) -{ - int ret; - - ret = ustctl_get_timestamp_begin(ustream, &index->timestamp_begin); - if (ret < 0) { - PERROR("ustctl_get_timestamp_begin"); - goto error; - } - index->timestamp_begin = htobe64(index->timestamp_begin); - - ret = ustctl_get_timestamp_end(ustream, &index->timestamp_end); - if (ret < 0) { - PERROR("ustctl_get_timestamp_end"); - goto error; - } - index->timestamp_end = htobe64(index->timestamp_end); - - ret = ustctl_get_events_discarded(ustream, &index->events_discarded); - if (ret < 0) { - PERROR("ustctl_get_events_discarded"); - goto error; - } - index->events_discarded = htobe64(index->events_discarded); - - ret = ustctl_get_content_size(ustream, &index->content_size); - if (ret < 0) { - PERROR("ustctl_get_content_size"); - goto error; - } - index->content_size = htobe64(index->content_size); - - ret = ustctl_get_packet_size(ustream, &index->packet_size); - if (ret < 0) { - PERROR("ustctl_get_packet_size"); - goto error; - } - index->packet_size = htobe64(index->packet_size); - - ret = ustctl_get_stream_id(ustream, &index->stream_id); - if (ret < 0) { - PERROR("ustctl_get_stream_id"); - goto error; - } - index->stream_id = htobe64(index->stream_id); - - ret = ustctl_get_instance_id(ustream, &index->stream_instance_id); - if (ret < 0) { - PERROR("ustctl_get_instance_id"); - goto error; - } - index->stream_instance_id = htobe64(index->stream_instance_id); - - ret = ustctl_get_sequence_number(ustream, &index->packet_seq_num); - if (ret < 0) { - PERROR("ustctl_get_sequence_number"); - goto error; - } - index->packet_seq_num = htobe64(index->packet_seq_num); - -error: - return ret; -} - static -void metadata_stream_reset_cache(struct lttng_consumer_stream *stream, - struct consumer_metadata_cache *cache) +void metadata_stream_reset_cache(struct lttng_consumer_stream *stream) { - DBG("Metadata stream update to version %" PRIu64, - cache->version); 
+ DBG("Reset metadata cache of session %" PRIu64, + stream->chan->session_id); stream->ust_metadata_pushed = 0; - stream->metadata_version = cache->version; + stream->metadata_version = stream->chan->metadata_cache->version; stream->reset_metadata_flag = 1; } -/* - * Check if the version of the metadata stream and metadata cache match. - * If the cache got updated, reset the metadata stream. - * The stream lock and metadata cache lock MUST be held. - * Return 0 on success, a negative value on error. - */ -static -int metadata_stream_check_version(struct lttng_consumer_stream *stream) -{ - int ret = 0; - struct consumer_metadata_cache *cache = stream->chan->metadata_cache; - - if (cache->version == stream->metadata_version) { - goto end; - } - metadata_stream_reset_cache(stream, cache); - -end: - return ret; -} - /* * Write up to one packet from the metadata cache to the channel. * @@ -2247,10 +2156,6 @@ int commit_one_metadata_packet(struct lttng_consumer_stream *stream) int ret; pthread_mutex_lock(&stream->chan->metadata_cache->lock); - ret = metadata_stream_check_version(stream); - if (ret < 0) { - goto end; - } if (stream->chan->metadata_cache->max_offset == stream->ust_metadata_pushed) { ret = 0; @@ -2273,6 +2178,12 @@ int commit_one_metadata_packet(struct lttng_consumer_stream *stream) stream->ust_metadata_pushed); ret = write_len; + /* + * Switch packet (but don't open the next one) on every commit of + * a metadata packet. Since the subbuffer is fully filled (with padding, + * if needed), the stream is "quiescent" after this commit. 
+ */ + ustctl_flush_buffer(stream->ustream, 1); end: pthread_mutex_unlock(&stream->chan->metadata_cache->lock); return ret; @@ -2392,261 +2303,264 @@ end: return ret; } -static -int update_stream_stats(struct lttng_consumer_stream *stream) +static int consumer_stream_ust_on_wake_up(struct lttng_consumer_stream *stream) { - int ret; - uint64_t seq, discarded; + int ret = 0; - ret = ustctl_get_sequence_number(stream->ustream, &seq); - if (ret < 0) { - PERROR("ustctl_get_sequence_number"); - goto end; - } /* - * Start the sequence when we extract the first packet in case we don't - * start at 0 (for example if a consumer is not connected to the - * session immediately after the beginning). + * We can consume the 1 byte written into the wait_fd by + * UST. Don't trigger error if we cannot read this one byte + * (read returns 0), or if the error is EAGAIN or EWOULDBLOCK. + * + * This is only done when the stream is monitored by a thread, + * before the flush is done after a hangup and if the stream + * is not flagged with data since there might be nothing to + * consume in the wait fd but still have data available + * flagged by the consumer wake up pipe. 
*/ - if (stream->last_sequence_number == -1ULL) { - stream->last_sequence_number = seq; - } else if (seq > stream->last_sequence_number) { - stream->chan->lost_packets += seq - - stream->last_sequence_number - 1; - } else { - /* seq <= last_sequence_number */ - ERR("Sequence number inconsistent : prev = %" PRIu64 - ", current = %" PRIu64, - stream->last_sequence_number, seq); - ret = -1; - goto end; + if (stream->monitor && !stream->hangup_flush_done && !stream->has_data) { + char dummy; + ssize_t readlen; + + readlen = lttng_read(stream->wait_fd, &dummy, 1); + if (readlen < 0 && errno != EAGAIN && errno != EWOULDBLOCK) { + ret = readlen; + } } - stream->last_sequence_number = seq; - ret = ustctl_get_events_discarded(stream->ustream, &discarded); - if (ret < 0) { - PERROR("kernctl_get_events_discarded"); + return ret; +} + +static int extract_common_subbuffer_info(struct lttng_consumer_stream *stream, + struct stream_subbuffer *subbuf) +{ + int ret; + + ret = ustctl_get_subbuf_size( + stream->ustream, &subbuf->info.data.subbuf_size); + if (ret) { goto end; } - if (discarded < stream->last_discarded_events) { - /* - * Overflow has occurred. We assume only one wrap-around - * has occurred. - */ - stream->chan->discarded_events += - (1ULL << (CAA_BITS_PER_LONG - 1)) - - stream->last_discarded_events + discarded; - } else { - stream->chan->discarded_events += discarded - - stream->last_discarded_events; + + ret = ustctl_get_padded_subbuf_size( + stream->ustream, &subbuf->info.data.padded_subbuf_size); + if (ret) { + goto end; } - stream->last_discarded_events = discarded; - ret = 0; end: return ret; } -/* - * Read subbuffer from the given stream. - * - * Stream lock MUST be acquired. - * - * Return 0 on success else a negative value. 
- */ -int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream, - struct lttng_consumer_local_data *ctx) +static int extract_metadata_subbuffer_info(struct lttng_consumer_stream *stream, + struct stream_subbuffer *subbuf) { - unsigned long len, subbuf_size, padding; - int err, write_index = 1; - long ret = 0; - struct ustctl_consumer_stream *ustream; - struct ctf_packet_index index; - const char *subbuf_addr; - struct lttng_buffer_view subbuf_view; + int ret; - assert(stream); - assert(stream->ustream); - assert(ctx); + ret = extract_common_subbuffer_info(stream, subbuf); + if (ret) { + goto end; + } - DBG("In UST read_subbuffer (wait_fd: %d, name: %s)", stream->wait_fd, - stream->name); + subbuf->info.metadata.version = stream->chan->metadata_cache->version; - /* Ease our life for what's next. */ - ustream = stream->ustream; +end: + return ret; +} - /* - * We can consume the 1 byte written into the wait_fd by UST. Don't trigger - * error if we cannot read this one byte (read returns 0), or if the error - * is EAGAIN or EWOULDBLOCK. - * - * This is only done when the stream is monitored by a thread, before the - * flush is done after a hangup and if the stream is not flagged with data - * since there might be nothing to consume in the wait fd but still have - * data available flagged by the consumer wake up pipe. 
- */ - if (stream->monitor && !stream->hangup_flush_done && !stream->has_data) { - char dummy; - ssize_t readlen; +static int extract_data_subbuffer_info(struct lttng_consumer_stream *stream, + struct stream_subbuffer *subbuf) +{ + int ret; - readlen = lttng_read(stream->wait_fd, &dummy, 1); - if (readlen < 0 && errno != EAGAIN && errno != EWOULDBLOCK) { - ret = readlen; - goto end; - } + ret = extract_common_subbuffer_info(stream, subbuf); + if (ret) { + goto end; } -retry: - /* Get the next subbuffer */ - err = ustctl_get_next_subbuf(ustream); - if (err != 0) { - /* - * Populate metadata info if the existing info has - * already been read. - */ - if (stream->metadata_flag) { - ret = commit_one_metadata_packet(stream); - if (ret <= 0) { - goto end; - } - ustctl_flush_buffer(stream->ustream, 1); - goto retry; - } + ret = ustctl_get_packet_size( + stream->ustream, &subbuf->info.data.packet_size); + if (ret < 0) { + PERROR("Failed to get sub-buffer packet size"); + goto end; + } - ret = err; /* ustctl_get_next_subbuf returns negative, caller expect positive. */ - /* - * This is a debug message even for single-threaded consumer, - * because poll() have more relaxed criterions than get subbuf, - * so get_subbuf may fail for short race windows where poll() - * would issue wakeups. 
- */ - DBG("Reserving sub buffer failed (everything is normal, " - "it is due to concurrency) [ret: %d]", err); + ret = ustctl_get_content_size( + stream->ustream, &subbuf->info.data.content_size); + if (ret < 0) { + PERROR("Failed to get sub-buffer content size"); goto end; } - assert(stream->chan->output == CONSUMER_CHANNEL_MMAP); - if (!stream->metadata_flag) { - index.offset = htobe64(stream->out_fd_offset); - ret = get_index_values(&index, ustream); - if (ret < 0) { - err = ustctl_put_subbuf(ustream); - assert(err == 0); - goto end; - } + ret = ustctl_get_timestamp_begin( + stream->ustream, &subbuf->info.data.timestamp_begin); + if (ret < 0) { + PERROR("Failed to get sub-buffer begin timestamp"); + goto end; + } - /* Update the stream's sequence and discarded events count. */ - ret = update_stream_stats(stream); - if (ret < 0) { - PERROR("kernctl_get_events_discarded"); - err = ustctl_put_subbuf(ustream); - assert(err == 0); + ret = ustctl_get_timestamp_end( + stream->ustream, &subbuf->info.data.timestamp_end); + if (ret < 0) { + PERROR("Failed to get sub-buffer end timestamp"); + goto end; + } + + ret = ustctl_get_events_discarded( + stream->ustream, &subbuf->info.data.events_discarded); + if (ret) { + PERROR("Failed to get sub-buffer events discarded count"); + goto end; + } + + ret = ustctl_get_sequence_number(stream->ustream, + &subbuf->info.data.sequence_number.value); + if (ret) { + /* May not be supported by older LTTng-modules. 
*/ + if (ret != -ENOTTY) { + PERROR("Failed to get sub-buffer sequence number"); goto end; } } else { - write_index = 0; + subbuf->info.data.sequence_number.is_set = true; } - /* Get the full padded subbuffer size */ - err = ustctl_get_padded_subbuf_size(ustream, &len); - assert(err == 0); + ret = ustctl_get_stream_id( + stream->ustream, &subbuf->info.data.stream_id); + if (ret < 0) { + PERROR("Failed to get stream id"); + goto end; + } - /* Get subbuffer data size (without padding) */ - err = ustctl_get_subbuf_size(ustream, &subbuf_size); - assert(err == 0); + ret = ustctl_get_instance_id(stream->ustream, + &subbuf->info.data.stream_instance_id.value); + if (ret) { + /* May not be supported by older LTTng-modules. */ + if (ret != -ENOTTY) { + PERROR("Failed to get stream instance id"); + goto end; + } + } else { + subbuf->info.data.stream_instance_id.is_set = true; + } +end: + return ret; +} - /* Make sure we don't get a subbuffer size bigger than the padded */ - assert(len >= subbuf_size); +static int get_next_subbuffer_common(struct lttng_consumer_stream *stream, + struct stream_subbuffer *subbuffer) +{ + int ret; + const char *addr; - padding = len - subbuf_size; + ret = stream->read_subbuffer_ops.extract_subbuffer_info( + stream, subbuffer); + if (ret) { + goto end; + } - ret = get_current_subbuf_addr(stream, &subbuf_addr); + ret = get_current_subbuf_addr(stream, &addr); if (ret) { - write_index = 0; - goto error_put_subbuf; + goto end; } - subbuf_view = lttng_buffer_view_init(subbuf_addr, 0, len); + subbuffer->buffer.buffer = lttng_buffer_view_init( + addr, 0, subbuffer->info.data.padded_subbuf_size); + assert(subbuffer->buffer.buffer.data != NULL); +end: + return ret; +} - /* write the subbuffer to the tracefile */ - ret = lttng_consumer_on_read_subbuffer_mmap( - ctx, stream, &subbuf_view, padding, &index); - /* - * The mmap operation should write subbuf_size amount of data when - * network streaming or the full padding (len) size when we are _not_ - * 
streaming. - */ - if ((ret != subbuf_size && stream->relayd_id != (uint64_t) -1ULL) || - (ret != len && stream->relayd_id == (uint64_t) -1ULL)) { - /* - * Display the error but continue processing to try to release the - * subbuffer. This is a DBG statement since any unexpected kill or - * signal, the application gets unregistered, relayd gets closed or - * anything that affects the buffer lifetime will trigger this error. - * So, for the sake of the user, don't print this error since it can - * happen and it is OK with the code flow. - */ - DBG("Error writing to tracefile " - "(ret: %ld != len: %lu != subbuf_size: %lu)", - ret, len, subbuf_size); - write_index = 0; - } -error_put_subbuf: - err = ustctl_put_next_subbuf(ustream); - assert(err == 0); +static int get_next_subbuffer(struct lttng_consumer_stream *stream, + struct stream_subbuffer *subbuffer) +{ + int ret; - /* - * This will consumer the byte on the wait_fd if and only if there is not - * next subbuffer to be acquired. - */ - if (!stream->metadata_flag) { - ret = notify_if_more_data(stream, ctx); - if (ret < 0) { - goto end; - } + ret = ustctl_get_next_subbuf(stream->ustream); + if (ret) { + goto end; } - /* Write index if needed. */ - if (!write_index) { + ret = get_next_subbuffer_common(stream, subbuffer); + if (ret) { goto end; } +end: + return ret; +} - if (stream->chan->live_timer_interval && !stream->metadata_flag) { - /* - * In live, block until all the metadata is sent. 
- */ - pthread_mutex_lock(&stream->metadata_timer_lock); - assert(!stream->missed_metadata_flush); - stream->waiting_on_metadata = true; - pthread_mutex_unlock(&stream->metadata_timer_lock); - - err = consumer_stream_sync_metadata(ctx, stream->session_id); - - pthread_mutex_lock(&stream->metadata_timer_lock); - stream->waiting_on_metadata = false; - if (stream->missed_metadata_flush) { - stream->missed_metadata_flush = false; - pthread_mutex_unlock(&stream->metadata_timer_lock); - (void) consumer_flush_ust_index(stream); - } else { - pthread_mutex_unlock(&stream->metadata_timer_lock); +static int get_next_subbuffer_metadata(struct lttng_consumer_stream *stream, + struct stream_subbuffer *subbuffer) +{ + int ret; + + ret = ustctl_get_next_subbuf(stream->ustream); + if (ret) { + ret = commit_one_metadata_packet(stream); + if (ret < 0) { + goto end; + } else if (ret == 0) { + /* Not an error, the cache is empty. */ + ret = -ENODATA; + goto end; } - if (err < 0) { + ret = ustctl_get_next_subbuf(stream->ustream); + if (ret) { goto end; } } - assert(!stream->metadata_flag); - err = consumer_stream_write_index(stream, &index); - if (err < 0) { + ret = get_next_subbuffer_common(stream, subbuffer); + if (ret) { goto end; } - end: return ret; } +static int put_next_subbuffer(struct lttng_consumer_stream *stream, + struct stream_subbuffer *subbuffer) +{ + const int ret = ustctl_put_next_subbuf(stream->ustream); + + assert(ret == 0); + return ret; +} + +static int signal_metadata(struct lttng_consumer_stream *stream, + struct lttng_consumer_local_data *ctx) +{ + return pthread_cond_broadcast(&stream->metadata_rdv) ? 
-errno : 0; +} + +static void lttng_ustconsumer_set_stream_ops( + struct lttng_consumer_stream *stream) +{ + stream->read_subbuffer_ops.on_wake_up = consumer_stream_ust_on_wake_up; + if (stream->metadata_flag) { + stream->read_subbuffer_ops.get_next_subbuffer = + get_next_subbuffer_metadata; + stream->read_subbuffer_ops.extract_subbuffer_info = + extract_metadata_subbuffer_info; + stream->read_subbuffer_ops.reset_metadata = + metadata_stream_reset_cache; + stream->read_subbuffer_ops.on_sleep = signal_metadata; + } else { + stream->read_subbuffer_ops.get_next_subbuffer = + get_next_subbuffer; + stream->read_subbuffer_ops.extract_subbuffer_info = + extract_data_subbuffer_info; + stream->read_subbuffer_ops.on_sleep = notify_if_more_data; + if (stream->chan->is_live) { + stream->read_subbuffer_ops.send_live_beacon = + consumer_flush_ust_index; + } + } + + stream->read_subbuffer_ops.put_next_subbuffer = put_next_subbuffer; +} + /* * Called when a stream is created. * @@ -2683,6 +2597,8 @@ int lttng_ustconsumer_on_recv_stream(struct lttng_consumer_stream *stream) stream->index_file = index_file; } } + + lttng_ustconsumer_set_stream_ops(stream); ret = 0; error: -- 2.34.1 From d6ef77b34f2ba071e280bc8c25936e95bf1094d9 Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=A9mie=20Galarneau?= Date: Thu, 14 May 2020 14:24:17 -0400 Subject: [PATCH 09/16] Fix: consumerd: live client receives incomplete metadata MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Observed issue ============== Babeltrace 1.5.x and Babeltrace 2.x can both report errors (albeit differently) when using the "lttng-live" protocol that imply that the metadata they received is incomplete. For instance, babeltrace 1.5.3 reports the following error: ``` [error] Error creating AST [error] [Context] Cannot open_mmap_trace of format ctf. 
[error] Error adding trace [warning] [Context] Cannot open_trace of format lttng-live at path net://localhost:xxxx/host/session/live_session. [warning] [Context] cannot open trace "net://localhost:xxxx/host/session/live_session" for reading. [error] opening trace "net://localhost:xxxx/host/session/live_session" for reading. [error] none of the specified trace paths could be opened. ``` While debugging both viewers, I noticed that both were attempting to receive the available metadata before consuming the "data" streams' content. Typically, the following exchange between the relay daemon and the lttng-live client occurs when the problem is observed: bt lttng-live: emits LTTNG_VIEWER_GET_METADATA command relayd: returns LTTNG_VIEWER_METADATA_OK, len = 4096 (default packet size) bt lttng-live: consume 4096 bytes of metadata emits LTTNG_VIEWER_GET_METADATA command relayd: returns LTTNG_VIEWER_NO_NEW_METADATA When the lttng-live client receives the LTTNG_VIEWER_NO_NEW_METADATA status code, it attempts to parse all the metadata it has received since the last LTTNG_VIEWER_NO_NEW_METADATA reply. In effect, it is expected that this forms a logical unit of metadata that is parseable on its own. If this is the first time metadata is received for that trace, the metadata is expected to contain a trace declaration, packet header declaration, etc. If metadata was already received, it is expected that the newly parsed declarations can be "appended" to the existing trace schema. It appears that the relay daemon sends the LTTNG_VIEWER_NO_NEW_METADATA while the metadata it has sent up to that point is not parseable on its own. The live protocol description does not require or imply that a viewer should attempt to parse metadata packets until it hopefully succeeds at some point. 
Anyhow: 1) This would make it impossible for a live viewer to correctly handle a corrupted metadata stream beyond retrying forever, 2) This behaviour is not implemented by the two reference implementations of the protocol. Cause ===== The relay daemon provides a guarantee that it will send any available metadata before allowing a data stream packet to be served to the client. In other words, a client requesting a data packet will receive the LTTNG_VIEWER_FLAG_NEW_METADATA status code (and no data) if it attempts to get a data stream packet while the relay daemon has metadata already available. This guarantee is properly enforced as far as I can tell. However, looking at the consumer daemon implementation, it appears that metadata packets are sent as soon as they are available. A metadata packet is not guaranteed to be parseable on its own. For instance, it can end in the middle of an event declaration. Hence, this hints at a race involving the tracer, the consumer daemon, the relay daemon, and the lttng-live client. Consider the following scenario: - Metadata packets (sub-buffers) are configured to be 4kB in size, - a large number of kernel events are enabled (e.g. --kernel --all), - the network connection between the consumer and relay daemons is slow. 1) The kernel tracer will produce enough TSDL metadata to fill the first sub-buffer of the "metadata" ring-buffer and signal the consumer daemon that a buffer is ready. The tracer then starts writing the remaining data in the following available sub-buffers. 2) The consumer daemon metadata thread is woken up and consumes the first metadata sub-buffer and sends it to the relay daemon. 3) A live client establishes an lttng-live connection to the relay daemon and attempts to consume the available metadata. It receives the first packet and, since the relay daemon doesn't know about any follow-up metadata, receives LTTNG_VIEWER_NO_NEW_METADATA on the next attempt.
4) Having received LTTNG_VIEWER_NO_NEW_METADATA, the lttng-live client attempts to parse the metadata it has received and fails. This scenario is easy to reproduce by inserting a "sleep(1)" at src/bin/lttng-relayd/main.c:1978 (as of this revision). This simulates a relay daemon that would be slow to receive/process metadata packets from the consumer daemon. This problem similarly applies to the user space tracer. Solution ======== Having no knowledge of TSDL, the relay daemon can't "bundle" packets of metadata until they form a "parseable unit" to send to the live client. To provide the parseability guarantee expected by the viewers, and by the relay daemon implicitly, we need to ensure that the consumer daemons only send "parseable units" of metadata to the relay daemon. Unfortunately, the consumer daemons do not know how to parse TSDL either. In fact, only the metadata producers are able to provide the boundaries of the "parseable units" of metadata. The general idea of the fix is to accumulate metadata up to a point where a "parseable unit" boundary has been identified and send that content in one request to the relay daemon. Note that the solution described here only concerns the live mode. In other cases, the mechanisms described are simply bypassed. A "metadata bucket" is added to lttng_consumer_stream when it is created from a live channel. This bucket is filled until the consumption position reaches the "parseable unit" end position. A refresher about the handling of metadata in live mode ------------------------------------------------------- Three "events" are of interest here and can cause metadata to be consumed more or less indirectly: 1) A metadata packet is closed, causing the metadata thread to wake up 2) The live timer expires 3) A data sub-buffer is closed, causing the data thread to wake up 1) The first case is simple and happens regardless of whether or not the tracing session is in live mode.
Metadata is always consumed by the metadata thread in the same way. However, this scenario can be "caused" by (2) and (3). See [1]. A sub-buffer is "acquired" from the metadata ring-buffer and sent to the relay daemon as the payload of a "RELAYD_SEND_METADATA" command. 2) When the live timer expires [2], the 'check_stream' function is called on all data streams of the session. As its name clearly implies, this function is responsible for flushing all streams or sending a "live beacon" (called an "empty index" in the code) if there is no data to flush. Any flushed data will result in (3). 3) When a data sub-buffer is ready to be consumed, [1] is invoked by the data thread. This function acquires a sub-buffer and sends it to the relay daemon through the data connection. Then, an important synchronization step takes place. The index of the newly-sent packet will be sent through the control connection. The relay daemon waits for both the data packet and its matching index before making the new packet visible to live viewers. Since a data packet could contain data that requires "newer" metadata to be decoded, the data thread flushes the metadata stream and enters a "waiting" phase to pause until all metadata present in the metadata ring buffer has been consumed [3]. At the end of this waiting phase, the data thread sends the data packet's index to the relay daemon, allowing the relay daemon to make it visible to its live clients. How to identify a "parseable unit" boundary? -------------------------------------------- In the case of the kernel domain, the kernel tracer produces the actual TSDL descriptions directly. The TSDL metadata is serialized to a metadata cache and is flushed "just in time" to the metadata ring-buffer when a "get next" operation is performed. There is no way, from user space, to query whether or not the metadata cache of the kernel tracer is empty. 
Hence, a new RING_BUFFER_GET_NEXT_SUBBUF_METADATA_CHECK command was added to query whether or not the kernel tracer's metadata cache is empty when acquiring a sub-buffer. This allows the consumer daemon to identify a "coherent" position in the metadata stream that is safe to use as a "parseable unit" boundary. As for the user space domain, since the session daemon is responsible for generating the TSDL representation of the metadata, there is no need to change LTTng-ust APIs. The session daemon generates coherent units of metadata and adds them to its "registry" at once (protected by the registry's lock). It then flushes the contents to the consumer daemon and waits for that data to be consumed before proceeding further. On the consumer daemon side, the metadata cache is filled with the newly-produced contents. This is done atomically with respect to accesses to the metadata cache as all accesses happen through a dedicated metadata cache lock. When the consumer's metadata polling thread is woken up, it will attempt to acquire (`get_next`) a sub-buffer from the metadata stream ring-buffer. If it fails, it will flush a sub-buffer's worth of metadata to the ring-buffer and attempt to acquire a sub-buffer again. At this point, it is possible to determine if that sub-buffer is the last one of a parseable metadata unit: the cache must be empty and the ring-buffer must be empty following the consumption of this sub-buffer. When those conditions are met, the resulting metadata `stream_subbuffer` is tagged as being `coherent`. Metadata bucket --------------- A helper interface, metadata_bucket, is introduced as part of this fix. A metadata_bucket is `fill`ed with `stream_subbuffer`s, and is eventually `flushed` when it is filled by a `coherent` sub-buffer. As older versions of LTTng-modules must remain supported, this new helper is not used when the RING_BUFFER_GET_NEXT_SUBBUF_METADATA_CHECK operation is not available. 
When the operation is available, the metadata stream's bucketization is enabled, causing a bucket to be created and the `consume` callback to be swapped. The `consume` callback of the metadata streams is replaced by a new implementation when the metadata bucketization is activated on the stream. This implementation returns the padded size of the consumed sub-buffer when it could be added to the bucket. When the bucket is flushed, the regular `mmap`-based consumption function is called with the bucket's contents. Known drawbacks =============== This implementation causes the consumer daemon to buffer the whole initial unit of metadata before sending it. In practice, this is not expected to be a problem since the largest metadata files we have seen in real use are a couple of megabytes in size. Beyond the (temporary) memory use, this causes the metadata thread to block while this potentially large chunk of metadata is sent (rather than blocking while sending 4kB at a time). The second point is just a consequence of existing shortcomings of the consumerd; slow IO should not affect other unrelated streams. The fundamental problem is that blocking IO is used and we should switch to non-blocking communication if this is a problem (as is done in the relay daemon). The first point is more problematic given the existing tracer APIs. If the tracer could provide the boundary of a "parseable unit" of metadata, we could send the header of the RELAYD_SEND_METADATA command with that size and send the various metadata packets as they are made available. This would make no difference to the relay daemon as it is not blocking on that socket and will not make the metadata size change visible to the "live server" until it has all been received. This size can't be determined right now since it could exceed the total size of the "metadata" ring buffer. In other words, we can't wait for the production of metadata to complete before starting to consume. 
Finally, while implementing this fix, I also realized that the computation of the rotation position of the metadata streams is erroneous. The rotation code makes use of the ring-buffer's positions to determine the rotation position. However, since both user space and kernel domains make use of a "cache" behind the ring-buffer, that cached content must be taken into account when computing the metadata stream's rotation position. References ========== [1] https://github.com/lttng/lttng-tools/blob/d5ccf8fe0/src/common/consumer/consumer.c#L3433 [2] https://github.com/lttng/lttng-tools/blob/d5ccf8fe0/src/common/consumer/consumer-timer.c#L312 [3] https://github.com/lttng/lttng-tools/blob/d5ccf8fe0/src/common/consumer/consumer-stream.c#L492 Signed-off-by: Jérémie Galarneau Change-Id: I40ee07e5c344c72d9aae2b9b15dc36c00b21e5fa --- src/common/consumer/Makefile.am | 3 +- src/common/consumer/consumer-stream.c | 66 +++++++- src/common/consumer/consumer-stream.h | 9 ++ src/common/consumer/consumer.c | 1 - src/common/consumer/consumer.h | 10 +- src/common/consumer/metadata-bucket.c | 150 +++++++++++++++++++ src/common/consumer/metadata-bucket.h | 34 +++++ src/common/kernel-consumer/kernel-consumer.c | 100 +++++++++++-- src/common/ust-consumer/ust-consumer.c | 106 ++++++++++--- 9 files changed, 447 insertions(+), 32 deletions(-) create mode 100644 src/common/consumer/metadata-bucket.c create mode 100644 src/common/consumer/metadata-bucket.h diff --git a/src/common/consumer/Makefile.am b/src/common/consumer/Makefile.am index c62831289..33f043cbc 100644 --- a/src/common/consumer/Makefile.am +++ b/src/common/consumer/Makefile.am @@ -5,7 +5,8 @@ noinst_HEADERS = consumer-metadata-cache.h consumer-timer.h \ consumer-testpoint.h libconsumer_la_SOURCES = consumer.c consumer.h consumer-metadata-cache.c \ - consumer-timer.c consumer-stream.c consumer-stream.h + consumer-timer.c consumer-stream.c consumer-stream.h \ + metadata-bucket.c metadata-bucket.h libconsumer_la_LIBADD = \ 
$(top_builddir)/src/common/sessiond-comm/libsessiond-comm.la \ diff --git a/src/common/consumer/consumer-stream.c b/src/common/consumer/consumer-stream.c index a705f1b18..bf3c2f807 100644 --- a/src/common/consumer/consumer-stream.c +++ b/src/common/consumer/consumer-stream.c @@ -31,6 +31,7 @@ #include #include #include +#include #include "consumer-stream.h" @@ -162,7 +163,7 @@ static ssize_t consumer_stream_consume_mmap( subbuffer->info.data.subbuf_size; return lttng_consumer_on_read_subbuffer_mmap( - ctx, stream, &subbuffer->buffer.buffer, padding_size); + stream, &subbuffer->buffer.buffer, padding_size); } static ssize_t consumer_stream_consume_splice( @@ -404,6 +405,10 @@ int metadata_stream_check_version(struct lttng_consumer_stream *stream, stream->metadata_version = subbuffer->info.metadata.version; stream->reset_metadata_flag = 1; + if (stream->metadata_bucket) { + metadata_bucket_reset(stream->metadata_bucket); + } + if (stream->read_subbuffer_ops.reset_metadata) { stream->read_subbuffer_ops.reset_metadata(stream); } @@ -728,6 +733,7 @@ void consumer_stream_free(struct lttng_consumer_stream *stream) { assert(stream); + metadata_bucket_destroy(stream->metadata_bucket); call_rcu(&stream->node.head, free_stream_rcu); } @@ -897,4 +903,60 @@ int consumer_stream_write_index(struct lttng_consumer_stream *stream, error: rcu_read_unlock(); return ret; -} \ No newline at end of file +} + +static ssize_t metadata_bucket_flush( + const struct stream_subbuffer *buffer, void *data) +{ + ssize_t ret; + struct lttng_consumer_stream *stream = data; + + ret = consumer_stream_consume_mmap(NULL, stream, buffer); + if (ret < 0) { + goto end; + } +end: + return ret; +} + +static ssize_t metadata_bucket_consume( + struct lttng_consumer_local_data *unused, + struct lttng_consumer_stream *stream, + const struct stream_subbuffer *subbuffer) +{ + ssize_t ret; + enum metadata_bucket_status status; + + status = metadata_bucket_fill(stream->metadata_bucket, subbuffer); + switch (status) 
{ + case METADATA_BUCKET_STATUS_OK: + /* Return consumed size. */ + ret = subbuffer->buffer.buffer.size; + break; + default: + ret = -1; + } + + return ret; +} + +int consumer_stream_enable_metadata_bucketization( + struct lttng_consumer_stream *stream) +{ + int ret = 0; + + assert(stream->metadata_flag); + assert(!stream->metadata_bucket); + assert(stream->chan->output == CONSUMER_CHANNEL_MMAP); + + stream->metadata_bucket = metadata_bucket_create( + metadata_bucket_flush, stream); + if (!stream->metadata_bucket) { + ret = -1; + goto end; + } + + stream->read_subbuffer_ops.consume_subbuffer = metadata_bucket_consume; +end: + return ret; +} diff --git a/src/common/consumer/consumer-stream.h b/src/common/consumer/consumer-stream.h index 52df0c255..1746c0fdd 100644 --- a/src/common/consumer/consumer-stream.h +++ b/src/common/consumer/consumer-stream.h @@ -97,4 +97,13 @@ int consumer_stream_write_index(struct lttng_consumer_stream *stream, int consumer_stream_sync_metadata(struct lttng_consumer_local_data *ctx, uint64_t session_id); +/* + * Enable metadata bucketization. This must only be enabled if the tracer + * provides a reliable metadata `coherent` flag. + * + * This must be called on initialization before any subbuffer is consumed. 
+ */ +int consumer_stream_enable_metadata_bucketization( + struct lttng_consumer_stream *stream); + #endif /* LTTNG_CONSUMER_STREAM_H */ diff --git a/src/common/consumer/consumer.c b/src/common/consumer/consumer.c index eac37654f..66234b84f 100644 --- a/src/common/consumer/consumer.c +++ b/src/common/consumer/consumer.c @@ -1423,7 +1423,6 @@ end: * Returns the number of bytes written */ ssize_t lttng_consumer_on_read_subbuffer_mmap( - struct lttng_consumer_local_data *ctx, struct lttng_consumer_stream *stream, const struct lttng_buffer_view *buffer, unsigned long padding) diff --git a/src/common/consumer/consumer.h b/src/common/consumer/consumer.h index e06e163bf..53ab4c577 100644 --- a/src/common/consumer/consumer.h +++ b/src/common/consumer/consumer.h @@ -247,6 +247,14 @@ struct stream_subbuffer { unsigned long subbuf_size; unsigned long padded_subbuf_size; uint64_t version; + /* + * Left unset when unsupported. + * + * Indicates that this is the last sub-buffer of + * a series of sub-buffer that makes-up a coherent + * (parseable) unit of metadata. 
+ */ + LTTNG_OPTIONAL(bool) coherent; } metadata; struct { unsigned long subbuf_size; @@ -577,6 +585,7 @@ struct lttng_consumer_stream { on_sleep_cb on_sleep; unlock_cb unlock; } read_subbuffer_ops; + struct metadata_bucket *metadata_bucket; }; /* @@ -863,7 +872,6 @@ struct lttng_consumer_local_data *lttng_consumer_create( int (*update_stream)(uint64_t sessiond_key, uint32_t state)); void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx); ssize_t lttng_consumer_on_read_subbuffer_mmap( - struct lttng_consumer_local_data *ctx, struct lttng_consumer_stream *stream, const struct lttng_buffer_view *buffer, unsigned long padding); diff --git a/src/common/consumer/metadata-bucket.c b/src/common/consumer/metadata-bucket.c new file mode 100644 index 000000000..1ee5022e5 --- /dev/null +++ b/src/common/consumer/metadata-bucket.c @@ -0,0 +1,150 @@ +/* + * Copyright (C) 2020 Jérémie Galarneau + * + * SPDX-License-Identifier: GPL-2.0-only + * + */ + +#include "metadata-bucket.h" + +#include +#include +#include +#include +#include + +struct metadata_bucket { + struct lttng_dynamic_buffer content; + struct { + metadata_bucket_flush_cb fn; + void *data; + } flush; + unsigned int buffer_count; +}; + +struct metadata_bucket *metadata_bucket_create( + metadata_bucket_flush_cb flush, void *data) +{ + struct metadata_bucket *bucket; + + bucket = zmalloc(sizeof(typeof(*bucket))); + if (!bucket) { + PERROR("Failed to allocate buffer bucket"); + goto end; + } + + bucket->flush.fn = flush; + bucket->flush.data = data; + lttng_dynamic_buffer_init(&bucket->content); +end: + return bucket; +} + +void metadata_bucket_destroy(struct metadata_bucket *bucket) +{ + if (!bucket) { + return; + } + + if (bucket->content.size > 0) { + WARN("Stream metadata bucket destroyed with remaining data: size = %zu, buffer count = %u", + bucket->content.size, bucket->buffer_count); + } + + lttng_dynamic_buffer_reset(&bucket->content); + free(bucket); +} + +void metadata_bucket_reset(struct 
metadata_bucket *bucket) +{ + lttng_dynamic_buffer_reset(&bucket->content); + lttng_dynamic_buffer_init(&bucket->content); + bucket->buffer_count = 0; +} + +enum metadata_bucket_status metadata_bucket_fill(struct metadata_bucket *bucket, + const struct stream_subbuffer *buffer) +{ + ssize_t ret; + struct lttng_buffer_view flushed_view; + struct stream_subbuffer flushed_subbuffer; + enum metadata_bucket_status status; + const bool should_flush = + LTTNG_OPTIONAL_GET(buffer->info.metadata.coherent); + const size_t padding_this_buffer = + buffer->info.metadata.padded_subbuf_size - + buffer->info.metadata.subbuf_size; + size_t flush_size; + + DBG("Metadata bucket filled with %zu bytes buffer view, sub-buffer size: %lu, padded sub-buffer size: %lu, coherent: %s", + buffer->buffer.buffer.size, + buffer->info.metadata.subbuf_size, + buffer->info.metadata.padded_subbuf_size, + buffer->info.metadata.coherent.value ? "true" : "false"); + /* + * If no metadata was accumulated and this buffer should be + * flushed, don't copy it unecessarily; just flush it directly. + */ + if (!should_flush || bucket->buffer_count != 0) { + /* + * Append the _padded_ subbuffer since they are combined + * into a single "virtual" subbuffer that will be + * flushed at once. + * + * This means that some padding will be sent over the + * network, but should not represent a large amount + * of data as incoherent subbuffers are typically + * pretty full. + * + * The padding of the last subbuffer (coherent) added to + * the bucket is not sent, which is what really matters + * from an efficiency point of view. + */ + ret = lttng_dynamic_buffer_append_view( + &bucket->content, &buffer->buffer.buffer); + if (ret) { + status = METADATA_BUCKET_STATUS_ERROR; + goto end; + } + } + + bucket->buffer_count++; + if (!should_flush) { + status = METADATA_BUCKET_STATUS_OK; + goto end; + } + + flushed_view = bucket->content.size != 0 ? 
+ lttng_buffer_view_from_dynamic_buffer(&bucket->content, 0, -1) : + lttng_buffer_view_from_view(&buffer->buffer.buffer, 0, -1); + + /* + * The flush is done with the size of all padded sub-buffers, except + * for the last one which we can safely "trim". The padding of the last + * packet will be reconstructed by the relay daemon. + */ + flush_size = flushed_view.size - padding_this_buffer; + + flushed_subbuffer = (typeof(flushed_subbuffer)) { + .buffer.buffer = flushed_view, + .info.metadata.subbuf_size = flush_size, + .info.metadata.padded_subbuf_size = flushed_view.size, + .info.metadata.version = buffer->info.metadata.version, + .info.metadata.coherent = buffer->info.metadata.coherent, + }; + + DBG("Metadata bucket flushing %zu bytes (%u sub-buffer%s)", + flushed_view.size, bucket->buffer_count, + bucket->buffer_count > 1 ? "s" : ""); + ret = bucket->flush.fn(&flushed_subbuffer, bucket->flush.data); + if (ret >= 0) { + status = METADATA_BUCKET_STATUS_OK; + } else { + status = METADATA_BUCKET_STATUS_ERROR; + } + + metadata_bucket_reset(bucket); + +end: + return status; +} diff --git a/src/common/consumer/metadata-bucket.h b/src/common/consumer/metadata-bucket.h new file mode 100644 index 000000000..0355eb3c0 --- /dev/null +++ b/src/common/consumer/metadata-bucket.h @@ -0,0 +1,34 @@ +/* + * Copyright (C) 2020 Jérémie Galarneau + * + * SPDX-License-Identifier: GPL-2.0-only + * + */ + +#ifndef METADATA_BUCKET_H +#define METADATA_BUCKET_H + +#include + +struct metadata_bucket; + +typedef ssize_t (*metadata_bucket_flush_cb)( + const struct stream_subbuffer *buffer, void *data); + +enum metadata_bucket_status { + METADATA_BUCKET_STATUS_OK, + METADATA_BUCKET_STATUS_ERROR, +}; + +struct metadata_bucket *metadata_bucket_create( + metadata_bucket_flush_cb flush, void *data); + +void metadata_bucket_destroy(struct metadata_bucket *bucket); + +enum metadata_bucket_status metadata_bucket_fill(struct metadata_bucket *bucket, + const struct stream_subbuffer *buffer); + +void 
metadata_bucket_reset(struct metadata_bucket *bucket); + +#endif /* METADATA_BUCKET_H */ + diff --git a/src/common/kernel-consumer/kernel-consumer.c b/src/common/kernel-consumer/kernel-consumer.c index 2281f5d8e..111d65588 100644 --- a/src/common/kernel-consumer/kernel-consumer.c +++ b/src/common/kernel-consumer/kernel-consumer.c @@ -28,6 +28,7 @@ #include #include #include +#include #include #include @@ -45,7 +46,7 @@ #include #include #include -#include +#include #include "kernel-consumer.h" @@ -298,7 +299,7 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path, subbuf_view = lttng_buffer_view_init( subbuf_addr, 0, padded_len); - read_len = lttng_consumer_on_read_subbuffer_mmap(ctx, + read_len = lttng_consumer_on_read_subbuffer_mmap( stream, &subbuf_view, padded_len - len); /* @@ -1292,6 +1293,42 @@ end: return ret; } +static +int get_next_subbuffer_metadata_check(struct lttng_consumer_stream *stream, + struct stream_subbuffer *subbuffer) +{ + int ret; + const char *addr; + bool coherent; + + ret = kernctl_get_next_subbuf_metadata_check(stream->wait_fd, + &coherent); + if (ret) { + goto end; + } + + ret = stream->read_subbuffer_ops.extract_subbuffer_info( + stream, subbuffer); + if (ret) { + goto end; + } + + LTTNG_OPTIONAL_SET(&subbuffer->info.metadata.coherent, coherent); + + ret = get_current_subbuf_addr(stream, &addr); + if (ret) { + goto end; + } + + subbuffer->buffer.buffer = lttng_buffer_view_init( + addr, 0, subbuffer->info.data.padded_subbuf_size); + DBG("Got metadata packet with padded_subbuf_size = %lu, coherent = %s", + subbuffer->info.metadata.padded_subbuf_size, + coherent ? 
"true" : "false"); +end: + return ret; +} + static int put_next_subbuffer(struct lttng_consumer_stream *stream, struct stream_subbuffer *subbuffer) @@ -1310,15 +1347,53 @@ int put_next_subbuffer(struct lttng_consumer_stream *stream, return ret; } -static void lttng_kconsumer_set_stream_ops( +static +bool is_get_next_check_metadata_available(int tracer_fd) +{ + return kernctl_get_next_subbuf_metadata_check(tracer_fd, NULL) != + -ENOTTY; +} + +static +int lttng_kconsumer_set_stream_ops( struct lttng_consumer_stream *stream) { - if (stream->chan->output == CONSUMER_CHANNEL_MMAP) { - stream->read_subbuffer_ops.get_next_subbuffer = - get_next_subbuffer_mmap; - } else { - stream->read_subbuffer_ops.get_next_subbuffer = - get_next_subbuffer_splice; + int ret = 0; + + if (stream->metadata_flag && stream->chan->is_live) { + DBG("Attempting to enable metadata bucketization for live consumers"); + if (is_get_next_check_metadata_available(stream->wait_fd)) { + DBG("Kernel tracer supports get_next_subbuffer_metadata_check, metadata will be accumulated until a coherent state is reached"); + stream->read_subbuffer_ops.get_next_subbuffer = + get_next_subbuffer_metadata_check; + ret = consumer_stream_enable_metadata_bucketization( + stream); + if (ret) { + goto end; + } + } else { + /* + * The kernel tracer version is too old to indicate + * when the metadata stream has reached a "coherent" + * (parseable) point. + * + * This means that a live viewer may see an incoherent + * sequence of metadata and fail to parse it. 
+ */ + WARN("Kernel tracer does not support get_next_subbuffer_metadata_check which may cause live clients to fail to parse the metadata stream"); + metadata_bucket_destroy(stream->metadata_bucket); + stream->metadata_bucket = NULL; + } + } + + if (!stream->read_subbuffer_ops.get_next_subbuffer) { + if (stream->chan->output == CONSUMER_CHANNEL_MMAP) { + stream->read_subbuffer_ops.get_next_subbuffer = + get_next_subbuffer_mmap; + } else { + stream->read_subbuffer_ops.get_next_subbuffer = + get_next_subbuffer_splice; + } } if (stream->metadata_flag) { @@ -1334,6 +1409,8 @@ static void lttng_kconsumer_set_stream_ops( } stream->read_subbuffer_ops.put_next_subbuffer = put_next_subbuffer; +end: + return ret; } int lttng_kconsumer_on_recv_stream(struct lttng_consumer_stream *stream) @@ -1391,7 +1468,10 @@ int lttng_kconsumer_on_recv_stream(struct lttng_consumer_stream *stream) } } - lttng_kconsumer_set_stream_ops(stream); + ret = lttng_kconsumer_set_stream_ops(stream); + if (ret) { + goto error_close_fd; + } /* we return 0 to let the library handle the FD internally */ return 0; diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c index 1cb56f283..b78295c91 100644 --- a/src/common/ust-consumer/ust-consumer.c +++ b/src/common/ust-consumer/ust-consumer.c @@ -46,6 +46,7 @@ #include #include #include +#include #include "ust-consumer.h" @@ -1242,7 +1243,7 @@ static int snapshot_channel(uint64_t key, char *path, uint64_t relayd_id, subbuf_view = lttng_buffer_view_init( subbuf_addr, 0, padded_len); - read_len = lttng_consumer_on_read_subbuffer_mmap(ctx, + read_len = lttng_consumer_on_read_subbuffer_mmap( stream, &subbuf_view, padded_len - len); if (use_relayd) { if (read_len != len) { @@ -2169,7 +2170,7 @@ int commit_one_metadata_packet(struct lttng_consumer_stream *stream) assert(write_len != 0); if (write_len < 0) { ERR("Writing one metadata packet"); - ret = -1; + ret = write_len; goto end; } stream->ust_metadata_pushed += write_len; @@ 
-2493,28 +2494,88 @@ static int get_next_subbuffer_metadata(struct lttng_consumer_stream *stream, struct stream_subbuffer *subbuffer) { int ret; + bool cache_empty; + bool got_subbuffer; + bool coherent; + bool buffer_empty; + unsigned long consumed_pos, produced_pos; - ret = ustctl_get_next_subbuf(stream->ustream); - if (ret) { - ret = commit_one_metadata_packet(stream); - if (ret < 0) { - goto end; - } else if (ret == 0) { - /* Not an error, the cache is empty. */ - ret = -ENODATA; - goto end; + do { + ret = ustctl_get_next_subbuf(stream->ustream); + if (ret == 0) { + got_subbuffer = true; + } else { + got_subbuffer = false; + if (ret != -EAGAIN) { + /* Fatal error. */ + goto end; + } } - ret = ustctl_get_next_subbuf(stream->ustream); - if (ret) { - goto end; + /* + * Determine if the cache is empty and ensure that a sub-buffer + * is made available if the cache is not empty. + */ + if (!got_subbuffer) { + ret = commit_one_metadata_packet(stream); + if (ret < 0 && ret != -ENOBUFS) { + goto end; + } else if (ret == 0) { + /* Not an error, the cache is empty. */ + cache_empty = true; + ret = -ENODATA; + goto end; + } else { + cache_empty = false; + } + } else { + pthread_mutex_lock(&stream->chan->metadata_cache->lock); + cache_empty = stream->chan->metadata_cache->max_offset == + stream->ust_metadata_pushed; + pthread_mutex_unlock(&stream->chan->metadata_cache->lock); } - } + } while (!got_subbuffer); + /* Populate sub-buffer infos and view. */ ret = get_next_subbuffer_common(stream, subbuffer); if (ret) { goto end; } + + ret = lttng_ustconsumer_take_snapshot(stream); + if (ret < 0) { + /* + * -EAGAIN is not expected since we got a sub-buffer and haven't + * pushed the consumption position yet (on put_next). 
+ */ + PERROR("Failed to take a snapshot of metadata buffer positions"); + goto end; + } + + ret = lttng_ustconsumer_get_consumed_snapshot(stream, &consumed_pos); + if (ret) { + PERROR("Failed to get metadata consumed position"); + goto end; + } + + ret = lttng_ustconsumer_get_produced_snapshot(stream, &produced_pos); + if (ret) { + PERROR("Failed to get metadata produced position"); + goto end; + } + + /* Last sub-buffer of the ring buffer ? */ + buffer_empty = (consumed_pos + stream->max_sb_size) == produced_pos; + + /* + * The sessiond registry lock ensures that coherent units of metadata + * are pushed to the consumer daemon at once. Hence, if a sub-buffer is + * acquired, the cache is empty, and it is the only available sub-buffer + * available, it is safe to assume that it is "coherent". + */ + coherent = got_subbuffer && cache_empty && buffer_empty; + + LTTNG_OPTIONAL_SET(&subbuffer->info.metadata.coherent, coherent); end: return ret; } @@ -2534,9 +2595,11 @@ static int signal_metadata(struct lttng_consumer_stream *stream, return pthread_cond_broadcast(&stream->metadata_rdv) ? 
-errno : 0; } -static void lttng_ustconsumer_set_stream_ops( +static int lttng_ustconsumer_set_stream_ops( struct lttng_consumer_stream *stream) { + int ret = 0; + stream->read_subbuffer_ops.on_wake_up = consumer_stream_ust_on_wake_up; if (stream->metadata_flag) { stream->read_subbuffer_ops.get_next_subbuffer = @@ -2545,7 +2608,14 @@ static void lttng_ustconsumer_set_stream_ops( extract_metadata_subbuffer_info; stream->read_subbuffer_ops.reset_metadata = metadata_stream_reset_cache; - stream->read_subbuffer_ops.on_sleep = signal_metadata; + if (stream->chan->is_live) { + stream->read_subbuffer_ops.on_sleep = signal_metadata; + ret = consumer_stream_enable_metadata_bucketization( + stream); + if (ret) { + goto end; + } + } } else { stream->read_subbuffer_ops.get_next_subbuffer = get_next_subbuffer; @@ -2559,6 +2629,8 @@ static void lttng_ustconsumer_set_stream_ops( } stream->read_subbuffer_ops.put_next_subbuffer = put_next_subbuffer; +end: + return ret; } /* -- 2.34.1 From 82aae70e29164db14e37da5686b882ac70a3414f Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=A9mie=20Galarneau?= Date: Wed, 27 May 2020 11:27:26 -0400 Subject: [PATCH 10/16] Fix: incorrect specifier %lu used with size_t argument MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Fixes the following warning on 32-bit targets: libtool: compile: gcc -DHAVE_CONFIG_H -I../../../include -I../../../include -I../../../src -include config.h -I/build/include -I/home/jenkins/workspace/lttng-tools_master_portbuild/arch/armhf/babeltrace_version/stable-1.5/build/std/conf/std/liburcu_version/master/test_type/base/deps/build/include -Wall -Wno-incomplete-setjmp-declaration -Wdiscarded-qualifiers -Wmissing-declarations -Wmissing-prototypes -Wmissing-parameter-type -fno-strict-aliasing -pthread -g -O2 -MT consumer-stream.lo -MD -MP -MF .deps/consumer-stream.Tpo -c consumer-stream.c -fPIC -DPIC -o .libs/consumer-stream.o In file included from ../../../src/common/common.h:12:0, 
from consumer.c:25: consumer.c: In function ‘lttng_consumer_on_read_subbuffer_mmap’: ../../../src/common/error.h:161:35: warning: format ‘%lu’ expects argument of type ‘long unsigned int’, but argument 7 has type ‘size_t {aka unsigned int}’ [-Wformat=] #define DBG(fmt, args...) _ERRMSG("DEBUG1", PRINT_DBG, fmt, ## args) ^ ../../../src/common/error.h:136:51: note: in definition of macro ‘__lttng_print’ fprintf((type) == PRINT_MSG ? stdout : stderr, fmt, ## args); \ ^~~ ../../../src/common/error.h:161:27: note: in expansion of macro ‘_ERRMSG’ #define DBG(fmt, args...) _ERRMSG("DEBUG1", PRINT_DBG, fmt, ## args) ^~~~~~~ consumer.c:1688:2: note: in expansion of macro ‘DBG’ DBG("Consumer mmap write() ret %zd (len %lu)", ret, write_len); ^~~ Signed-off-by: Jérémie Galarneau Change-Id: Id9a571d8e94105428833baa053c6463b91484a03 --- src/common/consumer/consumer.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/common/consumer/consumer.c b/src/common/consumer/consumer.c index 66234b84f..bae4c294b 100644 --- a/src/common/consumer/consumer.c +++ b/src/common/consumer/consumer.c @@ -1544,7 +1544,7 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap( * receive a ret value that is bigger than len. 
*/ ret = lttng_write(outfd, buffer->data, write_len); - DBG("Consumer mmap write() ret %zd (len %lu)", ret, write_len); + DBG("Consumer mmap write() ret %zd (len %zu)", ret, write_len); if (ret < 0 || ((size_t) ret != write_len)) { /* * Report error to caller if nothing was written else at least send the -- 2.34.1 From 3910d1eac65d9b89d2b9e5bc2a07f6bdca9c088a Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=A9mie=20Galarneau?= Date: Wed, 17 Jun 2020 12:59:24 -0400 Subject: [PATCH 11/16] Fix: consumerd: user space metadata not regenerated MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Observed Issue ============== The LTTng-IVC tests fail on the `regenerate metadata` tests which essentially: - Sets up a user space session - Enables events - Traces an application - Stops tracing - Validates the trace - Truncates the metadata file (empties it) - Starts tracing - Regenerates the metadata - Stops the session - Validates the trace The last trace validation step fails on an empty file (locally) or a garbled file (remote). The in-tree tests did not catch any of this since they essentially don't test much. They verify that the command works (returns 0) but do not validate any of its effects. The issue was bisected down to: commit 6f9449c22eef59294cf1e1dc3610a5cbf14baec0 (HEAD) Author: Jérémie Galarneau Date: Sun May 10 18:00:26 2020 -0400 consumerd: refactor: split read_subbuf into sub-operations [...] Cause ===== The commit that introduced the issue refactored the sub-buffer consumption loop to eliminate code duplication between the user space and kernel consumer daemons. In doing so, it eliminated a metadata version check from the consumption path. The consumption of a metadata sub-buffer follows these relevant high-level steps: - `get` the sub-buffer - /!\ user space specific /!\ - if the `get` fails, attempt to flush the metadata cache's contents to the ring-buffer - populate `stream_subbuffer` properties (size, version, etc.)
- check the sub-buffer's version against the last known metadata version (pre-consume step) - if they don't match, a metadata regeneration occurred: reset the metadata consumed position - consume (actual write/send) - `put` sub-buffer [...] As shown above, the user space consumer must manage the flushing of the metadata cache explicitly as opposed to the kernel domain for which the tracer performs the flushing implicitly through the `get` operation. When the user space consumer encounters a `get` failure, it checks if all the metadata cache was flushed (consumed position != cache size), and flushes any remaining contents. However, the metadata version could have changed and yielded an identical cache size: a regeneration without any new metadata will yield the same cache size. Since 6f9449c22, the metadata version check is only performed after a successful `get`. This means that after a regeneration, `get` never succeeds (there is seemingly nothing to consume), and the metadata version check is never performed. Therefore, the metadata stream is never put in the `reset` mode, effectively not regenerating the data. Note that producing new metadata (e.g. a newly registering app announcing new events) would work around the problem here. Solution ======== Add a metadata version check when failing to `get` a metadata sub-buffer. This is done in `commit_one_metadata_packet()` when the cache size is seen to be equal to the consumed position. When this occurs, `consumer_stream_metadata_set_version()`, a new consumer stream method, is invoked which sets the new metadata version, sets the `reset` flag, and discards any previously bucketized metadata. The metadata cache's consumed position is also reset, allowing the cache flush to take place. `metadata_stream_reset_cache()` is renamed to `metadata_stream_reset_cache_consumed_position()` since its name is misleading and since it is used as part of the fix. Known drawbacks ============== None.
Change-Id: I3b933c8293f409f860074bd49bebd8d1248b6341 Signed-off-by: Jérémie Galarneau Reported-by: Jonathan Rajotte --- src/common/consumer/consumer-stream.c | 20 +++++++---- src/common/consumer/consumer-stream.h | 10 ++++++ src/common/ust-consumer/ust-consumer.c | 48 +++++++++++++++++++++----- 3 files changed, 63 insertions(+), 15 deletions(-) diff --git a/src/common/consumer/consumer-stream.c b/src/common/consumer/consumer-stream.c index bf3c2f807..c9a55505b 100644 --- a/src/common/consumer/consumer-stream.c +++ b/src/common/consumer/consumer-stream.c @@ -402,12 +402,8 @@ int metadata_stream_check_version(struct lttng_consumer_stream *stream, } DBG("New metadata version detected"); - stream->metadata_version = subbuffer->info.metadata.version; - stream->reset_metadata_flag = 1; - - if (stream->metadata_bucket) { - metadata_bucket_reset(stream->metadata_bucket); - } + consumer_stream_metadata_set_version(stream, + subbuffer->info.metadata.version); if (stream->read_subbuffer_ops.reset_metadata) { stream->read_subbuffer_ops.reset_metadata(stream); @@ -960,3 +956,15 @@ int consumer_stream_enable_metadata_bucketization( end: return ret; } + +void consumer_stream_metadata_set_version( + struct lttng_consumer_stream *stream, uint64_t new_version) +{ + assert(new_version > stream->metadata_version); + stream->metadata_version = new_version; + stream->reset_metadata_flag = 1; + + if (stream->metadata_bucket) { + metadata_bucket_reset(stream->metadata_bucket); + } +} diff --git a/src/common/consumer/consumer-stream.h b/src/common/consumer/consumer-stream.h index 1746c0fdd..506b166b0 100644 --- a/src/common/consumer/consumer-stream.h +++ b/src/common/consumer/consumer-stream.h @@ -106,4 +106,14 @@ int consumer_stream_sync_metadata(struct lttng_consumer_local_data *ctx, int consumer_stream_enable_metadata_bucketization( struct lttng_consumer_stream *stream); +/* + * Set the version of a metadata stream (i.e. following a metadata + * regeneration). 
+ * + * Changing the version of a metadata stream will cause any bucketized metadata + * to be discarded and will mark the metadata stream for future `reset`. + */ +void consumer_stream_metadata_set_version( + struct lttng_consumer_stream *stream, uint64_t new_version); + #endif /* LTTNG_CONSUMER_STREAM_H */ diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c index b78295c91..1bde820cd 100644 --- a/src/common/ust-consumer/ust-consumer.c +++ b/src/common/ust-consumer/ust-consumer.c @@ -2135,13 +2135,12 @@ int lttng_ustconsumer_close_wakeup_fd(struct lttng_consumer_stream *stream) } static -void metadata_stream_reset_cache(struct lttng_consumer_stream *stream) +void metadata_stream_reset_cache_consumed_position( + struct lttng_consumer_stream *stream) { DBG("Reset metadata cache of session %" PRIu64, stream->chan->session_id); stream->ust_metadata_pushed = 0; - stream->metadata_version = stream->chan->metadata_cache->version; - stream->reset_metadata_flag = 1; } /* @@ -2157,10 +2156,41 @@ int commit_one_metadata_packet(struct lttng_consumer_stream *stream) int ret; pthread_mutex_lock(&stream->chan->metadata_cache->lock); - if (stream->chan->metadata_cache->max_offset - == stream->ust_metadata_pushed) { - ret = 0; - goto end; + if (stream->chan->metadata_cache->max_offset == + stream->ust_metadata_pushed) { + /* + * In the context of a user space metadata channel, a + * change in version can be detected in two ways: + * 1) During the pre-consume of the `read_subbuffer` loop, + * 2) When populating the metadata ring buffer (i.e. here). + * + * This function is invoked when there is no metadata + * available in the ring-buffer. If all data was consumed + * up to the size of the metadata cache, there is no metadata + * to insert in the ring-buffer. + * + * However, the metadata version could still have changed (a + * regeneration without any new data will yield the same cache + * size). 
+ * + * The cache's version is checked for a version change and the + * consumed position is reset if one occurred. + * + * This check is only necessary for the user space domain as + * it has to manage the cache explicitly. If this reset was not + * performed, no metadata would be consumed (and no reset would + * occur as part of the pre-consume) until the metadata size + * exceeded the cache size. + */ + if (stream->metadata_version != + stream->chan->metadata_cache->version) { + metadata_stream_reset_cache_consumed_position(stream); + consumer_stream_metadata_set_version(stream, + stream->chan->metadata_cache->version); + } else { + ret = 0; + goto end; + } } write_len = ustctl_write_one_packet_to_channel(stream->chan->uchan, @@ -2363,7 +2393,7 @@ static int extract_metadata_subbuffer_info(struct lttng_consumer_stream *stream, goto end; } - subbuf->info.metadata.version = stream->chan->metadata_cache->version; + subbuf->info.metadata.version = stream->metadata_version; end: return ret; @@ -2607,7 +2637,7 @@ static int lttng_ustconsumer_set_stream_ops( stream->read_subbuffer_ops.extract_subbuffer_info = extract_metadata_subbuffer_info; stream->read_subbuffer_ops.reset_metadata = - metadata_stream_reset_cache; + metadata_stream_reset_cache_consumed_position; if (stream->chan->is_live) { stream->read_subbuffer_ops.on_sleep = signal_metadata; ret = consumer_stream_enable_metadata_bucketization( -- 2.34.1 From 518b286638451a51c543388ad0ad7fb2b0ace1f7 Mon Sep 17 00:00:00 2001 From: Jonathan Rajotte Date: Wed, 17 Jun 2020 15:55:36 -0400 Subject: [PATCH 12/16] Fix: invalid discarded events on start/stop without event production MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Observed issue ============== On consecutive start/stop command sequence the reported discarded event count is N * CPU, where N is the number of start/stop pair executed. Note that no event generation occurred between each start/stop pair. 
lttng start lttng stop Tracing stopped for session auto-20200616-094338 lttng start lttng stop Waiting for data availability Warning: 4 events were discarded, please refer to the documentation on channel configuration. Tracing stopped for session auto-20200616-094338 lttng start lttng stop Waiting for data availability Warning: 8 events were discarded, please refer to the documentation on channel configuration. Tracing stopped for session auto-20200616-094338 The issue was bisected down to: commit 6f9449c22eef59294cf1e1dc3610a5cbf14baec0 (HEAD) Author: Jérémie Galarneau Date: Sun May 10 18:00:26 2020 -0400 consumerd: refactor: split read_subbuf into sub-operations [...] Cause ===== The discarded event local variable in `consumer_stream_update_stats()` is initialized with the subbuffer sequence count instead of the subbuffer discarded event count. Solution ======== Use the subbuffer discarded event count to initialize the variable. Known drawbacks ========= None Signed-off-by: Jonathan Rajotte Signed-off-by: Jérémie Galarneau Change-Id: I5ff213d0464cdb591b550f6e610bf15085b18888 --- src/common/consumer/consumer-stream.c | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/common/consumer/consumer-stream.c b/src/common/consumer/consumer-stream.c index c9a55505b..aa783423d 100644 --- a/src/common/consumer/consumer-stream.c +++ b/src/common/consumer/consumer-stream.c @@ -79,8 +79,7 @@ static int consumer_stream_update_stats(struct lttng_consumer_stream *stream, { int ret = 0; uint64_t sequence_number; - const uint64_t discarded_events = - LTTNG_OPTIONAL_GET(subbuf->info.data.sequence_number); + const uint64_t discarded_events = subbuf->info.data.events_discarded; if (!subbuf->info.data.sequence_number.is_set) { /* Command not supported by the tracer.
*/ -- 2.34.1 From 558cd39fb7504ea3e52fdba116718edb8018c59d Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=A9mie=20Galarneau?= Date: Wed, 17 Jun 2020 18:27:52 -0400 Subject: [PATCH 13/16] consumerd: on_sleep not called on stream when no data is available MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit The `on_sleep` stream operation is not invoked when a stream's `get` operation returns ENODATA (no data available). Since this is not an error, the normal steps of the consumption loops should be assumed. Not marked as a fix as this is not problematic right now. However, it could prove misleading in the future. Signed-off-by: Jérémie Galarneau Change-Id: I0812e3af4c967390ebba4128781787abf45c76a1 Cherry-pick difference: merge conflict on rotation. Simply deleted the rotation related code. Change-Id: Ic840361477d77eae387573a1bb0949636abe9b12 --- src/common/consumer/consumer.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/common/consumer/consumer.c b/src/common/consumer/consumer.c index bae4c294b..8a2a0f9c9 100644 --- a/src/common/consumer/consumer.c +++ b/src/common/consumer/consumer.c @@ -3165,6 +3165,7 @@ ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream, if (ret == -ENODATA) { /* Not an error. */ ret = 0; + goto sleep_stream; } goto end; } @@ -3209,6 +3210,7 @@ ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream, } } +stream_sleep: if (stream->read_subbuffer_ops.on_sleep) { stream->read_subbuffer_ops.on_sleep(stream, ctx); } -- 2.34.1 From 20f03252e998b92e5e3a154249987eb7e2f1f359 Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=A9mie=20Galarneau?= Date: Wed, 17 Jun 2020 18:54:09 -0400 Subject: [PATCH 14/16] Build fix: consumerd misnamed label MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit `sleep_stream` label was changed from `stream_sleep` as part of a fixup/rebase which didn't make it into the master branch. 
Signed-off-by: Jérémie Galarneau Change-Id: Ifd257e2b6d1f522d018cf0284a89f49a92b12b02 --- src/common/consumer/consumer.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/common/consumer/consumer.c b/src/common/consumer/consumer.c index 8a2a0f9c9..4aee51813 100644 --- a/src/common/consumer/consumer.c +++ b/src/common/consumer/consumer.c @@ -3210,7 +3210,7 @@ ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream, } } -stream_sleep: +sleep_stream: if (stream->read_subbuffer_ops.on_sleep) { stream->read_subbuffer_ops.on_sleep(stream, ctx); } -- 2.34.1 From 611cce88797fd71f50d9bcb6e2921f20b3445ca5 Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=A9mie=20Galarneau?= Date: Wed, 17 Jun 2020 19:13:50 -0400 Subject: [PATCH 15/16] Fix: consumerd: uninitialized written_bytes on no-data sleep MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit e66d26f51 introduces a jump to a label which causes `written_bytes` (the return value) to not be initialized. 
written_bytes may be used uninitialized in this function [-Wmaybe-uninitialized] Signed-off-by: Jérémie Galarneau Change-Id: I72c5c07298093f27fa72b72cb157ce4eedb81adb --- src/common/consumer/consumer.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/common/consumer/consumer.c b/src/common/consumer/consumer.c index 4aee51813..688492e43 100644 --- a/src/common/consumer/consumer.c +++ b/src/common/consumer/consumer.c @@ -3146,7 +3146,7 @@ ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream, struct lttng_consumer_local_data *ctx, bool locked_by_caller) { - ssize_t ret, written_bytes; + ssize_t ret, written_bytes = 0; struct stream_subbuffer subbuffer = {}; if (!locked_by_caller) { -- 2.34.1 From c40797353dbccedfbc50801023385bc6a3e4dbea Mon Sep 17 00:00:00 2001 From: Jonathan Rajotte Date: Tue, 26 May 2020 14:33:54 -0400 Subject: [PATCH 16/16] EfficiOS backport 2.9 revision 8 Signed-off-by: Jonathan Rajotte Change-Id: Ic742e3107d3aad31ebe180b81faffb31637ce4eb --- version/extra_version_description | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/version/extra_version_description b/version/extra_version_description index ca25f9aba..261ed5e85 100644 --- a/version/extra_version_description +++ b/version/extra_version_description @@ -1 +1 @@ -EfficiOS Revision 7 +EfficiOS Revision 8 -- 2.34.1