Commit | Line | Data |
---|---|---|
958f7d11 PP |
1 | /* |
2 | * Copyright 2017 Philippe Proulx <pproulx@efficios.com> | |
3 | * | |
4 | * Permission is hereby granted, free of charge, to any person obtaining a copy | |
5 | * of this software and associated documentation files (the "Software"), to deal | |
6 | * in the Software without restriction, including without limitation the rights | |
7 | * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | |
8 | * copies of the Software, and to permit persons to whom the Software is | |
9 | * furnished to do so, subject to the following conditions: | |
10 | * | |
11 | * The above copyright notice and this permission notice shall be included in | |
12 | * all copies or substantial portions of the Software. | |
13 | * | |
14 | * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | |
15 | * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | |
16 | * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | |
17 | * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | |
18 | * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | |
19 | * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE | |
20 | * SOFTWARE. | |
21 | */ | |
22 | ||
d9693c64 | 23 | #define BT_COMP_LOG_SELF_COMP (muxer_comp->self_comp) |
f5abbab4 | 24 | #define BT_LOG_OUTPUT_LEVEL (muxer_comp->log_level) |
b03487ab | 25 | #define BT_LOG_TAG "PLUGIN/FLT.UTILS.MUXER" |
3fa1b6a3 | 26 | #include "logging/comp-logging.h" |
318fe670 | 27 | |
85e7137b | 28 | #include "common/macros.h" |
d126826c | 29 | #include "common/uuid.h" |
71c5da58 | 30 | #include <babeltrace2/babeltrace.h> |
958f7d11 | 31 | #include <glib.h> |
c55a9f58 | 32 | #include <stdbool.h> |
318fe670 | 33 | #include <inttypes.h> |
57952005 MJ |
34 | #include "common/assert.h" |
35 | #include "common/common.h" | |
0fbb9a9f | 36 | #include <stdlib.h> |
282c8cd0 | 37 | #include <string.h> |
958f7d11 | 38 | |
9240924b FD |
39 | #include "plugins/common/muxing/muxing.h" |
40 | ||
3e3ea13e FD |
41 | #include "muxer.h" |
42 | ||
958f7d11 | 43 | struct muxer_comp { |
d9693c64 PP |
44 | /* Weak refs */ |
45 | bt_self_component_filter *self_comp_flt; | |
46 | bt_self_component *self_comp; | |
834e9996 | 47 | |
958f7d11 PP |
48 | unsigned int next_port_num; |
49 | size_t available_input_ports; | |
b09a5592 | 50 | bool initializing_muxer_msg_iter; |
f5abbab4 | 51 | bt_logging_level log_level; |
958f7d11 PP |
52 | }; |
53 | ||
b09a5592 | 54 | struct muxer_upstream_msg_iter { |
f5abbab4 PP |
55 | struct muxer_comp *muxer_comp; |
56 | ||
ab11110e | 57 | /* Owned by this, NULL if ended */ |
b09a5592 | 58 | bt_self_component_port_input_message_iterator *msg_iter; |
958f7d11 | 59 | |
b09a5592 PP |
60 | /* Contains `const bt_message *`, owned by this */ |
61 | GQueue *msgs; | |
958f7d11 PP |
62 | }; |
63 | ||
/*
 * What a muxer message iterator expects from the clock classes of the
 * streams it encounters.
 */
enum muxer_msg_iter_clock_class_expectation {
	/* No stream seen yet: anything is accepted */
	MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_ANY               = 0,

	/* Stream classes must have no default clock class */
	MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_NONE              = 1,

	/* Clock classes must have the Unix epoch as their origin */
	MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_ABSOLUTE          = 2,

	/* Clock classes must be non-absolute and share a specific UUID */
	MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_NOT_ABS_SPEC_UUID = 3,

