2 * Copyright (C) 2020 Jérémie Galarneau <jeremie.galarneau@efficios.com>
4 * SPDX-License-Identifier: GPL-2.0-only
#include "metadata-bucket.hpp"

#include <common/buffer-view.hpp>
#include <common/consumer/consumer.hpp>
#include <common/dynamic-buffer.hpp>
#include <common/error.hpp>
#include <common/macros.hpp>

#include <stdlib.h>
16 struct metadata_bucket
{
17 struct lttng_dynamic_buffer content
;
19 metadata_bucket_flush_cb fn
;
22 unsigned int buffer_count
;
25 struct metadata_bucket
*metadata_bucket_create(
26 metadata_bucket_flush_cb flush
, void *data
)
28 struct metadata_bucket
*bucket
;
30 bucket
= (metadata_bucket
*) zmalloc(sizeof(typeof(*bucket
)));
32 PERROR("Failed to allocate buffer bucket");
36 bucket
->flush
.fn
= flush
;
37 bucket
->flush
.data
= data
;
38 lttng_dynamic_buffer_init(&bucket
->content
);
43 void metadata_bucket_destroy(struct metadata_bucket
*bucket
)
49 if (bucket
->content
.size
> 0) {
50 WARN("Stream metadata bucket destroyed with remaining data: size = %zu, buffer count = %u",
51 bucket
->content
.size
, bucket
->buffer_count
);
54 lttng_dynamic_buffer_reset(&bucket
->content
);
58 void metadata_bucket_reset(struct metadata_bucket
*bucket
)
60 lttng_dynamic_buffer_reset(&bucket
->content
);
61 lttng_dynamic_buffer_init(&bucket
->content
);
62 bucket
->buffer_count
= 0;
/*
 * Header found at the start of a metadata packet.
 *
 * Only the offset of `header_end` is used in this file, to measure how
 * many bytes precede the metadata payload.
 *
 * NOTE(review): with these member types every field is naturally aligned,
 * so offsets are the same whether or not the structure is packed; only
 * sizeof() would differ. Confirm whether a packed attribute is expected.
 */
struct metadata_packet_header {
	uint32_t magic; /* 0x75D11D57 */
	uint8_t uuid[16]; /* Unique Universal Identifier */
	uint32_t checksum; /* 0 if unused */
	uint32_t content_size; /* in bits */
	uint32_t packet_size; /* in bits */
	uint8_t compression_scheme; /* 0 if unused */
	uint8_t encryption_scheme; /* 0 if unused */
	uint8_t checksum_scheme; /* 0 if unused */
	uint8_t major; /* CTF spec major version number */
	uint8_t minor; /* CTF spec minor version number */
	/* Zero-sized marker: its offset is the length of the header. */
	uint8_t header_end[0];
};
79 static size_t metadata_length(void)
81 return offsetof(struct metadata_packet_header
, header_end
);
84 enum metadata_bucket_status
metadata_bucket_fill(struct metadata_bucket
*bucket
,
85 const struct stream_subbuffer
*buffer
)
88 struct lttng_buffer_view flushed_view
;
89 struct stream_subbuffer flushed_subbuffer
;
90 enum metadata_bucket_status status
;
91 const bool should_flush
=
92 LTTNG_OPTIONAL_GET(buffer
->info
.metadata
.coherent
);
94 DBG("Metadata bucket filled with %zu bytes buffer view, sub-buffer size: %lu, padded sub-buffer size: %lu, coherent: %s",
95 buffer
->buffer
.buffer
.size
,
96 buffer
->info
.metadata
.subbuf_size
,
97 buffer
->info
.metadata
.padded_subbuf_size
,
98 buffer
->info
.metadata
.coherent
.value
? "true" : "false");
100 * If no metadata was accumulated and this buffer should be
101 * flushed, don't copy it unnecessarily; just flush it directly.
103 if (!should_flush
|| bucket
->buffer_count
!= 0) {
104 struct lttng_buffer_view append_view
;
107 * Append the subbuffer since they are combined
108 * into a single "virtual" subbuffer that will be
111 * Peel off metadata header and padding, only preserve
112 * the json-seq metadata.
114 append_view
= lttng_buffer_view_from_view(&buffer
->buffer
.buffer
, metadata_length(),
115 buffer
->info
.metadata
.subbuf_size
- metadata_length());
116 ret
= lttng_dynamic_buffer_append_view(
117 &bucket
->content
, &append_view
);
119 status
= METADATA_BUCKET_STATUS_ERROR
;
124 bucket
->buffer_count
++;
126 status
= METADATA_BUCKET_STATUS_OK
;
130 flushed_view
= bucket
->content
.size
!= 0 ?
131 lttng_buffer_view_from_dynamic_buffer(&bucket
->content
, 0, -1) :
132 lttng_buffer_view_from_view(&buffer
->buffer
.buffer
, metadata_length(),
133 buffer
->info
.metadata
.subbuf_size
- metadata_length());
135 flushed_subbuffer
= (typeof(flushed_subbuffer
)) {
137 .buffer
= flushed_view
,
141 .subbuf_size
= flushed_view
.size
,
142 .padded_subbuf_size
= flushed_view
.size
,
143 .version
= buffer
->info
.metadata
.version
,
144 .coherent
= buffer
->info
.metadata
.coherent
,
149 DBG("Metadata bucket flushing %zu bytes (%u sub-buffer%s)",
150 flushed_view
.size
, bucket
->buffer_count
,
151 bucket
->buffer_count
> 1 ? "s" : "");
152 ret
= bucket
->flush
.fn(&flushed_subbuffer
, bucket
->flush
.data
);
154 status
= METADATA_BUCKET_STATUS_OK
;
156 status
= METADATA_BUCKET_STATUS_ERROR
;
159 metadata_bucket_reset(bucket
);