Commit | Line | Data |
---|---|---|
f5ba75b4 JG |
1 | /* |
2 | * Copyright (C) 2020 Jérémie Galarneau <jeremie.galarneau@efficios.com> | |
3 | * | |
4 | * SPDX-License-Identifier: GPL-2.0-only | |
5 | * | |
6 | */ | |
7 | ||
7532fa3b | 8 | #include "metadata-bucket.hpp" |
f5ba75b4 | 9 | |
7532fa3b MD |
10 | #include <common/buffer-view.hpp> |
11 | #include <common/consumer/consumer.hpp> | |
12 | #include <common/dynamic-buffer.hpp> | |
13 | #include <common/macros.hpp> | |
14 | #include <common/error.hpp> | |
f5ba75b4 JG |
15 | |
16 | struct metadata_bucket { | |
17 | struct lttng_dynamic_buffer content; | |
18 | struct { | |
19 | metadata_bucket_flush_cb fn; | |
20 | void *data; | |
21 | } flush; | |
22 | unsigned int buffer_count; | |
23 | }; | |
24 | ||
25 | struct metadata_bucket *metadata_bucket_create( | |
26 | metadata_bucket_flush_cb flush, void *data) | |
27 | { | |
28 | struct metadata_bucket *bucket; | |
29 | ||
97535efa | 30 | bucket = (metadata_bucket *) zmalloc(sizeof(typeof(*bucket))); |
f5ba75b4 JG |
31 | if (!bucket) { |
32 | PERROR("Failed to allocate buffer bucket"); | |
33 | goto end; | |
34 | } | |
35 | ||
36 | bucket->flush.fn = flush; | |
37 | bucket->flush.data = data; | |
38 | lttng_dynamic_buffer_init(&bucket->content); | |
39 | end: | |
40 | return bucket; | |
41 | } | |
42 | ||
43 | void metadata_bucket_destroy(struct metadata_bucket *bucket) | |
44 | { | |
45 | if (!bucket) { | |
46 | return; | |
47 | } | |
48 | ||
49 | if (bucket->content.size > 0) { | |
50 | WARN("Stream metadata bucket destroyed with remaining data: size = %zu, buffer count = %u", | |
51 | bucket->content.size, bucket->buffer_count); | |
52 | } | |
53 | ||
54 | lttng_dynamic_buffer_reset(&bucket->content); | |
55 | free(bucket); | |
56 | } | |
57 | ||
58 | void metadata_bucket_reset(struct metadata_bucket *bucket) | |
59 | { | |
60 | lttng_dynamic_buffer_reset(&bucket->content); | |
61 | lttng_dynamic_buffer_init(&bucket->content); | |
62 | bucket->buffer_count = 0; | |
63 | } | |
64 | ||
7532fa3b MD |
/*
 * Header found at the beginning of a metadata packet.
 *
 * `header_end` is a zero-sized marker: it holds no data and only exists so
 * that its offset gives the length of the header.
 */