	/* Clock classes must be non-absolute and have no UUID */
	MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_NOT_ABS_NO_UUID   = 4,
};
71 | ||
b09a5592 | 72 | struct muxer_msg_iter { |
f5abbab4 PP |
73 | struct muxer_comp *muxer_comp; |
74 | ||
692f1a01 PP |
75 | /* Weak */ |
76 | bt_self_message_iterator *self_msg_iter; | |
77 | ||
ab11110e | 78 | /* |
b09a5592 | 79 | * Array of struct muxer_upstream_msg_iter * (owned by this). |
ab11110e PP |
80 | * |
81 | * NOTE: This array is searched in linearly to find the youngest | |
b09a5592 | 82 | * current message. Keep this until benchmarks confirm that |
ab11110e PP |
83 | * another data structure is faster than this for our typical |
84 | * use cases. | |
85 | */ | |
a71ed05c PP |
86 | GPtrArray *active_muxer_upstream_msg_iters; |
87 | ||
88 | /* | |
89 | * Array of struct muxer_upstream_msg_iter * (owned by this). | |
90 | * | |
91 | * We move ended message iterators from | |
92 | * `active_muxer_upstream_msg_iters` to this array so as to be | |
93 | * able to restore them when seeking. | |
94 | */ | |
95 | GPtrArray *ended_muxer_upstream_msg_iters; | |
958f7d11 | 96 | |
b09a5592 | 97 | /* Last time returned in a message */ |
958f7d11 | 98 | int64_t last_returned_ts_ns; |
282c8cd0 PP |
99 | |
100 | /* Clock class expectation state */ | |
b09a5592 | 101 | enum muxer_msg_iter_clock_class_expectation clock_class_expectation; |
282c8cd0 PP |
102 | |
103 | /* | |
104 | * Expected clock class UUID, only valid when | |
105 | * clock_class_expectation is | |
b09a5592 | 106 | * MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_NOT_ABS_SPEC_UUID. |
282c8cd0 | 107 | */ |
d126826c | 108 | bt_uuid_t expected_clock_class_uuid; |
958f7d11 PP |
109 | }; |
110 | ||
a71ed05c PP |
111 | static |
112 | void empty_message_queue(struct muxer_upstream_msg_iter *upstream_msg_iter) | |
113 | { | |
114 | const bt_message *msg; | |
115 | ||
116 | while ((msg = g_queue_pop_head(upstream_msg_iter->msgs))) { | |
117 | bt_message_put_ref(msg); | |
118 | } | |
119 | } | |
120 | ||
ab11110e | 121 | static |
b09a5592 PP |
122 | void destroy_muxer_upstream_msg_iter( |
123 | struct muxer_upstream_msg_iter *muxer_upstream_msg_iter) | |
ab11110e | 124 | { |
f5abbab4 PP |
125 | struct muxer_comp *muxer_comp; |
126 | ||
b09a5592 | 127 | if (!muxer_upstream_msg_iter) { |
ab11110e PP |
128 | return; |
129 | } | |
130 | ||
f5abbab4 | 131 | muxer_comp = muxer_upstream_msg_iter->muxer_comp; |
d9693c64 | 132 | BT_COMP_LOGD("Destroying muxer's upstream message iterator wrapper: " |
b09a5592 PP |
133 | "addr=%p, msg-iter-addr=%p, queue-len=%u", |
134 | muxer_upstream_msg_iter, | |
135 | muxer_upstream_msg_iter->msg_iter, | |
136 | muxer_upstream_msg_iter->msgs->length); | |
a71ed05c PP |
137 | bt_self_component_port_input_message_iterator_put_ref( |
138 | muxer_upstream_msg_iter->msg_iter); | |
3fd7b79d | 139 | |
b09a5592 | 140 | if (muxer_upstream_msg_iter->msgs) { |
a71ed05c | 141 | empty_message_queue(muxer_upstream_msg_iter); |
b09a5592 | 142 | g_queue_free(muxer_upstream_msg_iter->msgs); |
3fd7b79d PP |
143 | } |
144 | ||
b09a5592 | 145 | g_free(muxer_upstream_msg_iter); |
ab11110e PP |
146 | } |
147 | ||
958f7d11 | 148 | static |
ae644e59 | 149 | int muxer_msg_iter_add_upstream_msg_iter(struct muxer_msg_iter *muxer_msg_iter, |
b09a5592 | 150 | bt_self_component_port_input_message_iterator *self_msg_iter) |
958f7d11 | 151 | { |
ae644e59 | 152 | int ret = 0; |
b09a5592 PP |
153 | struct muxer_upstream_msg_iter *muxer_upstream_msg_iter = |
154 | g_new0(struct muxer_upstream_msg_iter, 1); | |
f5abbab4 | 155 | struct muxer_comp *muxer_comp = muxer_msg_iter->muxer_comp; |
958f7d11 | 156 | |
b09a5592 | 157 | if (!muxer_upstream_msg_iter) { |
d9693c64 | 158 | BT_COMP_LOGE_STR("Failed to allocate one muxer's upstream message iterator wrapper."); |
11603bce | 159 | goto error; |
958f7d11 PP |
160 | } |
161 | ||
f5abbab4 | 162 | muxer_upstream_msg_iter->muxer_comp = muxer_comp; |
b09a5592 PP |
163 | muxer_upstream_msg_iter->msg_iter = self_msg_iter; |
164 | bt_self_component_port_input_message_iterator_get_ref(muxer_upstream_msg_iter->msg_iter); | |
165 | muxer_upstream_msg_iter->msgs = g_queue_new(); | |
166 | if (!muxer_upstream_msg_iter->msgs) { | |
d9693c64 | 167 | BT_COMP_LOGE_STR("Failed to allocate a GQueue."); |
11603bce | 168 | goto error; |
3fd7b79d PP |
169 | } |
170 | ||
a71ed05c | 171 | g_ptr_array_add(muxer_msg_iter->active_muxer_upstream_msg_iters, |
b09a5592 | 172 | muxer_upstream_msg_iter); |
d9693c64 | 173 | BT_COMP_LOGD("Added muxer's upstream message iterator wrapper: " |
b09a5592 PP |
174 | "addr=%p, muxer-msg-iter-addr=%p, msg-iter-addr=%p", |
175 | muxer_upstream_msg_iter, muxer_msg_iter, | |
176 | self_msg_iter); | |
958f7d11 | 177 | |
11603bce FD |
178 | goto end; |
179 | ||
180 | error: | |
181 | g_free(muxer_upstream_msg_iter); | |
ae644e59 | 182 | ret = -1; |
11603bce | 183 | |
958f7d11 | 184 | end: |
ae644e59 | 185 | return ret; |
958f7d11 PP |
186 | } |
187 | ||
958f7d11 | 188 | static |
fb25b9e3 | 189 | bt_self_component_add_port_status add_available_input_port( |
8eee8ea2 | 190 | bt_self_component_filter *self_comp) |
958f7d11 | 191 | { |
834e9996 | 192 | struct muxer_comp *muxer_comp = bt_self_component_get_data( |
bb61965b | 193 | bt_self_component_filter_as_self_component(self_comp)); |
fb25b9e3 PP |
194 | bt_self_component_add_port_status status = |
195 | BT_SELF_COMPONENT_ADD_PORT_STATUS_OK; | |
958f7d11 | 196 | GString *port_name = NULL; |
958f7d11 | 197 | |
8b45963b | 198 | BT_ASSERT(muxer_comp); |
958f7d11 PP |
199 | port_name = g_string_new("in"); |
200 | if (!port_name) { | |
d9693c64 | 201 | BT_COMP_LOGE_STR("Failed to allocate a GString."); |
fb25b9e3 | 202 | status = BT_SELF_COMPONENT_ADD_PORT_STATUS_MEMORY_ERROR; |
958f7d11 PP |
203 | goto end; |
204 | } | |
205 | ||
206 | g_string_append_printf(port_name, "%u", muxer_comp->next_port_num); | |
834e9996 PP |
207 | status = bt_self_component_filter_add_input_port( |
208 | self_comp, port_name->str, NULL, NULL); | |
fb25b9e3 | 209 | if (status != BT_SELF_COMPONENT_ADD_PORT_STATUS_OK) { |
d9693c64 | 210 | BT_COMP_LOGE("Cannot add input port to muxer component: " |
318fe670 | 211 | "port-name=\"%s\", comp-addr=%p, status=%s", |
834e9996 | 212 | port_name->str, self_comp, |
fb25b9e3 | 213 | bt_common_func_status_string(status)); |
958f7d11 PP |
214 | goto end; |
215 | } | |
216 | ||
217 | muxer_comp->available_input_ports++; | |
218 | muxer_comp->next_port_num++; | |
d9693c64 | 219 | BT_COMP_LOGI("Added one input port to muxer component: " |
318fe670 | 220 | "port-name=\"%s\", comp-addr=%p", |
834e9996 | 221 | port_name->str, self_comp); |
1043fdea | 222 | |
958f7d11 PP |
223 | end: |
224 | if (port_name) { | |
225 | g_string_free(port_name, TRUE); | |
226 | } | |
227 | ||
147337a3 | 228 | return status; |
958f7d11 PP |
229 | } |
230 | ||
958f7d11 | 231 | static |
fb25b9e3 | 232 | bt_self_component_add_port_status create_output_port( |
8eee8ea2 | 233 | bt_self_component_filter *self_comp) |
958f7d11 | 234 | { |
834e9996 PP |
235 | return bt_self_component_filter_add_output_port( |
236 | self_comp, "out", NULL, NULL); | |
958f7d11 PP |
237 | } |
238 | ||
/* Frees the muxer component's private data; NULL is accepted. */
static
void destroy_muxer_comp(struct muxer_comp *muxer_comp)
{
	if (muxer_comp) {
		g_free(muxer_comp);
	}
}
248 | ||
249 | BT_HIDDEN | |
fb25b9e3 | 250 | bt_component_class_init_method_status muxer_init( |
d9693c64 | 251 | bt_self_component_filter *self_comp_flt, |
3e3ea13e | 252 | const bt_value *params, void *init_data) |
958f7d11 | 253 | { |
fb25b9e3 PP |
254 | bt_component_class_init_method_status status = |
255 | BT_COMPONENT_CLASS_INIT_METHOD_STATUS_OK; | |
256 | bt_self_component_add_port_status add_port_status; | |
d9693c64 PP |
257 | bt_self_component *self_comp = |
258 | bt_self_component_filter_as_self_component(self_comp_flt); | |
958f7d11 | 259 | struct muxer_comp *muxer_comp = g_new0(struct muxer_comp, 1); |
f5abbab4 | 260 | bt_logging_level log_level = bt_component_get_logging_level( |
d9693c64 | 261 | bt_self_component_as_component(self_comp)); |
958f7d11 | 262 | |
d9693c64 | 263 | BT_COMP_LOG_CUR_LVL(BT_LOG_INFO, log_level, self_comp, |
f5abbab4 | 264 | "Initializing muxer component: " |
834e9996 | 265 | "comp-addr=%p, params-addr=%p", self_comp, params); |
318fe670 | 266 | |
958f7d11 | 267 | if (!muxer_comp) { |
d9693c64 | 268 | BT_COMP_LOG_CUR_LVL(BT_LOG_ERROR, log_level, self_comp, |
f5abbab4 | 269 | "Failed to allocate one muxer component."); |
958f7d11 PP |
270 | goto error; |
271 | } | |
272 | ||
f5abbab4 | 273 | muxer_comp->log_level = log_level; |
d9693c64 PP |
274 | muxer_comp->self_comp = self_comp; |
275 | muxer_comp->self_comp_flt = self_comp_flt; | |
65ee897d | 276 | |
d9693c64 | 277 | bt_self_component_set_data(self_comp, muxer_comp); |
fb25b9e3 PP |
278 | add_port_status = add_available_input_port(self_comp_flt); |
279 | if (add_port_status != BT_SELF_COMPONENT_ADD_PORT_STATUS_OK) { | |
d9693c64 | 280 | BT_COMP_LOGE("Cannot ensure that at least one muxer component's input port is available: " |
318fe670 PP |
281 | "muxer-comp-addr=%p, status=%s", |
282 | muxer_comp, | |
fb25b9e3 PP |
283 | bt_common_func_status_string(add_port_status)); |
284 | if (add_port_status == | |
285 | BT_SELF_COMPONENT_ADD_PORT_STATUS_MEMORY_ERROR) { | |
286 | status = BT_COMPONENT_CLASS_INIT_METHOD_STATUS_MEMORY_ERROR; | |
287 | } else { | |
288 | status = BT_COMPONENT_CLASS_INIT_METHOD_STATUS_ERROR; | |
289 | } | |
290 | ||
958f7d11 PP |
291 | goto error; |
292 | } | |
293 | ||
fb25b9e3 PP |
294 | add_port_status = create_output_port(self_comp_flt); |
295 | if (add_port_status != BT_SELF_COMPONENT_ADD_PORT_STATUS_OK) { | |
d9693c64 | 296 | BT_COMP_LOGE("Cannot create muxer component's output port: " |
318fe670 PP |
297 | "muxer-comp-addr=%p, status=%s", |
298 | muxer_comp, | |
fb25b9e3 PP |
299 | bt_common_func_status_string(add_port_status)); |
300 | if (add_port_status == | |
301 | BT_SELF_COMPONENT_ADD_PORT_STATUS_MEMORY_ERROR) { | |
302 | status = BT_COMPONENT_CLASS_INIT_METHOD_STATUS_MEMORY_ERROR; | |
303 | } else { | |
304 | status = BT_COMPONENT_CLASS_INIT_METHOD_STATUS_ERROR; | |
305 | } | |
306 | ||
958f7d11 PP |
307 | goto error; |
308 | } | |
309 | ||
d9693c64 | 310 | BT_COMP_LOGI("Initialized muxer component: " |
318fe670 | 311 | "comp-addr=%p, params-addr=%p, muxer-comp-addr=%p", |
834e9996 | 312 | self_comp, params, muxer_comp); |
318fe670 | 313 | |
958f7d11 PP |
314 | goto end; |
315 | ||
316 | error: | |
317 | destroy_muxer_comp(muxer_comp); | |
d9693c64 | 318 | bt_self_component_set_data(self_comp, NULL); |
147337a3 | 319 | |
fb25b9e3 PP |
320 | if (status == BT_COMPONENT_CLASS_INIT_METHOD_STATUS_OK) { |
321 | status = BT_COMPONENT_CLASS_INIT_METHOD_STATUS_ERROR; | |
147337a3 | 322 | } |
958f7d11 PP |
323 | |
324 | end: | |
325 | return status; | |
326 | } | |
327 | ||
328 | BT_HIDDEN | |
8eee8ea2 | 329 | void muxer_finalize(bt_self_component_filter *self_comp) |
958f7d11 | 330 | { |
834e9996 | 331 | struct muxer_comp *muxer_comp = bt_self_component_get_data( |
bb61965b | 332 | bt_self_component_filter_as_self_component(self_comp)); |
958f7d11 | 333 | |
d9693c64 | 334 | BT_COMP_LOGI("Finalizing muxer component: comp-addr=%p", |
834e9996 | 335 | self_comp); |
958f7d11 PP |
336 | destroy_muxer_comp(muxer_comp); |
337 | } | |
338 | ||
339 | static | |
ab8b2b1b | 340 | bt_self_component_port_input_message_iterator_create_from_message_iterator_status |
f5abbab4 | 341 | create_msg_iter_on_input_port(struct muxer_comp *muxer_comp, |
692f1a01 | 342 | struct muxer_msg_iter *muxer_msg_iter, |
ab8b2b1b SM |
343 | bt_self_component_port_input *self_port, |
344 | bt_self_component_port_input_message_iterator **msg_iter) | |
958f7d11 | 345 | { |
8eee8ea2 | 346 | const bt_port *port = bt_self_component_port_as_port( |
bb61965b | 347 | bt_self_component_port_input_as_self_component_port( |
834e9996 | 348 | self_port)); |
ab8b2b1b SM |
349 | bt_self_component_port_input_message_iterator_create_from_message_iterator_status |
350 | status; | |
958f7d11 | 351 | |
8b45963b PP |
352 | BT_ASSERT(port); |
353 | BT_ASSERT(bt_port_is_connected(port)); | |
958f7d11 | 354 | |
ab11110e | 355 | // TODO: Advance the iterator to >= the time of the latest |
b09a5592 | 356 | // returned message by the muxer message |
ab11110e | 357 | // iterator which creates it. |
ab8b2b1b SM |
358 | status = bt_self_component_port_input_message_iterator_create_from_message_iterator( |
359 | muxer_msg_iter->self_msg_iter, self_port, msg_iter); | |
360 | if (status != BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_CREATE_FROM_MESSAGE_ITERATOR_STATUS_OK) { | |
d9693c64 | 361 | BT_COMP_LOGE("Cannot create upstream message iterator on input port: " |
834e9996 PP |
362 | "port-addr=%p, port-name=\"%s\"", |
363 | port, bt_port_get_name(port)); | |
958f7d11 PP |
364 | goto end; |
365 | } | |
366 | ||
d9693c64 | 367 | BT_COMP_LOGI("Created upstream message iterator on input port: " |
b09a5592 PP |
368 | "port-addr=%p, port-name=\"%s\", msg-iter-addr=%p", |
369 | port, bt_port_get_name(port), msg_iter); | |
318fe670 | 370 | |
958f7d11 | 371 | end: |
ab8b2b1b | 372 | return status; |
958f7d11 PP |
373 | } |
374 | ||
ab11110e | 375 | static |
fb25b9e3 | 376 | bt_component_class_message_iterator_next_method_status muxer_upstream_msg_iter_next( |
a71ed05c PP |
377 | struct muxer_upstream_msg_iter *muxer_upstream_msg_iter, |
378 | bool *is_ended) | |
ab11110e | 379 | { |
f5abbab4 PP |
380 | struct muxer_comp *muxer_comp = |
381 | muxer_upstream_msg_iter->muxer_comp; | |
fb25b9e3 PP |
382 | bt_component_class_message_iterator_next_method_status status; |
383 | bt_message_iterator_next_status input_port_iter_status; | |
b09a5592 | 384 | bt_message_array_const msgs; |
3fd7b79d PP |
385 | uint64_t i; |
386 | uint64_t count; | |
ab11110e | 387 | |
d9693c64 | 388 | BT_COMP_LOGD("Calling upstream message iterator's \"next\" method: " |
b09a5592 PP |
389 | "muxer-upstream-msg-iter-wrap-addr=%p, msg-iter-addr=%p", |
390 | muxer_upstream_msg_iter, | |
391 | muxer_upstream_msg_iter->msg_iter); | |
3e3ea13e | 392 | input_port_iter_status = bt_self_component_port_input_message_iterator_next( |
b09a5592 | 393 | muxer_upstream_msg_iter->msg_iter, &msgs, &count); |
d9693c64 | 394 | BT_COMP_LOGD("Upstream message iterator's \"next\" method returned: " |
fb25b9e3 PP |
395 | "status=%s", |
396 | bt_common_func_status_string(input_port_iter_status)); | |
ab11110e | 397 | |
3e3ea13e | 398 | switch (input_port_iter_status) { |
fb25b9e3 | 399 | case BT_MESSAGE_ITERATOR_NEXT_STATUS_OK: |
089717de | 400 | /* |
b09a5592 | 401 | * Message iterator's current message is |
3fd7b79d | 402 | * valid: it must be considered for muxing operations. |
089717de | 403 | */ |
d9693c64 | 404 | BT_COMP_LOGD_STR("Validated upstream message iterator wrapper."); |
3fd7b79d PP |
405 | BT_ASSERT(count > 0); |
406 | ||
b09a5592 | 407 | /* Move messages to our queue */ |
3fd7b79d PP |
408 | for (i = 0; i < count; i++) { |
409 | /* | |
410 | * Push to tail in order; other side | |
b09a5592 | 411 | * (muxer_msg_iter_do_next_one()) consumes |
3fd7b79d PP |
412 | * from the head first. |
413 | */ | |
b09a5592 PP |
414 | g_queue_push_tail(muxer_upstream_msg_iter->msgs, |
415 | (void *) msgs[i]); | |
3fd7b79d | 416 | } |
fb25b9e3 | 417 | status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK; |
ab11110e | 418 | break; |
fb25b9e3 | 419 | case BT_MESSAGE_ITERATOR_NEXT_STATUS_AGAIN: |
089717de | 420 | /* |
b09a5592 | 421 | * Message iterator's current message is not |
089717de | 422 | * valid anymore. Return |
fb25b9e3 | 423 | * BT_MESSAGE_ITERATOR_NEXT_STATUS_AGAIN immediately. |
089717de | 424 | */ |
fb25b9e3 | 425 | status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_AGAIN; |
ab11110e | 426 | break; |
fb25b9e3 | 427 | case BT_MESSAGE_ITERATOR_NEXT_STATUS_END: /* Fall-through. */ |
ab11110e | 428 | /* |
b09a5592 | 429 | * Message iterator reached the end: release it. It |
ab11110e | 430 | * won't be considered again to find the youngest |
b09a5592 | 431 | * message. |
ab11110e | 432 | */ |
a71ed05c | 433 | *is_ended = true; |
fb25b9e3 | 434 | status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK; |
089717de | 435 | break; |
ab11110e PP |
436 | default: |
437 | /* Error or unsupported status code */ | |
d9693c64 | 438 | BT_COMP_LOGE("Error or unsupported status code: " |
3e3ea13e | 439 | "status-code=%d", input_port_iter_status); |
fb25b9e3 | 440 | status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_ERROR; |
089717de | 441 | break; |
ab11110e PP |
442 | } |
443 | ||
089717de | 444 | return status; |
ab11110e PP |
445 | } |
446 | ||
958f7d11 | 447 | static |
b09a5592 PP |
448 | int get_msg_ts_ns(struct muxer_comp *muxer_comp, |
449 | struct muxer_msg_iter *muxer_msg_iter, | |
450 | const bt_message *msg, int64_t last_returned_ts_ns, | |
958f7d11 PP |
451 | int64_t *ts_ns) |
452 | { | |
ecbb78c0 | 453 | const bt_clock_snapshot *clock_snapshot = NULL; |
958f7d11 | 454 | int ret = 0; |
34e13c22 PP |
455 | const bt_stream_class *stream_class = NULL; |
456 | bt_message_type msg_type; | |
958f7d11 | 457 | |
b09a5592 | 458 | BT_ASSERT(msg); |
8b45963b | 459 | BT_ASSERT(ts_ns); |
d9693c64 | 460 | BT_COMP_LOGD("Getting message's timestamp: " |
b09a5592 | 461 | "muxer-msg-iter-addr=%p, msg-addr=%p, " |
318fe670 | 462 | "last-returned-ts=%" PRId64, |
b09a5592 | 463 | muxer_msg_iter, msg, last_returned_ts_ns); |
318fe670 | 464 | |
85e7137b | 465 | if (G_UNLIKELY(muxer_msg_iter->clock_class_expectation == |
37911c11 PP |
466 | MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_NONE)) { |
467 | *ts_ns = last_returned_ts_ns; | |
468 | goto end; | |
469 | } | |
470 | ||
34e13c22 PP |
471 | msg_type = bt_message_get_type(msg); |
472 | ||
85e7137b | 473 | if (G_UNLIKELY(msg_type == BT_MESSAGE_TYPE_PACKET_BEGINNING)) { |
34e13c22 PP |
474 | stream_class = bt_stream_borrow_class_const( |
475 | bt_packet_borrow_stream_const( | |
476 | bt_message_packet_beginning_borrow_packet_const( | |
477 | msg))); | |
85e7137b | 478 | } else if (G_UNLIKELY(msg_type == BT_MESSAGE_TYPE_PACKET_END)) { |
34e13c22 PP |
479 | stream_class = bt_stream_borrow_class_const( |
480 | bt_packet_borrow_stream_const( | |
481 | bt_message_packet_end_borrow_packet_const( | |
482 | msg))); | |
85e7137b | 483 | } else if (G_UNLIKELY(msg_type == BT_MESSAGE_TYPE_DISCARDED_EVENTS)) { |
77037b2b PP |
484 | stream_class = bt_stream_borrow_class_const( |
485 | bt_message_discarded_events_borrow_stream_const(msg)); | |
85e7137b | 486 | } else if (G_UNLIKELY(msg_type == BT_MESSAGE_TYPE_DISCARDED_PACKETS)) { |
77037b2b PP |
487 | stream_class = bt_stream_borrow_class_const( |
488 | bt_message_discarded_packets_borrow_stream_const(msg)); | |
34e13c22 PP |
489 | } |
490 | ||
491 | switch (msg_type) { | |
b09a5592 | 492 | case BT_MESSAGE_TYPE_EVENT: |
37911c11 PP |
493 | BT_ASSERT(bt_message_event_borrow_stream_class_default_clock_class_const( |
494 | msg)); | |
11ddb3ef PP |
495 | clock_snapshot = bt_message_event_borrow_default_clock_snapshot_const( |
496 | msg); | |
958f7d11 | 497 | break; |
4a4dd64f | 498 | case BT_MESSAGE_TYPE_PACKET_BEGINNING: |
5ef34326 | 499 | if (bt_stream_class_packets_have_beginning_default_clock_snapshot( |
34e13c22 PP |
500 | stream_class)) { |
501 | clock_snapshot = bt_message_packet_beginning_borrow_default_clock_snapshot_const( | |
502 | msg); | |
503 | } else { | |
504 | goto no_clock_snapshot; | |
505 | } | |
506 | ||
4a4dd64f PP |
507 | break; |
508 | case BT_MESSAGE_TYPE_PACKET_END: | |
5ef34326 | 509 | if (bt_stream_class_packets_have_end_default_clock_snapshot( |
34e13c22 PP |
510 | stream_class)) { |
511 | clock_snapshot = bt_message_packet_end_borrow_default_clock_snapshot_const( | |
512 | msg); | |
513 | } else { | |
514 | goto no_clock_snapshot; | |
515 | } | |
516 | ||
4a4dd64f | 517 | break; |
3ce73327 FD |
518 | case BT_MESSAGE_TYPE_STREAM_BEGINNING: |
519 | { | |
520 | enum bt_message_stream_clock_snapshot_state snapshot_state = | |
521 | bt_message_stream_beginning_borrow_default_clock_snapshot_const( | |
522 | msg, &clock_snapshot); | |
523 | if (snapshot_state == BT_MESSAGE_STREAM_CLOCK_SNAPSHOT_STATE_UNKNOWN) { | |
524 | goto no_clock_snapshot; | |
525 | } | |
526 | ||
527 | break; | |
528 | } | |
529 | case BT_MESSAGE_TYPE_STREAM_END: | |
530 | { | |
531 | enum bt_message_stream_clock_snapshot_state snapshot_state = | |
532 | bt_message_stream_end_borrow_default_clock_snapshot_const( | |
533 | msg, &clock_snapshot); | |
534 | if (snapshot_state == BT_MESSAGE_STREAM_CLOCK_SNAPSHOT_STATE_UNKNOWN) { | |
535 | goto no_clock_snapshot; | |
536 | } | |
537 | ||
538 | break; | |
539 | } | |
4a4dd64f | 540 | case BT_MESSAGE_TYPE_DISCARDED_EVENTS: |
77037b2b PP |
541 | if (bt_stream_class_discarded_events_have_default_clock_snapshots( |
542 | stream_class)) { | |
5ef34326 | 543 | clock_snapshot = bt_message_discarded_events_borrow_beginning_default_clock_snapshot_const( |
77037b2b PP |
544 | msg); |
545 | } else { | |
546 | goto no_clock_snapshot; | |
547 | } | |
548 | ||
4a4dd64f PP |
549 | break; |
550 | case BT_MESSAGE_TYPE_DISCARDED_PACKETS: | |
77037b2b PP |
551 | if (bt_stream_class_discarded_packets_have_default_clock_snapshots( |
552 | stream_class)) { | |
5ef34326 | 553 | clock_snapshot = bt_message_discarded_packets_borrow_beginning_default_clock_snapshot_const( |
77037b2b PP |
554 | msg); |
555 | } else { | |
556 | goto no_clock_snapshot; | |
557 | } | |
558 | ||
4a4dd64f | 559 | break; |
40bf6fd0 | 560 | case BT_MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY: |
11ddb3ef PP |
561 | clock_snapshot = bt_message_message_iterator_inactivity_borrow_default_clock_snapshot_const( |
562 | msg); | |
958f7d11 PP |
563 | break; |
564 | default: | |
b09a5592 | 565 | /* All the other messages have a higher priority */ |
d9693c64 | 566 | BT_COMP_LOGD_STR("Message has no timestamp: using the last returned timestamp."); |
958f7d11 PP |
567 | *ts_ns = last_returned_ts_ns; |
568 | goto end; | |
569 | } | |
570 | ||
37911c11 PP |
571 | ret = bt_clock_snapshot_get_ns_from_origin(clock_snapshot, ts_ns); |
572 | if (ret) { | |
d9693c64 | 573 | BT_COMP_LOGE("Cannot get nanoseconds from Epoch of clock snapshot: " |
37911c11 PP |
574 | "clock-snapshot-addr=%p", clock_snapshot); |
575 | goto error; | |
576 | } | |
577 | ||
578 | goto end; | |
579 | ||
580 | no_clock_snapshot: | |
d9693c64 | 581 | BT_COMP_LOGD_STR("Message's default clock snapshot is missing: " |
37911c11 PP |
582 | "using the last returned timestamp."); |
583 | *ts_ns = last_returned_ts_ns; | |
584 | goto end; | |
585 | ||
586 | error: | |
587 | ret = -1; | |
588 | ||
589 | end: | |
590 | if (ret == 0) { | |
d9693c64 | 591 | BT_COMP_LOGD("Found message's timestamp: " |
37911c11 PP |
592 | "muxer-msg-iter-addr=%p, msg-addr=%p, " |
593 | "last-returned-ts=%" PRId64 ", ts=%" PRId64, | |
594 | muxer_msg_iter, msg, last_returned_ts_ns, | |
595 | *ts_ns); | |
7b33a0e0 PP |
596 | } |
597 | ||
37911c11 PP |
598 | return ret; |
599 | } | |
600 | ||
601 | static inline | |
602 | int validate_clock_class(struct muxer_msg_iter *muxer_msg_iter, | |
603 | struct muxer_comp *muxer_comp, | |
604 | const bt_clock_class *clock_class) | |
605 | { | |
606 | int ret = 0; | |
d126826c | 607 | const uint8_t *cc_uuid; |
37911c11 PP |
608 | const char *cc_name; |
609 | ||
8fc063a2 | 610 | BT_ASSERT(clock_class); |
839d52a5 PP |
611 | cc_uuid = bt_clock_class_get_uuid(clock_class); |
612 | cc_name = bt_clock_class_get_name(clock_class); | |
282c8cd0 | 613 | |
b09a5592 PP |
614 | if (muxer_msg_iter->clock_class_expectation == |
615 | MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_ANY) { | |
282c8cd0 | 616 | /* |
9635b8a6 FD |
617 | * This is the first clock class that this muxer message |
618 | * iterator encounters. Its properties determine what to expect | |
619 | * for the whole lifetime of the iterator. | |
282c8cd0 | 620 | */ |
d608d675 | 621 | if (bt_clock_class_origin_is_unix_epoch(clock_class)) { |
282c8cd0 | 622 | /* Expect absolute clock classes */ |
b09a5592 PP |
623 | muxer_msg_iter->clock_class_expectation = |
624 | MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_ABSOLUTE; | |
282c8cd0 PP |
625 | } else { |
626 | if (cc_uuid) { | |
627 | /* | |
628 | * Expect non-absolute clock classes | |
629 | * with a specific UUID. | |
630 | */ | |
b09a5592 PP |
631 | muxer_msg_iter->clock_class_expectation = |
632 | MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_NOT_ABS_SPEC_UUID; | |
d126826c | 633 | bt_uuid_copy(muxer_msg_iter->expected_clock_class_uuid, cc_uuid); |
282c8cd0 PP |
634 | } else { |
635 | /* | |
636 | * Expect non-absolute clock classes | |
637 | * with no UUID. | |
638 | */ | |
b09a5592 PP |
639 | muxer_msg_iter->clock_class_expectation = |
640 | MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_NOT_ABS_NO_UUID; | |
282c8cd0 PP |
641 | } |
642 | } | |
643 | } | |
644 | ||
9635b8a6 FD |
645 | switch (muxer_msg_iter->clock_class_expectation) { |
646 | case MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_ABSOLUTE: | |
647 | if (!bt_clock_class_origin_is_unix_epoch(clock_class)) { | |
648 | BT_COMP_LOGE("Expecting an absolute clock class, " | |
649 | "but got a non-absolute one: " | |
650 | "clock-class-addr=%p, clock-class-name=\"%s\"", | |
651 | clock_class, cc_name); | |
652 | goto error; | |
653 | } | |
654 | break; | |
655 | case MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_NOT_ABS_NO_UUID: | |
656 | if (bt_clock_class_origin_is_unix_epoch(clock_class)) { | |
657 | BT_COMP_LOGE("Expecting a non-absolute clock class with no UUID, " | |
658 | "but got an absolute one: " | |
659 | "clock-class-addr=%p, clock-class-name=\"%s\"", | |
660 | clock_class, cc_name); | |
661 | goto error; | |
662 | } | |
282c8cd0 | 663 | |
9635b8a6 FD |
664 | if (cc_uuid) { |
665 | BT_COMP_LOGE("Expecting a non-absolute clock class with no UUID, " | |
666 | "but got one with a UUID: " | |
667 | "clock-class-addr=%p, clock-class-name=\"%s\", " | |
668 | "uuid=\"" BT_UUID_FMT "\"", | |
669 | clock_class, cc_name, BT_UUID_FMT_VALUES(cc_uuid)); | |
670 | goto error; | |
671 | } | |
672 | break; | |
673 | case MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_NOT_ABS_SPEC_UUID: | |
674 | if (bt_clock_class_origin_is_unix_epoch(clock_class)) { | |
675 | BT_COMP_LOGE("Expecting a non-absolute clock class with a specific UUID, " | |
676 | "but got an absolute one: " | |
677 | "clock-class-addr=%p, clock-class-name=\"%s\"", | |
678 | clock_class, cc_name); | |
679 | goto error; | |
680 | } | |
282c8cd0 | 681 | |
9635b8a6 FD |
682 | if (!cc_uuid) { |
683 | BT_COMP_LOGE("Expecting a non-absolute clock class with a specific UUID, " | |
684 | "but got one with no UUID: " | |
37911c11 PP |
685 | "clock-class-addr=%p, clock-class-name=\"%s\"", |
686 | clock_class, cc_name); | |
687 | goto error; | |
282c8cd0 | 688 | } |
9635b8a6 FD |
689 | |
690 | if (bt_uuid_compare(muxer_msg_iter->expected_clock_class_uuid, cc_uuid) != 0) { | |
691 | BT_COMP_LOGE("Expecting a non-absolute clock class with a specific UUID, " | |
692 | "but got one with different UUID: " | |
693 | "clock-class-addr=%p, clock-class-name=\"%s\", " | |
694 | "expected-uuid=\"" BT_UUID_FMT "\", " | |
695 | "uuid=\"" BT_UUID_FMT "\"", | |
696 | clock_class, cc_name, | |
697 | BT_UUID_FMT_VALUES(muxer_msg_iter->expected_clock_class_uuid), | |
698 | BT_UUID_FMT_VALUES(cc_uuid)); | |
699 | goto error; | |
700 | } | |
701 | break; | |
702 | case MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_NONE: | |
703 | BT_COMP_LOGE("Expecting no clock class, but got one: " | |
704 | "clock-class-addr=%p, clock-class-name=\"%s\"", | |
705 | clock_class, cc_name); | |
706 | goto error; | |
707 | default: | |
708 | /* Unexpected */ | |
709 | BT_COMP_LOGF("Unexpected clock class expectation: " | |
710 | "expectation-code=%d", | |
711 | muxer_msg_iter->clock_class_expectation); | |
712 | abort(); | |
958f7d11 PP |
713 | } |
714 | ||
2d42927b PP |
715 | goto end; |
716 | ||
958f7d11 PP |
717 | error: |
718 | ret = -1; | |
719 | ||
720 | end: | |
37911c11 PP |
721 | return ret; |
722 | } | |
723 | ||
724 | static inline | |
725 | int validate_new_stream_clock_class(struct muxer_msg_iter *muxer_msg_iter, | |
726 | struct muxer_comp *muxer_comp, const bt_stream *stream) | |
727 | { | |
728 | int ret = 0; | |
729 | const bt_stream_class *stream_class = | |
730 | bt_stream_borrow_class_const(stream); | |
731 | const bt_clock_class *clock_class = | |
732 | bt_stream_class_borrow_default_clock_class_const(stream_class); | |
733 | ||
734 | if (!clock_class) { | |
735 | if (muxer_msg_iter->clock_class_expectation == | |
736 | MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_ANY) { | |
737 | /* Expect no clock class */ | |
738 | muxer_msg_iter->clock_class_expectation = | |
739 | MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_NONE; | |
150a034f SM |
740 | } else if (muxer_msg_iter->clock_class_expectation != |
741 | MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_NONE) { | |
742 | BT_COMP_LOGE("Expecting stream class without a default clock class: " | |
37911c11 PP |
743 | "stream-class-addr=%p, stream-class-name=\"%s\", " |
744 | "stream-class-id=%" PRIu64, | |
745 | stream_class, bt_stream_class_get_name(stream_class), | |
746 | bt_stream_class_get_id(stream_class)); | |
747 | ret = -1; | |
748 | } | |
749 | ||
750 | goto end; | |
318fe670 PP |
751 | } |
752 | ||
37911c11 PP |
753 | ret = validate_clock_class(muxer_msg_iter, muxer_comp, clock_class); |
754 | ||
755 | end: | |
958f7d11 PP |
756 | return ret; |
757 | } | |
758 | ||
ab11110e | 759 | /* |
b09a5592 PP |
760 | * This function finds the youngest available message amongst the |
761 | * non-ended upstream message iterators and returns the upstream | |
762 | * message iterator which has it, or | |
763 | * BT_MESSAGE_ITERATOR_STATUS_END if there's no available | |
764 | * message. | |
ab11110e PP |
765 | * |
766 | * This function does NOT: | |
767 | * | |
b09a5592 | 768 | * * Update any upstream message iterator. |
b09a5592 | 769 | * * Check the upstream message iterators to retry. |
ab11110e | 770 | * |
b09a5592 PP |
771 | * On sucess, this function sets *muxer_upstream_msg_iter to the |
772 | * upstream message iterator of which the current message is | |
ab11110e PP |
773 | * the youngest, and sets *ts_ns to its time. |
774 | */ | |
958f7d11 | 775 | static |
fb25b9e3 | 776 | bt_component_class_message_iterator_next_method_status |
b09a5592 | 777 | muxer_msg_iter_youngest_upstream_msg_iter( |
958f7d11 | 778 | struct muxer_comp *muxer_comp, |
b09a5592 PP |
779 | struct muxer_msg_iter *muxer_msg_iter, |
780 | struct muxer_upstream_msg_iter **muxer_upstream_msg_iter, | |
958f7d11 PP |
781 | int64_t *ts_ns) |
782 | { | |
783 | size_t i; | |
784 | int ret; | |
785 | int64_t youngest_ts_ns = INT64_MAX; | |
fb25b9e3 PP |
786 | bt_component_class_message_iterator_next_method_status status = |
787 | BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK; | |
958f7d11 | 788 | |
8b45963b | 789 | BT_ASSERT(muxer_comp); |
b09a5592 PP |
790 | BT_ASSERT(muxer_msg_iter); |
791 | BT_ASSERT(muxer_upstream_msg_iter); | |
792 | *muxer_upstream_msg_iter = NULL; | |
793 | ||
a71ed05c PP |
794 | for (i = 0; i < muxer_msg_iter->active_muxer_upstream_msg_iters->len; |
795 | i++) { | |
b09a5592 PP |
796 | const bt_message *msg; |
797 | struct muxer_upstream_msg_iter *cur_muxer_upstream_msg_iter = | |
a71ed05c PP |
798 | g_ptr_array_index( |
799 | muxer_msg_iter->active_muxer_upstream_msg_iters, | |
800 | i); | |
b09a5592 PP |
801 | int64_t msg_ts_ns; |
802 | ||
803 | if (!cur_muxer_upstream_msg_iter->msg_iter) { | |
804 | /* This upstream message iterator is ended */ | |
c9ecaa78 | 805 | BT_COMP_LOGT("Skipping ended upstream message iterator: " |
b09a5592 PP |
806 | "muxer-upstream-msg-iter-wrap-addr=%p", |
807 | cur_muxer_upstream_msg_iter); | |
958f7d11 PP |
808 | continue; |
809 | } | |
810 | ||
b09a5592 PP |
811 | BT_ASSERT(cur_muxer_upstream_msg_iter->msgs->length > 0); |
812 | msg = g_queue_peek_head(cur_muxer_upstream_msg_iter->msgs); | |
813 | BT_ASSERT(msg); | |
37911c11 | 814 | |
85e7137b | 815 | if (G_UNLIKELY(bt_message_get_type(msg) == |
37911c11 PP |
816 | BT_MESSAGE_TYPE_STREAM_BEGINNING)) { |
817 | ret = validate_new_stream_clock_class( | |
818 | muxer_msg_iter, muxer_comp, | |
819 | bt_message_stream_beginning_borrow_stream_const( | |
820 | msg)); | |
821 | if (ret) { | |
822 | /* | |
823 | * validate_new_stream_clock_class() logs | |
824 | * errors. | |
825 | */ | |
fb25b9e3 | 826 | status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_ERROR; |
37911c11 PP |
827 | goto end; |
828 | } | |
85e7137b | 829 | } else if (G_UNLIKELY(bt_message_get_type(msg) == |
37911c11 PP |
830 | BT_MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY)) { |
831 | const bt_clock_snapshot *cs; | |
37911c11 | 832 | |
11ddb3ef PP |
833 | cs = bt_message_message_iterator_inactivity_borrow_default_clock_snapshot_const( |
834 | msg); | |
37911c11 PP |
835 | ret = validate_clock_class(muxer_msg_iter, muxer_comp, |
836 | bt_clock_snapshot_borrow_clock_class_const(cs)); | |
837 | if (ret) { | |
838 | /* validate_clock_class() logs errors */ | |
fb25b9e3 | 839 | status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_ERROR; |
37911c11 PP |
840 | goto end; |
841 | } | |
842 | } | |
843 | ||
b09a5592 PP |
844 | ret = get_msg_ts_ns(muxer_comp, muxer_msg_iter, msg, |
845 | muxer_msg_iter->last_returned_ts_ns, &msg_ts_ns); | |
958f7d11 | 846 | if (ret) { |
b09a5592 PP |
847 | /* get_msg_ts_ns() logs errors */ |
848 | *muxer_upstream_msg_iter = NULL; | |
fb25b9e3 | 849 | status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_ERROR; |
958f7d11 PP |
850 | goto end; |
851 | } | |
852 | ||
759bd1bf FD |
853 | /* |
854 | * Update the current message iterator if it has not been set | |
855 | * yet, or if its current message has a timestamp smaller than | |
856 | * the previously selected youngest message. | |
857 | */ | |
858 | if (G_UNLIKELY(*muxer_upstream_msg_iter == NULL) || | |
859 | msg_ts_ns < youngest_ts_ns) { | |
b09a5592 PP |
860 | *muxer_upstream_msg_iter = |
861 | cur_muxer_upstream_msg_iter; | |
862 | youngest_ts_ns = msg_ts_ns; | |
958f7d11 | 863 | *ts_ns = youngest_ts_ns; |
c3923e1f FD |
864 | } else if (msg_ts_ns == youngest_ts_ns) { |
865 | /* | |
866 | * The currently selected message to be sent downstream | |
867 | * next has the exact same timestamp that of the | |
868 | * current candidate message. We must break the tie | |
869 | * in a predictable manner. | |
870 | */ | |
871 | const bt_message *selected_msg = g_queue_peek_head( | |
872 | (*muxer_upstream_msg_iter)->msgs); | |
873 | BT_COMP_LOGD_STR("Two of the next message candidates have the same timestamps, pick one deterministically."); | |
874 | ||
875 | /* | |
876 | * Order the messages in an arbitrary but determinitic | |
877 | * way. | |
878 | */ | |
9240924b | 879 | ret = common_muxing_compare_messages(msg, selected_msg); |
c3923e1f FD |
880 | if (ret < 0) { |
881 | /* | |
882 | * The `msg` should go first. Update the next | |
883 | * iterator and the current timestamp. | |
884 | */ | |
885 | *muxer_upstream_msg_iter = | |
886 | cur_muxer_upstream_msg_iter; | |
887 | youngest_ts_ns = msg_ts_ns; | |
888 | *ts_ns = youngest_ts_ns; | |
889 | } else if (ret == 0) { | |
890 | /* Unable to pick which one should go first. */ | |
891 | BT_COMP_LOGW("Cannot deterministically pick next upstream message iterator because they have identical next messages: " | |
892 | "muxer-upstream-msg-iter-wrap-addr=%p" | |
893 | "cur-muxer-upstream-msg-iter-wrap-addr=%p", | |
894 | *muxer_upstream_msg_iter, | |
895 | cur_muxer_upstream_msg_iter); | |
896 | } | |
958f7d11 PP |
897 | } |
898 | } | |
899 | ||
b09a5592 | 900 | if (!*muxer_upstream_msg_iter) { |
fb25b9e3 | 901 | status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_END; |
958f7d11 PP |
902 | *ts_ns = INT64_MIN; |
903 | } | |
904 | ||
905 | end: | |
906 | return status; | |
907 | } | |
908 | ||
909 | static | |
fb25b9e3 PP |
910 | bt_component_class_message_iterator_next_method_status |
911 | validate_muxer_upstream_msg_iter( | |
a71ed05c PP |
912 | struct muxer_upstream_msg_iter *muxer_upstream_msg_iter, |
913 | bool *is_ended) | |
958f7d11 | 914 | { |
f5abbab4 PP |
915 | struct muxer_comp *muxer_comp = |
916 | muxer_upstream_msg_iter->muxer_comp; | |
fb25b9e3 PP |
917 | bt_component_class_message_iterator_next_method_status status = |
918 | BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK; | |
958f7d11 | 919 | |
d9693c64 | 920 | BT_COMP_LOGD("Validating muxer's upstream message iterator wrapper: " |
b09a5592 PP |
921 | "muxer-upstream-msg-iter-wrap-addr=%p", |
922 | muxer_upstream_msg_iter); | |
318fe670 | 923 | |
b09a5592 PP |
924 | if (muxer_upstream_msg_iter->msgs->length > 0 || |
925 | !muxer_upstream_msg_iter->msg_iter) { | |
d9693c64 | 926 | BT_COMP_LOGD("Already valid or not considered: " |
b09a5592 PP |
927 | "queue-len=%u, upstream-msg-iter-addr=%p", |
928 | muxer_upstream_msg_iter->msgs->length, | |
929 | muxer_upstream_msg_iter->msg_iter); | |
ab11110e PP |
930 | goto end; |
931 | } | |
932 | ||
b09a5592 | 933 | /* muxer_upstream_msg_iter_next() logs details/errors */ |
a71ed05c PP |
934 | status = muxer_upstream_msg_iter_next(muxer_upstream_msg_iter, |
935 | is_ended); | |
089717de PP |
936 | |
937 | end: | |
938 | return status; | |
939 | } | |
940 | ||
941 | static | |
fb25b9e3 PP |
942 | bt_component_class_message_iterator_next_method_status |
943 | validate_muxer_upstream_msg_iters( | |
b09a5592 | 944 | struct muxer_msg_iter *muxer_msg_iter) |
089717de | 945 | { |
f5abbab4 | 946 | struct muxer_comp *muxer_comp = muxer_msg_iter->muxer_comp; |
fb25b9e3 PP |
947 | bt_component_class_message_iterator_next_method_status status = |
948 | BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK; | |
089717de PP |
949 | size_t i; |
950 | ||
d9693c64 | 951 | BT_COMP_LOGD("Validating muxer's upstream message iterator wrappers: " |
b09a5592 | 952 | "muxer-msg-iter-addr=%p", muxer_msg_iter); |
318fe670 | 953 | |
a71ed05c PP |
954 | for (i = 0; i < muxer_msg_iter->active_muxer_upstream_msg_iters->len; |
955 | i++) { | |
90aed4f2 | 956 | bool is_ended = false; |
b09a5592 | 957 | struct muxer_upstream_msg_iter *muxer_upstream_msg_iter = |
089717de | 958 | g_ptr_array_index( |
a71ed05c | 959 | muxer_msg_iter->active_muxer_upstream_msg_iters, |
089717de PP |
960 | i); |
961 | ||
b09a5592 | 962 | status = validate_muxer_upstream_msg_iter( |
a71ed05c | 963 | muxer_upstream_msg_iter, &is_ended); |
fb25b9e3 | 964 | if (status != BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK) { |
318fe670 | 965 | if (status < 0) { |
d9693c64 | 966 | BT_COMP_LOGE("Cannot validate muxer's upstream message iterator wrapper: " |
b09a5592 PP |
967 | "muxer-msg-iter-addr=%p, " |
968 | "muxer-upstream-msg-iter-wrap-addr=%p", | |
969 | muxer_msg_iter, | |
970 | muxer_upstream_msg_iter); | |
318fe670 | 971 | } else { |
d9693c64 | 972 | BT_COMP_LOGD("Cannot validate muxer's upstream message iterator wrapper: " |
b09a5592 PP |
973 | "muxer-msg-iter-addr=%p, " |
974 | "muxer-upstream-msg-iter-wrap-addr=%p", | |
975 | muxer_msg_iter, | |
976 | muxer_upstream_msg_iter); | |
318fe670 PP |
977 | } |
978 | ||
089717de PP |
979 | goto end; |
980 | } | |
744ba28b PP |
981 | |
982 | /* | |
a71ed05c PP |
983 | * Move this muxer upstream message iterator to the |
984 | * array of ended iterators if it's ended. | |
744ba28b | 985 | */ |
85e7137b | 986 | if (G_UNLIKELY(is_ended)) { |
d9693c64 | 987 | BT_COMP_LOGD("Muxer's upstream message iterator wrapper: ended or canceled: " |
a71ed05c PP |
988 | "muxer-msg-iter-addr=%p, " |
989 | "muxer-upstream-msg-iter-wrap-addr=%p", | |
990 | muxer_msg_iter, muxer_upstream_msg_iter); | |
991 | g_ptr_array_add( | |
992 | muxer_msg_iter->ended_muxer_upstream_msg_iters, | |
993 | muxer_upstream_msg_iter); | |
994 | muxer_msg_iter->active_muxer_upstream_msg_iters->pdata[i] = NULL; | |
995 | ||
744ba28b PP |
996 | /* |
997 | * Use g_ptr_array_remove_fast() because the | |
998 | * order of those elements is not important. | |
999 | */ | |
1000 | g_ptr_array_remove_index_fast( | |
a71ed05c | 1001 | muxer_msg_iter->active_muxer_upstream_msg_iters, |
744ba28b PP |
1002 | i); |
1003 | i--; | |
1004 | } | |
089717de PP |
1005 | } |
1006 | ||
1007 | end: | |
1008 | return status; | |
1009 | } | |
1010 | ||
3fd7b79d | 1011 | static inline |
fb25b9e3 | 1012 | bt_component_class_message_iterator_next_method_status muxer_msg_iter_do_next_one( |
089717de | 1013 | struct muxer_comp *muxer_comp, |
b09a5592 PP |
1014 | struct muxer_msg_iter *muxer_msg_iter, |
1015 | const bt_message **msg) | |
089717de | 1016 | { |
fb25b9e3 | 1017 | bt_component_class_message_iterator_next_method_status status; |
b09a5592 | 1018 | struct muxer_upstream_msg_iter *muxer_upstream_msg_iter = NULL; |
089717de PP |
1019 | int64_t next_return_ts; |
1020 | ||
1043fdea | 1021 | status = validate_muxer_upstream_msg_iters(muxer_msg_iter); |
fb25b9e3 | 1022 | if (status != BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK) { |
1043fdea PP |
1023 | /* validate_muxer_upstream_msg_iters() logs details */ |
1024 | goto end; | |
958f7d11 PP |
1025 | } |
1026 | ||
1027 | /* | |
089717de | 1028 | * At this point we know that all the existing upstream |
b09a5592 PP |
1029 | * message iterators are valid. We can find the one, |
1030 | * amongst those, of which the current message is the | |
089717de | 1031 | * youngest. |
958f7d11 | 1032 | */ |
b09a5592 PP |
1033 | status = muxer_msg_iter_youngest_upstream_msg_iter(muxer_comp, |
1034 | muxer_msg_iter, &muxer_upstream_msg_iter, | |
089717de | 1035 | &next_return_ts); |
fb25b9e3 | 1036 | if (status < 0 || status == BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_END) { |
3fd7b79d | 1037 | if (status < 0) { |
d9693c64 | 1038 | BT_COMP_LOGE("Cannot find the youngest upstream message iterator wrapper: " |
318fe670 | 1039 | "status=%s", |
fb25b9e3 | 1040 | bt_common_func_status_string(status)); |
318fe670 | 1041 | } else { |
d9693c64 | 1042 | BT_COMP_LOGD("Cannot find the youngest upstream message iterator wrapper: " |
318fe670 | 1043 | "status=%s", |
fb25b9e3 | 1044 | bt_common_func_status_string(status)); |
318fe670 PP |
1045 | } |
1046 | ||
958f7d11 PP |
1047 | goto end; |
1048 | } | |
1049 | ||
b09a5592 | 1050 | if (next_return_ts < muxer_msg_iter->last_returned_ts_ns) { |
d9693c64 | 1051 | BT_COMP_LOGE("Youngest upstream message iterator wrapper's timestamp is less than muxer's message iterator's last returned timestamp: " |
b09a5592 | 1052 | "muxer-msg-iter-addr=%p, ts=%" PRId64 ", " |
318fe670 | 1053 | "last-returned-ts=%" PRId64, |
b09a5592 PP |
1054 | muxer_msg_iter, next_return_ts, |
1055 | muxer_msg_iter->last_returned_ts_ns); | |
fb25b9e3 | 1056 | status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_ERROR; |
958f7d11 PP |
1057 | goto end; |
1058 | } | |
1059 | ||
d9693c64 | 1060 | BT_COMP_LOGD("Found youngest upstream message iterator wrapper: " |
b09a5592 PP |
1061 | "muxer-msg-iter-addr=%p, " |
1062 | "muxer-upstream-msg-iter-wrap-addr=%p, " | |
318fe670 | 1063 | "ts=%" PRId64, |
b09a5592 | 1064 | muxer_msg_iter, muxer_upstream_msg_iter, next_return_ts); |
fb25b9e3 | 1065 | BT_ASSERT(status == BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK); |
b09a5592 | 1066 | BT_ASSERT(muxer_upstream_msg_iter); |
958f7d11 PP |
1067 | |
1068 | /* | |
3fd7b79d | 1069 | * Consume from the queue's head: other side |
b09a5592 | 1070 | * (muxer_upstream_msg_iter_next()) writes to the tail. |
958f7d11 | 1071 | */ |
b09a5592 PP |
1072 | *msg = g_queue_pop_head(muxer_upstream_msg_iter->msgs); |
1073 | BT_ASSERT(*msg); | |
1074 | muxer_msg_iter->last_returned_ts_ns = next_return_ts; | |
958f7d11 PP |
1075 | |
1076 | end: | |
3fd7b79d PP |
1077 | return status; |
1078 | } | |
1079 | ||
1080 | static | |
fb25b9e3 | 1081 | bt_component_class_message_iterator_next_method_status muxer_msg_iter_do_next( |
3fd7b79d | 1082 | struct muxer_comp *muxer_comp, |
b09a5592 PP |
1083 | struct muxer_msg_iter *muxer_msg_iter, |
1084 | bt_message_array_const msgs, uint64_t capacity, | |
3fd7b79d PP |
1085 | uint64_t *count) |
1086 | { | |
fb25b9e3 PP |
1087 | bt_component_class_message_iterator_next_method_status status = |
1088 | BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK; | |
3fd7b79d PP |
1089 | uint64_t i = 0; |
1090 | ||
fb25b9e3 | 1091 | while (i < capacity && status == BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK) { |
b09a5592 PP |
1092 | status = muxer_msg_iter_do_next_one(muxer_comp, |
1093 | muxer_msg_iter, &msgs[i]); | |
fb25b9e3 | 1094 | if (status == BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK) { |
3fd7b79d PP |
1095 | i++; |
1096 | } | |
1097 | } | |
1098 | ||
1099 | if (i > 0) { | |
1100 | /* | |
b09a5592 | 1101 | * Even if muxer_msg_iter_do_next_one() returned |
3fd7b79d | 1102 | * something else than |
b09a5592 PP |
1103 | * BT_MESSAGE_ITERATOR_STATUS_OK, we accumulated |
1104 | * message objects in the output message | |
3fd7b79d | 1105 | * array, so we need to return |
b09a5592 | 1106 | * BT_MESSAGE_ITERATOR_STATUS_OK so that they are |
3fd7b79d | 1107 | * transfered to downstream. This other status occurs |
b09a5592 | 1108 | * again the next time muxer_msg_iter_do_next() is |
3fd7b79d | 1109 | * called, possibly without any accumulated |
b09a5592 | 1110 | * message, in which case we'll return it. |
3fd7b79d PP |
1111 | */ |
1112 | *count = i; | |
fb25b9e3 | 1113 | status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK; |
3fd7b79d PP |
1114 | } |
1115 | ||
1116 | return status; | |
958f7d11 PP |
1117 | } |
1118 | ||
1119 | static | |
b09a5592 | 1120 | void destroy_muxer_msg_iter(struct muxer_msg_iter *muxer_msg_iter) |
ab11110e | 1121 | { |
f5abbab4 PP |
1122 | struct muxer_comp *muxer_comp; |
1123 | ||
b09a5592 | 1124 | if (!muxer_msg_iter) { |
ab11110e PP |
1125 | return; |
1126 | } | |
1127 | ||
f5abbab4 | 1128 | muxer_comp = muxer_msg_iter->muxer_comp; |
d9693c64 | 1129 | BT_COMP_LOGD("Destroying muxer component's message iterator: " |
b09a5592 | 1130 | "muxer-msg-iter-addr=%p", muxer_msg_iter); |
318fe670 | 1131 | |
a71ed05c | 1132 | if (muxer_msg_iter->active_muxer_upstream_msg_iters) { |
d9693c64 | 1133 | BT_COMP_LOGD_STR("Destroying muxer's active upstream message iterator wrappers."); |
a71ed05c PP |
1134 | g_ptr_array_free( |
1135 | muxer_msg_iter->active_muxer_upstream_msg_iters, TRUE); | |
1136 | } | |
1137 | ||
1138 | if (muxer_msg_iter->ended_muxer_upstream_msg_iters) { | |
d9693c64 | 1139 | BT_COMP_LOGD_STR("Destroying muxer's ended upstream message iterator wrappers."); |
ab11110e | 1140 | g_ptr_array_free( |
a71ed05c | 1141 | muxer_msg_iter->ended_muxer_upstream_msg_iters, TRUE); |
ab11110e PP |
1142 | } |
1143 | ||
b09a5592 | 1144 | g_free(muxer_msg_iter); |
ab11110e PP |
1145 | } |
1146 | ||
1147 | static | |
ab8b2b1b SM |
1148 | bt_component_class_message_iterator_init_method_status |
1149 | muxer_msg_iter_init_upstream_iterators(struct muxer_comp *muxer_comp, | |
b09a5592 | 1150 | struct muxer_msg_iter *muxer_msg_iter) |
958f7d11 | 1151 | { |
544d0515 PP |
1152 | int64_t count; |
1153 | int64_t i; | |
ab8b2b1b | 1154 | bt_component_class_message_iterator_init_method_status status; |
958f7d11 | 1155 | |
834e9996 | 1156 | count = bt_component_filter_get_input_port_count( |
bb61965b | 1157 | bt_self_component_filter_as_component_filter( |
d9693c64 | 1158 | muxer_comp->self_comp_flt)); |
544d0515 | 1159 | if (count < 0) { |
d9693c64 | 1160 | BT_COMP_LOGD("No input port to initialize for muxer component's message iterator: " |
b09a5592 PP |
1161 | "muxer-comp-addr=%p, muxer-msg-iter-addr=%p", |
1162 | muxer_comp, muxer_msg_iter); | |
ab8b2b1b | 1163 | status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_INIT_METHOD_STATUS_OK; |
ab11110e PP |
1164 | goto end; |
1165 | } | |
958f7d11 PP |
1166 | |
1167 | for (i = 0; i < count; i++) { | |
1043fdea | 1168 | bt_self_component_port_input_message_iterator *upstream_msg_iter; |
8eee8ea2 | 1169 | bt_self_component_port_input *self_port = |
834e9996 | 1170 | bt_self_component_filter_borrow_input_port_by_index( |
d9693c64 | 1171 | muxer_comp->self_comp_flt, i); |
8eee8ea2 | 1172 | const bt_port *port; |
ab8b2b1b SM |
1173 | bt_self_component_port_input_message_iterator_create_from_message_iterator_status |
1174 | msg_iter_status; | |
1175 | int int_status; | |
958f7d11 | 1176 | |
834e9996 | 1177 | BT_ASSERT(self_port); |
bb61965b PP |
1178 | port = bt_self_component_port_as_port( |
1179 | bt_self_component_port_input_as_self_component_port( | |
834e9996 | 1180 | self_port)); |
8b45963b | 1181 | BT_ASSERT(port); |
958f7d11 PP |
1182 | |
1183 | if (!bt_port_is_connected(port)) { | |
1043fdea | 1184 | /* Skip non-connected port */ |
958f7d11 PP |
1185 | continue; |
1186 | } | |
1187 | ||
ab8b2b1b SM |
1188 | msg_iter_status = create_msg_iter_on_input_port(muxer_comp, |
1189 | muxer_msg_iter, self_port, &upstream_msg_iter); | |
1190 | if (msg_iter_status != BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_CREATE_FROM_MESSAGE_ITERATOR_STATUS_OK) { | |
1043fdea | 1191 | /* create_msg_iter_on_input_port() logs errors */ |
ab8b2b1b | 1192 | status = (int) msg_iter_status; |
ab11110e | 1193 | goto end; |
958f7d11 | 1194 | } |
318fe670 | 1195 | |
ab8b2b1b | 1196 | int_status = muxer_msg_iter_add_upstream_msg_iter(muxer_msg_iter, |
ae644e59 | 1197 | upstream_msg_iter); |
1043fdea PP |
1198 | bt_self_component_port_input_message_iterator_put_ref( |
1199 | upstream_msg_iter); | |
ab8b2b1b SM |
1200 | if (int_status) { |
1201 | status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_INIT_METHOD_STATUS_ERROR; | |
1043fdea | 1202 | /* muxer_msg_iter_add_upstream_msg_iter() logs errors */ |
1043fdea PP |
1203 | goto end; |
1204 | } | |
958f7d11 PP |
1205 | } |
1206 | ||
ab8b2b1b SM |
1207 | status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_INIT_METHOD_STATUS_OK; |
1208 | ||
958f7d11 | 1209 | end: |
ab8b2b1b | 1210 | return status; |
958f7d11 PP |
1211 | } |
1212 | ||
958f7d11 | 1213 | BT_HIDDEN |
fb25b9e3 | 1214 | bt_component_class_message_iterator_init_method_status muxer_msg_iter_init( |
b09a5592 | 1215 | bt_self_message_iterator *self_msg_iter, |
8eee8ea2 PP |
1216 | bt_self_component_filter *self_comp, |
1217 | bt_self_component_port_output *port) | |
958f7d11 PP |
1218 | { |
1219 | struct muxer_comp *muxer_comp = NULL; | |
b09a5592 | 1220 | struct muxer_msg_iter *muxer_msg_iter = NULL; |
ab8b2b1b | 1221 | bt_component_class_message_iterator_init_method_status status; |
958f7d11 | 1222 | |
834e9996 | 1223 | muxer_comp = bt_self_component_get_data( |
bb61965b | 1224 | bt_self_component_filter_as_self_component(self_comp)); |
8b45963b | 1225 | BT_ASSERT(muxer_comp); |
d9693c64 | 1226 | BT_COMP_LOGD("Initializing muxer component's message iterator: " |
b09a5592 PP |
1227 | "comp-addr=%p, muxer-comp-addr=%p, msg-iter-addr=%p", |
1228 | self_comp, muxer_comp, self_msg_iter); | |
a09c6b95 | 1229 | |
b09a5592 | 1230 | if (muxer_comp->initializing_muxer_msg_iter) { |
a09c6b95 | 1231 | /* |
089717de | 1232 | * Weird, unhandled situation detected: downstream |
b09a5592 PP |
1233 | * creates a muxer message iterator while creating |
1234 | * another muxer message iterator (same component). | |
a09c6b95 | 1235 | */ |
d9693c64 | 1236 | BT_COMP_LOGE("Recursive initialization of muxer component's message iterator: " |
b09a5592 PP |
1237 | "comp-addr=%p, muxer-comp-addr=%p, msg-iter-addr=%p", |
1238 | self_comp, muxer_comp, self_msg_iter); | |
ab8b2b1b | 1239 | status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_INIT_METHOD_STATUS_ERROR; |
958f7d11 PP |
1240 | goto error; |
1241 | } | |
1242 | ||
b09a5592 PP |
1243 | muxer_comp->initializing_muxer_msg_iter = true; |
1244 | muxer_msg_iter = g_new0(struct muxer_msg_iter, 1); | |
1245 | if (!muxer_msg_iter) { | |
d9693c64 | 1246 | BT_COMP_LOGE_STR("Failed to allocate one muxer component's message iterator."); |
ab8b2b1b | 1247 | status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_INIT_METHOD_STATUS_MEMORY_ERROR; |
ab11110e PP |
1248 | goto error; |
1249 | } | |
1250 | ||
f5abbab4 | 1251 | muxer_msg_iter->muxer_comp = muxer_comp; |
692f1a01 | 1252 | muxer_msg_iter->self_msg_iter = self_msg_iter; |
b09a5592 | 1253 | muxer_msg_iter->last_returned_ts_ns = INT64_MIN; |
a71ed05c | 1254 | muxer_msg_iter->active_muxer_upstream_msg_iters = |
958f7d11 | 1255 | g_ptr_array_new_with_free_func( |
b09a5592 | 1256 | (GDestroyNotify) destroy_muxer_upstream_msg_iter); |
a71ed05c | 1257 | if (!muxer_msg_iter->active_muxer_upstream_msg_iters) { |
d9693c64 | 1258 | BT_COMP_LOGE_STR("Failed to allocate a GPtrArray."); |
ab8b2b1b | 1259 | status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_INIT_METHOD_STATUS_MEMORY_ERROR; |
a71ed05c PP |
1260 | goto error; |
1261 | } | |
1262 | ||
1263 | muxer_msg_iter->ended_muxer_upstream_msg_iters = | |
1264 | g_ptr_array_new_with_free_func( | |
1265 | (GDestroyNotify) destroy_muxer_upstream_msg_iter); | |
1266 | if (!muxer_msg_iter->ended_muxer_upstream_msg_iters) { | |
d9693c64 | 1267 | BT_COMP_LOGE_STR("Failed to allocate a GPtrArray."); |
ab8b2b1b | 1268 | status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_INIT_METHOD_STATUS_MEMORY_ERROR; |
958f7d11 PP |
1269 | goto error; |
1270 | } | |
1271 | ||
ab8b2b1b | 1272 | status = muxer_msg_iter_init_upstream_iterators(muxer_comp, |
b09a5592 | 1273 | muxer_msg_iter); |
ab8b2b1b | 1274 | if (status) { |
d9693c64 | 1275 | BT_COMP_LOGE("Cannot initialize connected input ports for muxer component's message iterator: " |
318fe670 | 1276 | "comp-addr=%p, muxer-comp-addr=%p, " |
b09a5592 PP |
1277 | "muxer-msg-iter-addr=%p, msg-iter-addr=%p, ret=%d", |
1278 | self_comp, muxer_comp, muxer_msg_iter, | |
ab8b2b1b | 1279 | self_msg_iter, status); |
a09c6b95 PP |
1280 | goto error; |
1281 | } | |
1282 | ||
1043fdea | 1283 | bt_self_message_iterator_set_data(self_msg_iter, muxer_msg_iter); |
d9693c64 | 1284 | BT_COMP_LOGD("Initialized muxer component's message iterator: " |
b09a5592 PP |
1285 | "comp-addr=%p, muxer-comp-addr=%p, muxer-msg-iter-addr=%p, " |
1286 | "msg-iter-addr=%p", | |
1287 | self_comp, muxer_comp, muxer_msg_iter, self_msg_iter); | |
958f7d11 PP |
1288 | goto end; |
1289 | ||
1290 | error: | |
b09a5592 | 1291 | destroy_muxer_msg_iter(muxer_msg_iter); |
1043fdea | 1292 | bt_self_message_iterator_set_data(self_msg_iter, NULL); |
958f7d11 PP |
1293 | |
1294 | end: | |
b09a5592 | 1295 | muxer_comp->initializing_muxer_msg_iter = false; |
958f7d11 PP |
1296 | return status; |
1297 | } | |
1298 | ||
1299 | BT_HIDDEN | |
a71ed05c | 1300 | void muxer_msg_iter_finalize(bt_self_message_iterator *self_msg_iter) |
958f7d11 | 1301 | { |
b09a5592 PP |
1302 | struct muxer_msg_iter *muxer_msg_iter = |
1303 | bt_self_message_iterator_get_data(self_msg_iter); | |
8eee8ea2 | 1304 | bt_self_component *self_comp = NULL; |
958f7d11 PP |
1305 | struct muxer_comp *muxer_comp = NULL; |
1306 | ||
b09a5592 PP |
1307 | self_comp = bt_self_message_iterator_borrow_component( |
1308 | self_msg_iter); | |
834e9996 PP |
1309 | BT_ASSERT(self_comp); |
1310 | muxer_comp = bt_self_component_get_data(self_comp); | |
d9693c64 | 1311 | BT_COMP_LOGD("Finalizing muxer component's message iterator: " |
b09a5592 PP |
1312 | "comp-addr=%p, muxer-comp-addr=%p, muxer-msg-iter-addr=%p, " |
1313 | "msg-iter-addr=%p", | |
1314 | self_comp, muxer_comp, muxer_msg_iter, self_msg_iter); | |
958f7d11 | 1315 | |
1043fdea | 1316 | if (muxer_msg_iter) { |
b09a5592 | 1317 | destroy_muxer_msg_iter(muxer_msg_iter); |
958f7d11 | 1318 | } |
958f7d11 PP |
1319 | } |
1320 | ||
1321 | BT_HIDDEN | |
fb25b9e3 | 1322 | bt_component_class_message_iterator_next_method_status muxer_msg_iter_next( |
b09a5592 PP |
1323 | bt_self_message_iterator *self_msg_iter, |
1324 | bt_message_array_const msgs, uint64_t capacity, | |
3fd7b79d | 1325 | uint64_t *count) |
958f7d11 | 1326 | { |
fb25b9e3 | 1327 | bt_component_class_message_iterator_next_method_status status; |
b09a5592 PP |
1328 | struct muxer_msg_iter *muxer_msg_iter = |
1329 | bt_self_message_iterator_get_data(self_msg_iter); | |
8eee8ea2 | 1330 | bt_self_component *self_comp = NULL; |
958f7d11 | 1331 | struct muxer_comp *muxer_comp = NULL; |
958f7d11 | 1332 | |
b09a5592 PP |
1333 | BT_ASSERT(muxer_msg_iter); |
1334 | self_comp = bt_self_message_iterator_borrow_component( | |
1335 | self_msg_iter); | |
834e9996 PP |
1336 | BT_ASSERT(self_comp); |
1337 | muxer_comp = bt_self_component_get_data(self_comp); | |
8b45963b | 1338 | BT_ASSERT(muxer_comp); |
c9ecaa78 | 1339 | BT_COMP_LOGT("Muxer component's message iterator's \"next\" method called: " |
b09a5592 PP |
1340 | "comp-addr=%p, muxer-comp-addr=%p, muxer-msg-iter-addr=%p, " |
1341 | "msg-iter-addr=%p", | |
1342 | self_comp, muxer_comp, muxer_msg_iter, self_msg_iter); | |
318fe670 | 1343 | |
b09a5592 PP |
1344 | status = muxer_msg_iter_do_next(muxer_comp, muxer_msg_iter, |
1345 | msgs, capacity, count); | |
3fd7b79d | 1346 | if (status < 0) { |
d9693c64 | 1347 | BT_COMP_LOGE("Cannot get next message: " |
b09a5592 PP |
1348 | "comp-addr=%p, muxer-comp-addr=%p, muxer-msg-iter-addr=%p, " |
1349 | "msg-iter-addr=%p, status=%s", | |
1350 | self_comp, muxer_comp, muxer_msg_iter, self_msg_iter, | |
fb25b9e3 | 1351 | bt_common_func_status_string(status)); |
318fe670 | 1352 | } else { |
c9ecaa78 | 1353 | BT_COMP_LOGT("Returning from muxer component's message iterator's \"next\" method: " |
3fd7b79d | 1354 | "status=%s", |
fb25b9e3 | 1355 | bt_common_func_status_string(status)); |
318fe670 | 1356 | } |
958f7d11 | 1357 | |
3fd7b79d | 1358 | return status; |
958f7d11 PP |
1359 | } |
1360 | ||
1361 | BT_HIDDEN | |
fb25b9e3 | 1362 | bt_component_class_port_connected_method_status muxer_input_port_connected( |
8eee8ea2 PP |
1363 | bt_self_component_filter *self_comp, |
1364 | bt_self_component_port_input *self_port, | |
1365 | const bt_port_output *other_port) | |
958f7d11 | 1366 | { |
fb25b9e3 PP |
1367 | bt_component_class_port_connected_method_status status = |
1368 | BT_COMPONENT_CLASS_PORT_CONNECTED_METHOD_STATUS_OK; | |
1369 | bt_self_component_add_port_status add_port_status; | |
f5abbab4 PP |
1370 | struct muxer_comp *muxer_comp = bt_self_component_get_data( |
1371 | bt_self_component_filter_as_self_component(self_comp)); | |
958f7d11 | 1372 | |
fb25b9e3 PP |
1373 | add_port_status = add_available_input_port(self_comp); |
1374 | if (add_port_status) { | |
d9693c64 | 1375 | BT_COMP_LOGE("Cannot add one muxer component's input port: " |
1043fdea | 1376 | "status=%s", |
fb25b9e3 PP |
1377 | bt_common_func_status_string(status)); |
1378 | ||
1379 | if (add_port_status == | |
1380 | BT_SELF_COMPONENT_ADD_PORT_STATUS_MEMORY_ERROR) { | |
1381 | status = BT_COMPONENT_CLASS_PORT_CONNECTED_METHOD_STATUS_MEMORY_ERROR; | |
1382 | } else { | |
1383 | status = BT_COMPONENT_CLASS_PORT_CONNECTED_METHOD_STATUS_ERROR; | |
1384 | } | |
1385 | ||
06a2cb0d PP |
1386 | goto end; |
1387 | } | |
1388 | ||
958f7d11 | 1389 | end: |
634f394c | 1390 | return status; |
958f7d11 | 1391 | } |
c9d3ff42 | 1392 | |
a71ed05c | 1393 | static inline |
9e8e8b43 SM |
1394 | bt_component_class_message_iterator_can_seek_beginning_method_status |
1395 | muxer_upstream_msg_iters_can_all_seek_beginning( | |
1396 | GPtrArray *muxer_upstream_msg_iters, bt_bool *can_seek) | |
c9d3ff42 | 1397 | { |
2f355c38 PP |
1398 | bt_component_class_message_iterator_can_seek_beginning_method_status status = |
1399 | BT_COMPONENT_CLASS_MESSAGE_ITERATOR_CAN_SEEK_BEGINNING_METHOD_STATUS_OK; | |
c9d3ff42 | 1400 | uint64_t i; |
c9d3ff42 | 1401 | |
a71ed05c | 1402 | for (i = 0; i < muxer_upstream_msg_iters->len; i++) { |
c9d3ff42 | 1403 | struct muxer_upstream_msg_iter *upstream_msg_iter = |
a71ed05c | 1404 | muxer_upstream_msg_iters->pdata[i]; |
9e8e8b43 SM |
1405 | status = (int) bt_self_component_port_input_message_iterator_can_seek_beginning( |
1406 | upstream_msg_iter->msg_iter, can_seek); | |
1407 | if (status != BT_COMPONENT_CLASS_MESSAGE_ITERATOR_CAN_SEEK_BEGINNING_METHOD_STATUS_OK) { | |
1408 | goto end; | |
1409 | } | |
c9d3ff42 | 1410 | |
9e8e8b43 | 1411 | if (!*can_seek) { |
c9d3ff42 PP |
1412 | goto end; |
1413 | } | |
1414 | } | |
1415 | ||
9e8e8b43 SM |
1416 | *can_seek = BT_TRUE; |
1417 | ||
c9d3ff42 | 1418 | end: |
9e8e8b43 | 1419 | return status; |
c9d3ff42 PP |
1420 | } |
1421 | ||
a71ed05c | 1422 | BT_HIDDEN |
9e8e8b43 SM |
1423 | bt_component_class_message_iterator_can_seek_beginning_method_status |
1424 | muxer_msg_iter_can_seek_beginning( | |
1425 | bt_self_message_iterator *self_msg_iter, bt_bool *can_seek) | |
a71ed05c PP |
1426 | { |
1427 | struct muxer_msg_iter *muxer_msg_iter = | |
1428 | bt_self_message_iterator_get_data(self_msg_iter); | |
9e8e8b43 | 1429 | bt_component_class_message_iterator_can_seek_beginning_method_status status; |
a71ed05c | 1430 | |
9e8e8b43 SM |
1431 | status = muxer_upstream_msg_iters_can_all_seek_beginning( |
1432 | muxer_msg_iter->active_muxer_upstream_msg_iters, can_seek); | |
1433 | if (status != BT_COMPONENT_CLASS_MESSAGE_ITERATOR_CAN_SEEK_BEGINNING_METHOD_STATUS_OK) { | |
a71ed05c PP |
1434 | goto end; |
1435 | } | |
1436 | ||
9e8e8b43 | 1437 | if (!*can_seek) { |
a71ed05c PP |
1438 | goto end; |
1439 | } | |
1440 | ||
9e8e8b43 SM |
1441 | status = muxer_upstream_msg_iters_can_all_seek_beginning( |
1442 | muxer_msg_iter->ended_muxer_upstream_msg_iters, can_seek); | |
1443 | ||
a71ed05c | 1444 | end: |
9e8e8b43 | 1445 | return status; |
a71ed05c PP |
1446 | } |
1447 | ||
c9d3ff42 | 1448 | BT_HIDDEN |
fb25b9e3 | 1449 | bt_component_class_message_iterator_seek_beginning_method_status muxer_msg_iter_seek_beginning( |
c9d3ff42 PP |
1450 | bt_self_message_iterator *self_msg_iter) |
1451 | { | |
1452 | struct muxer_msg_iter *muxer_msg_iter = | |
1453 | bt_self_message_iterator_get_data(self_msg_iter); | |
fb25b9e3 PP |
1454 | bt_component_class_message_iterator_seek_beginning_method_status status = |
1455 | BT_COMPONENT_CLASS_MESSAGE_ITERATOR_SEEK_BEGINNING_METHOD_STATUS_OK; | |
1456 | bt_message_iterator_seek_beginning_status seek_beg_status; | |
c9d3ff42 PP |
1457 | uint64_t i; |
1458 | ||
a71ed05c PP |
1459 | /* Seek all ended upstream iterators first */ |
1460 | for (i = 0; i < muxer_msg_iter->ended_muxer_upstream_msg_iters->len; | |
1461 | i++) { | |
c9d3ff42 | 1462 | struct muxer_upstream_msg_iter *upstream_msg_iter = |
a71ed05c | 1463 | muxer_msg_iter->ended_muxer_upstream_msg_iters->pdata[i]; |
c9d3ff42 | 1464 | |
fb25b9e3 | 1465 | seek_beg_status = bt_self_component_port_input_message_iterator_seek_beginning( |
c9d3ff42 | 1466 | upstream_msg_iter->msg_iter); |
fb25b9e3 PP |
1467 | if (seek_beg_status != BT_MESSAGE_ITERATOR_SEEK_BEGINNING_STATUS_OK) { |
1468 | status = (int) seek_beg_status; | |
c9d3ff42 PP |
1469 | goto end; |
1470 | } | |
a71ed05c PP |
1471 | |
1472 | empty_message_queue(upstream_msg_iter); | |
1473 | } | |
1474 | ||
1475 | /* Seek all previously active upstream iterators */ | |
1476 | for (i = 0; i < muxer_msg_iter->active_muxer_upstream_msg_iters->len; | |
1477 | i++) { | |
1478 | struct muxer_upstream_msg_iter *upstream_msg_iter = | |
1479 | muxer_msg_iter->active_muxer_upstream_msg_iters->pdata[i]; | |
1480 | ||
fb25b9e3 | 1481 | seek_beg_status = bt_self_component_port_input_message_iterator_seek_beginning( |
a71ed05c | 1482 | upstream_msg_iter->msg_iter); |
fb25b9e3 PP |
1483 | if (seek_beg_status != BT_MESSAGE_ITERATOR_SEEK_BEGINNING_STATUS_OK) { |
1484 | status = (int) seek_beg_status; | |
a71ed05c PP |
1485 | goto end; |
1486 | } | |
1487 | ||
1488 | empty_message_queue(upstream_msg_iter); | |
1489 | } | |
1490 | ||
1491 | /* Make them all active */ | |
1492 | for (i = 0; i < muxer_msg_iter->ended_muxer_upstream_msg_iters->len; | |
1493 | i++) { | |
1494 | struct muxer_upstream_msg_iter *upstream_msg_iter = | |
1495 | muxer_msg_iter->ended_muxer_upstream_msg_iters->pdata[i]; | |
1496 | ||
1497 | g_ptr_array_add(muxer_msg_iter->active_muxer_upstream_msg_iters, | |
1498 | upstream_msg_iter); | |
1499 | muxer_msg_iter->ended_muxer_upstream_msg_iters->pdata[i] = NULL; | |
c9d3ff42 PP |
1500 | } |
1501 | ||
0e4ccfb9 MJ |
1502 | /* |
1503 | * GLib < 2.48.0 asserts when g_ptr_array_remove_range() is | |
1504 | * called on an empty array. | |
1505 | */ | |
1506 | if (muxer_msg_iter->ended_muxer_upstream_msg_iters->len > 0) { | |
1507 | g_ptr_array_remove_range(muxer_msg_iter->ended_muxer_upstream_msg_iters, | |
1508 | 0, muxer_msg_iter->ended_muxer_upstream_msg_iters->len); | |
1509 | } | |
c9d3ff42 | 1510 | muxer_msg_iter->last_returned_ts_ns = INT64_MIN; |
a71ed05c PP |
1511 | muxer_msg_iter->clock_class_expectation = |
1512 | MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_ANY; | |
c9d3ff42 PP |
1513 | |
1514 | end: | |
fb25b9e3 | 1515 | return status; |
c9d3ff42 | 1516 | } |