struct metadata_packet_header {
	uint32_t magic;			/* 0x75D11D57 */
	uint8_t uuid[16];		/* Unique Universal Identifier */
	uint32_t checksum;		/* 0 if unused */
	uint32_t content_size;		/* in bits */
	uint32_t packet_size;		/* in bits */
	uint8_t compression_scheme;	/* 0 if unused */
	uint8_t encryption_scheme;	/* 0 if unused */
	uint8_t checksum_scheme;	/* 0 if unused */
	uint8_t major;			/* CTF spec major version number */
	uint8_t minor;			/* CTF spec minor version number */
	uint8_t header_end[0];		/* marks the end of the header */
};
78 | ||
79 | static size_t metadata_length(void) | |
80 | { | |
81 | return offsetof(struct metadata_packet_header, header_end); | |
82 | } | |
83 | ||
f5ba75b4 JG |
84 | enum metadata_bucket_status metadata_bucket_fill(struct metadata_bucket *bucket, |
85 | const struct stream_subbuffer *buffer) | |
86 | { | |
87 | ssize_t ret; | |
88 | struct lttng_buffer_view flushed_view; | |
89 | struct stream_subbuffer flushed_subbuffer; | |
90 | enum metadata_bucket_status status; | |
91 | const bool should_flush = | |
92 | LTTNG_OPTIONAL_GET(buffer->info.metadata.coherent); | |
f5ba75b4 JG |
93 | |
94 | DBG("Metadata bucket filled with %zu bytes buffer view, sub-buffer size: %lu, padded sub-buffer size: %lu, coherent: %s", | |
95 | buffer->buffer.buffer.size, | |
96 | buffer->info.metadata.subbuf_size, | |
97 | buffer->info.metadata.padded_subbuf_size, | |
98 | buffer->info.metadata.coherent.value ? "true" : "false"); | |
99 | /* | |
100 | * If no metadata was accumulated and this buffer should be | |
101 | * flushed, don't copy it unecessarily; just flush it directly. | |
102 | */ | |
103 | if (!should_flush || bucket->buffer_count != 0) { | |
7532fa3b MD |
104 | struct lttng_buffer_view append_view; |
105 | ||
f5ba75b4 | 106 | /* |
7532fa3b | 107 | * Append the subbuffer since they are combined |
f5ba75b4 JG |
108 | * into a single "virtual" subbuffer that will be |
109 | * flushed at once. | |
110 | * | |
7532fa3b MD |
111 | * Peel off metadata header and padding, only preserve |
112 | * the json-seq metadata. | |
f5ba75b4 | 113 | */ |
7532fa3b MD |
114 | append_view = lttng_buffer_view_from_view(&buffer->buffer.buffer, metadata_length(), |
115 | buffer->info.metadata.subbuf_size - metadata_length()); | |
f5ba75b4 | 116 | ret = lttng_dynamic_buffer_append_view( |
7532fa3b | 117 | &bucket->content, &append_view); |
f5ba75b4 JG |
118 | if (ret) { |
119 | status = METADATA_BUCKET_STATUS_ERROR; | |
120 | goto end; | |
121 | } | |
122 | } | |
123 | ||
124 | bucket->buffer_count++; | |
125 | if (!should_flush) { | |
126 | status = METADATA_BUCKET_STATUS_OK; | |
127 | goto end; | |
128 | } | |
129 | ||
130 | flushed_view = bucket->content.size != 0 ? | |
131 | lttng_buffer_view_from_dynamic_buffer(&bucket->content, 0, -1) : | |
7532fa3b MD |
132 | lttng_buffer_view_from_view(&buffer->buffer.buffer, metadata_length(), |
133 | buffer->info.metadata.subbuf_size - metadata_length()); | |
f5ba75b4 JG |
134 | |
135 | flushed_subbuffer = (typeof(flushed_subbuffer)) { | |
97535efa SM |
136 | .buffer = { |
137 | .buffer = flushed_view, | |
138 | }, | |
139 | .info = { | |
140 | .metadata = { | |
7532fa3b | 141 | .subbuf_size = flushed_view.size, |
97535efa SM |
142 | .padded_subbuf_size = flushed_view.size, |
143 | .version = buffer->info.metadata.version, | |
144 | .coherent = buffer->info.metadata.coherent, | |
145 | }, | |
146 | }, | |
f5ba75b4 JG |
147 | }; |
148 | ||
149 | DBG("Metadata bucket flushing %zu bytes (%u sub-buffer%s)", | |
150 | flushed_view.size, bucket->buffer_count, | |
151 | bucket->buffer_count > 1 ? "s" : ""); | |
152 | ret = bucket->flush.fn(&flushed_subbuffer, bucket->flush.data); | |
153 | if (ret >= 0) { | |
154 | status = METADATA_BUCKET_STATUS_OK; | |
155 | } else { | |
156 | status = METADATA_BUCKET_STATUS_ERROR; | |
157 | } | |
158 | ||
159 | metadata_bucket_reset(bucket); | |
160 | ||
161 | end: | |
162 | return status; | |
163 | } |