Commit | Line | Data |
---|---|---|
958f7d11 PP |
1 | /* |
2 | * Copyright 2017 Philippe Proulx <pproulx@efficios.com> | |
3 | * | |
4 | * Permission is hereby granted, free of charge, to any person obtaining a copy | |
5 | * of this software and associated documentation files (the "Software"), to deal | |
6 | * in the Software without restriction, including without limitation the rights | |
7 | * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | |
8 | * copies of the Software, and to permit persons to whom the Software is | |
9 | * furnished to do so, subject to the following conditions: | |
10 | * | |
11 | * The above copyright notice and this permission notice shall be included in | |
12 | * all copies or substantial portions of the Software. | |
13 | * | |
14 | * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | |
15 | * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | |
16 | * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | |
17 | * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | |
18 | * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | |
19 | * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE | |
20 | * SOFTWARE. | |
21 | */ | |
22 | ||
d9693c64 | 23 | #define BT_COMP_LOG_SELF_COMP (muxer_comp->self_comp) |
f5abbab4 | 24 | #define BT_LOG_OUTPUT_LEVEL (muxer_comp->log_level) |
b03487ab | 25 | #define BT_LOG_TAG "PLUGIN/FLT.UTILS.MUXER" |
3fa1b6a3 | 26 | #include "logging/comp-logging.h" |
318fe670 | 27 | |
85e7137b | 28 | #include "common/macros.h" |
d126826c | 29 | #include "common/uuid.h" |
71c5da58 | 30 | #include <babeltrace2/babeltrace.h> |
958f7d11 | 31 | #include <glib.h> |
c55a9f58 | 32 | #include <stdbool.h> |
318fe670 | 33 | #include <inttypes.h> |
57952005 MJ |
34 | #include "common/assert.h" |
35 | #include "common/common.h" | |
0fbb9a9f | 36 | #include <stdlib.h> |
282c8cd0 | 37 | #include <string.h> |
958f7d11 | 38 | |
9240924b FD |
39 | #include "plugins/common/muxing/muxing.h" |
40 | ||
3e3ea13e FD |
41 | #include "muxer.h" |
42 | ||
958f7d11 | 43 | struct muxer_comp { |
d9693c64 PP |
44 | /* Weak refs */ |
45 | bt_self_component_filter *self_comp_flt; | |
46 | bt_self_component *self_comp; | |
834e9996 | 47 | |
958f7d11 PP |
48 | unsigned int next_port_num; |
49 | size_t available_input_ports; | |
b09a5592 | 50 | bool initializing_muxer_msg_iter; |
f5abbab4 | 51 | bt_logging_level log_level; |
958f7d11 PP |
52 | }; |
53 | ||
b09a5592 | 54 | struct muxer_upstream_msg_iter { |
f5abbab4 PP |
55 | struct muxer_comp *muxer_comp; |
56 | ||
ab11110e | 57 | /* Owned by this, NULL if ended */ |
b09a5592 | 58 | bt_self_component_port_input_message_iterator *msg_iter; |
958f7d11 | 59 | |
b09a5592 PP |
60 | /* Contains `const bt_message *`, owned by this */ |
61 | GQueue *msgs; | |
958f7d11 PP |
62 | }; |
63 | ||
b09a5592 PP |
/*
 * Kind of clock class which a muxer message iterator expects from
 * its upstream streams.
 *
 * The state starts as `ANY` (zero) and is then fixed by the first
 * stream which the iterator validates.
 */
enum muxer_msg_iter_clock_class_expectation {
	/* Nothing seen yet: anything is accepted */
	MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_ANY = 0,

	/* Stream classes must have no default clock class */
	MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_NONE = 1,

	/* Clock classes must have the Unix epoch as their origin */
	MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_ABSOLUTE = 2,

	/* Non-absolute clock classes sharing one specific UUID */
	MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_NOT_ABS_SPEC_UUID = 3,

	/* Non-absolute clock classes without any UUID */
	MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_NOT_ABS_NO_UUID = 4,
};
71 | ||
b09a5592 | 72 | struct muxer_msg_iter { |
f5abbab4 PP |
73 | struct muxer_comp *muxer_comp; |
74 | ||
692f1a01 PP |
75 | /* Weak */ |
76 | bt_self_message_iterator *self_msg_iter; | |
77 | ||
ab11110e | 78 | /* |
b09a5592 | 79 | * Array of struct muxer_upstream_msg_iter * (owned by this). |
ab11110e PP |
80 | * |
81 | * NOTE: This array is searched in linearly to find the youngest | |
b09a5592 | 82 | * current message. Keep this until benchmarks confirm that |
ab11110e PP |
83 | * another data structure is faster than this for our typical |
84 | * use cases. | |
85 | */ | |
a71ed05c PP |
86 | GPtrArray *active_muxer_upstream_msg_iters; |
87 | ||
88 | /* | |
89 | * Array of struct muxer_upstream_msg_iter * (owned by this). | |
90 | * | |
91 | * We move ended message iterators from | |
92 | * `active_muxer_upstream_msg_iters` to this array so as to be | |
93 | * able to restore them when seeking. | |
94 | */ | |
95 | GPtrArray *ended_muxer_upstream_msg_iters; | |
958f7d11 | 96 | |
b09a5592 | 97 | /* Last time returned in a message */ |
958f7d11 | 98 | int64_t last_returned_ts_ns; |
282c8cd0 PP |
99 | |
100 | /* Clock class expectation state */ | |
b09a5592 | 101 | enum muxer_msg_iter_clock_class_expectation clock_class_expectation; |
282c8cd0 PP |
102 | |
103 | /* | |
104 | * Expected clock class UUID, only valid when | |
105 | * clock_class_expectation is | |
b09a5592 | 106 | * MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_NOT_ABS_SPEC_UUID. |
282c8cd0 | 107 | */ |
d126826c | 108 | bt_uuid_t expected_clock_class_uuid; |
958f7d11 PP |
109 | }; |
110 | ||
a71ed05c PP |
111 | static |
112 | void empty_message_queue(struct muxer_upstream_msg_iter *upstream_msg_iter) | |
113 | { | |
114 | const bt_message *msg; | |
115 | ||
116 | while ((msg = g_queue_pop_head(upstream_msg_iter->msgs))) { | |
117 | bt_message_put_ref(msg); | |
118 | } | |
119 | } | |
120 | ||
ab11110e | 121 | static |
b09a5592 PP |
122 | void destroy_muxer_upstream_msg_iter( |
123 | struct muxer_upstream_msg_iter *muxer_upstream_msg_iter) | |
ab11110e | 124 | { |
f5abbab4 PP |
125 | struct muxer_comp *muxer_comp; |
126 | ||
b09a5592 | 127 | if (!muxer_upstream_msg_iter) { |
ab11110e PP |
128 | return; |
129 | } | |
130 | ||
f5abbab4 | 131 | muxer_comp = muxer_upstream_msg_iter->muxer_comp; |
d9693c64 | 132 | BT_COMP_LOGD("Destroying muxer's upstream message iterator wrapper: " |
b09a5592 PP |
133 | "addr=%p, msg-iter-addr=%p, queue-len=%u", |
134 | muxer_upstream_msg_iter, | |
135 | muxer_upstream_msg_iter->msg_iter, | |
136 | muxer_upstream_msg_iter->msgs->length); | |
a71ed05c PP |
137 | bt_self_component_port_input_message_iterator_put_ref( |
138 | muxer_upstream_msg_iter->msg_iter); | |
3fd7b79d | 139 | |
b09a5592 | 140 | if (muxer_upstream_msg_iter->msgs) { |
a71ed05c | 141 | empty_message_queue(muxer_upstream_msg_iter); |
b09a5592 | 142 | g_queue_free(muxer_upstream_msg_iter->msgs); |
3fd7b79d PP |
143 | } |
144 | ||
b09a5592 | 145 | g_free(muxer_upstream_msg_iter); |
ab11110e PP |
146 | } |
147 | ||
958f7d11 | 148 | static |
ae644e59 | 149 | int muxer_msg_iter_add_upstream_msg_iter(struct muxer_msg_iter *muxer_msg_iter, |
b09a5592 | 150 | bt_self_component_port_input_message_iterator *self_msg_iter) |
958f7d11 | 151 | { |
ae644e59 | 152 | int ret = 0; |
b09a5592 PP |
153 | struct muxer_upstream_msg_iter *muxer_upstream_msg_iter = |
154 | g_new0(struct muxer_upstream_msg_iter, 1); | |
f5abbab4 | 155 | struct muxer_comp *muxer_comp = muxer_msg_iter->muxer_comp; |
958f7d11 | 156 | |
b09a5592 | 157 | if (!muxer_upstream_msg_iter) { |
d9693c64 | 158 | BT_COMP_LOGE_STR("Failed to allocate one muxer's upstream message iterator wrapper."); |
11603bce | 159 | goto error; |
958f7d11 PP |
160 | } |
161 | ||
f5abbab4 | 162 | muxer_upstream_msg_iter->muxer_comp = muxer_comp; |
b09a5592 PP |
163 | muxer_upstream_msg_iter->msg_iter = self_msg_iter; |
164 | bt_self_component_port_input_message_iterator_get_ref(muxer_upstream_msg_iter->msg_iter); | |
165 | muxer_upstream_msg_iter->msgs = g_queue_new(); | |
166 | if (!muxer_upstream_msg_iter->msgs) { | |
d9693c64 | 167 | BT_COMP_LOGE_STR("Failed to allocate a GQueue."); |
11603bce | 168 | goto error; |
3fd7b79d PP |
169 | } |
170 | ||
a71ed05c | 171 | g_ptr_array_add(muxer_msg_iter->active_muxer_upstream_msg_iters, |
b09a5592 | 172 | muxer_upstream_msg_iter); |
d9693c64 | 173 | BT_COMP_LOGD("Added muxer's upstream message iterator wrapper: " |
b09a5592 PP |
174 | "addr=%p, muxer-msg-iter-addr=%p, msg-iter-addr=%p", |
175 | muxer_upstream_msg_iter, muxer_msg_iter, | |
176 | self_msg_iter); | |
958f7d11 | 177 | |
11603bce FD |
178 | goto end; |
179 | ||
180 | error: | |
181 | g_free(muxer_upstream_msg_iter); | |
ae644e59 | 182 | ret = -1; |
11603bce | 183 | |
958f7d11 | 184 | end: |
ae644e59 | 185 | return ret; |
958f7d11 PP |
186 | } |
187 | ||
958f7d11 | 188 | static |
fb25b9e3 | 189 | bt_self_component_add_port_status add_available_input_port( |
8eee8ea2 | 190 | bt_self_component_filter *self_comp) |
958f7d11 | 191 | { |
834e9996 | 192 | struct muxer_comp *muxer_comp = bt_self_component_get_data( |
bb61965b | 193 | bt_self_component_filter_as_self_component(self_comp)); |
fb25b9e3 PP |
194 | bt_self_component_add_port_status status = |
195 | BT_SELF_COMPONENT_ADD_PORT_STATUS_OK; | |
958f7d11 | 196 | GString *port_name = NULL; |
958f7d11 | 197 | |
8b45963b | 198 | BT_ASSERT(muxer_comp); |
958f7d11 PP |
199 | port_name = g_string_new("in"); |
200 | if (!port_name) { | |
d9693c64 | 201 | BT_COMP_LOGE_STR("Failed to allocate a GString."); |
fb25b9e3 | 202 | status = BT_SELF_COMPONENT_ADD_PORT_STATUS_MEMORY_ERROR; |
958f7d11 PP |
203 | goto end; |
204 | } | |
205 | ||
206 | g_string_append_printf(port_name, "%u", muxer_comp->next_port_num); | |
834e9996 PP |
207 | status = bt_self_component_filter_add_input_port( |
208 | self_comp, port_name->str, NULL, NULL); | |
fb25b9e3 | 209 | if (status != BT_SELF_COMPONENT_ADD_PORT_STATUS_OK) { |
d9693c64 | 210 | BT_COMP_LOGE("Cannot add input port to muxer component: " |
318fe670 | 211 | "port-name=\"%s\", comp-addr=%p, status=%s", |
834e9996 | 212 | port_name->str, self_comp, |
fb25b9e3 | 213 | bt_common_func_status_string(status)); |
958f7d11 PP |
214 | goto end; |
215 | } | |
216 | ||
217 | muxer_comp->available_input_ports++; | |
218 | muxer_comp->next_port_num++; | |
d9693c64 | 219 | BT_COMP_LOGI("Added one input port to muxer component: " |
318fe670 | 220 | "port-name=\"%s\", comp-addr=%p", |
834e9996 | 221 | port_name->str, self_comp); |
1043fdea | 222 | |
958f7d11 PP |
223 | end: |
224 | if (port_name) { | |
225 | g_string_free(port_name, TRUE); | |
226 | } | |
227 | ||
147337a3 | 228 | return status; |
958f7d11 PP |
229 | } |
230 | ||
958f7d11 | 231 | static |
fb25b9e3 | 232 | bt_self_component_add_port_status create_output_port( |
8eee8ea2 | 233 | bt_self_component_filter *self_comp) |
958f7d11 | 234 | { |
834e9996 PP |
235 | return bt_self_component_filter_add_output_port( |
236 | self_comp, "out", NULL, NULL); | |
958f7d11 PP |
237 | } |
238 | ||
/*
 * Frees the muxer component data `muxer_comp` (can be NULL).
 */
static
void destroy_muxer_comp(struct muxer_comp *muxer_comp)
{
	if (muxer_comp) {
		g_free(muxer_comp);
	}
}
248 | ||
249 | BT_HIDDEN | |
4175c1d5 | 250 | bt_component_class_initialize_method_status muxer_init( |
d9693c64 | 251 | bt_self_component_filter *self_comp_flt, |
e3250e61 | 252 | bt_self_component_filter_configuration *config, |
3e3ea13e | 253 | const bt_value *params, void *init_data) |
958f7d11 | 254 | { |
4175c1d5 FD |
255 | bt_component_class_initialize_method_status status = |
256 | BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK; | |
fb25b9e3 | 257 | bt_self_component_add_port_status add_port_status; |
d9693c64 PP |
258 | bt_self_component *self_comp = |
259 | bt_self_component_filter_as_self_component(self_comp_flt); | |
958f7d11 | 260 | struct muxer_comp *muxer_comp = g_new0(struct muxer_comp, 1); |
f5abbab4 | 261 | bt_logging_level log_level = bt_component_get_logging_level( |
d9693c64 | 262 | bt_self_component_as_component(self_comp)); |
958f7d11 | 263 | |
d9693c64 | 264 | BT_COMP_LOG_CUR_LVL(BT_LOG_INFO, log_level, self_comp, |
f5abbab4 | 265 | "Initializing muxer component: " |
834e9996 | 266 | "comp-addr=%p, params-addr=%p", self_comp, params); |
318fe670 | 267 | |
958f7d11 | 268 | if (!muxer_comp) { |
d9693c64 | 269 | BT_COMP_LOG_CUR_LVL(BT_LOG_ERROR, log_level, self_comp, |
f5abbab4 | 270 | "Failed to allocate one muxer component."); |
958f7d11 PP |
271 | goto error; |
272 | } | |
273 | ||
f5abbab4 | 274 | muxer_comp->log_level = log_level; |
d9693c64 PP |
275 | muxer_comp->self_comp = self_comp; |
276 | muxer_comp->self_comp_flt = self_comp_flt; | |
65ee897d | 277 | |
d9693c64 | 278 | bt_self_component_set_data(self_comp, muxer_comp); |
fb25b9e3 PP |
279 | add_port_status = add_available_input_port(self_comp_flt); |
280 | if (add_port_status != BT_SELF_COMPONENT_ADD_PORT_STATUS_OK) { | |
d9693c64 | 281 | BT_COMP_LOGE("Cannot ensure that at least one muxer component's input port is available: " |
318fe670 PP |
282 | "muxer-comp-addr=%p, status=%s", |
283 | muxer_comp, | |
fb25b9e3 PP |
284 | bt_common_func_status_string(add_port_status)); |
285 | if (add_port_status == | |
286 | BT_SELF_COMPONENT_ADD_PORT_STATUS_MEMORY_ERROR) { | |
4175c1d5 | 287 | status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR; |
fb25b9e3 | 288 | } else { |
4175c1d5 | 289 | status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR; |
fb25b9e3 PP |
290 | } |
291 | ||
958f7d11 PP |
292 | goto error; |
293 | } | |
294 | ||
fb25b9e3 PP |
295 | add_port_status = create_output_port(self_comp_flt); |
296 | if (add_port_status != BT_SELF_COMPONENT_ADD_PORT_STATUS_OK) { | |
d9693c64 | 297 | BT_COMP_LOGE("Cannot create muxer component's output port: " |
318fe670 PP |
298 | "muxer-comp-addr=%p, status=%s", |
299 | muxer_comp, | |
fb25b9e3 PP |
300 | bt_common_func_status_string(add_port_status)); |
301 | if (add_port_status == | |
302 | BT_SELF_COMPONENT_ADD_PORT_STATUS_MEMORY_ERROR) { | |
4175c1d5 | 303 | status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR; |
fb25b9e3 | 304 | } else { |
4175c1d5 | 305 | status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR; |
fb25b9e3 PP |
306 | } |
307 | ||
958f7d11 PP |
308 | goto error; |
309 | } | |
310 | ||
d9693c64 | 311 | BT_COMP_LOGI("Initialized muxer component: " |
318fe670 | 312 | "comp-addr=%p, params-addr=%p, muxer-comp-addr=%p", |
834e9996 | 313 | self_comp, params, muxer_comp); |
318fe670 | 314 | |
958f7d11 PP |
315 | goto end; |
316 | ||
317 | error: | |
318 | destroy_muxer_comp(muxer_comp); | |
d9693c64 | 319 | bt_self_component_set_data(self_comp, NULL); |
147337a3 | 320 | |
4175c1d5 FD |
321 | if (status == BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK) { |
322 | status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR; | |
147337a3 | 323 | } |
958f7d11 PP |
324 | |
325 | end: | |
326 | return status; | |
327 | } | |
328 | ||
329 | BT_HIDDEN | |
8eee8ea2 | 330 | void muxer_finalize(bt_self_component_filter *self_comp) |
958f7d11 | 331 | { |
834e9996 | 332 | struct muxer_comp *muxer_comp = bt_self_component_get_data( |
bb61965b | 333 | bt_self_component_filter_as_self_component(self_comp)); |
958f7d11 | 334 | |
d9693c64 | 335 | BT_COMP_LOGI("Finalizing muxer component: comp-addr=%p", |
834e9996 | 336 | self_comp); |
958f7d11 PP |
337 | destroy_muxer_comp(muxer_comp); |
338 | } | |
339 | ||
340 | static | |
ab8b2b1b | 341 | bt_self_component_port_input_message_iterator_create_from_message_iterator_status |
f5abbab4 | 342 | create_msg_iter_on_input_port(struct muxer_comp *muxer_comp, |
692f1a01 | 343 | struct muxer_msg_iter *muxer_msg_iter, |
ab8b2b1b SM |
344 | bt_self_component_port_input *self_port, |
345 | bt_self_component_port_input_message_iterator **msg_iter) | |
958f7d11 | 346 | { |
8eee8ea2 | 347 | const bt_port *port = bt_self_component_port_as_port( |
bb61965b | 348 | bt_self_component_port_input_as_self_component_port( |
834e9996 | 349 | self_port)); |
ab8b2b1b SM |
350 | bt_self_component_port_input_message_iterator_create_from_message_iterator_status |
351 | status; | |
958f7d11 | 352 | |
8b45963b PP |
353 | BT_ASSERT(port); |
354 | BT_ASSERT(bt_port_is_connected(port)); | |
958f7d11 | 355 | |
ab11110e | 356 | // TODO: Advance the iterator to >= the time of the latest |
b09a5592 | 357 | // returned message by the muxer message |
ab11110e | 358 | // iterator which creates it. |
ab8b2b1b SM |
359 | status = bt_self_component_port_input_message_iterator_create_from_message_iterator( |
360 | muxer_msg_iter->self_msg_iter, self_port, msg_iter); | |
361 | if (status != BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_CREATE_FROM_MESSAGE_ITERATOR_STATUS_OK) { | |
d9693c64 | 362 | BT_COMP_LOGE("Cannot create upstream message iterator on input port: " |
834e9996 PP |
363 | "port-addr=%p, port-name=\"%s\"", |
364 | port, bt_port_get_name(port)); | |
958f7d11 PP |
365 | goto end; |
366 | } | |
367 | ||
d9693c64 | 368 | BT_COMP_LOGI("Created upstream message iterator on input port: " |
b09a5592 PP |
369 | "port-addr=%p, port-name=\"%s\", msg-iter-addr=%p", |
370 | port, bt_port_get_name(port), msg_iter); | |
318fe670 | 371 | |
958f7d11 | 372 | end: |
ab8b2b1b | 373 | return status; |
958f7d11 PP |
374 | } |
375 | ||
ab11110e | 376 | static |
fb25b9e3 | 377 | bt_component_class_message_iterator_next_method_status muxer_upstream_msg_iter_next( |
a71ed05c PP |
378 | struct muxer_upstream_msg_iter *muxer_upstream_msg_iter, |
379 | bool *is_ended) | |
ab11110e | 380 | { |
f5abbab4 PP |
381 | struct muxer_comp *muxer_comp = |
382 | muxer_upstream_msg_iter->muxer_comp; | |
fb25b9e3 PP |
383 | bt_component_class_message_iterator_next_method_status status; |
384 | bt_message_iterator_next_status input_port_iter_status; | |
b09a5592 | 385 | bt_message_array_const msgs; |
3fd7b79d PP |
386 | uint64_t i; |
387 | uint64_t count; | |
ab11110e | 388 | |
d9693c64 | 389 | BT_COMP_LOGD("Calling upstream message iterator's \"next\" method: " |
b09a5592 PP |
390 | "muxer-upstream-msg-iter-wrap-addr=%p, msg-iter-addr=%p", |
391 | muxer_upstream_msg_iter, | |
392 | muxer_upstream_msg_iter->msg_iter); | |
3e3ea13e | 393 | input_port_iter_status = bt_self_component_port_input_message_iterator_next( |
b09a5592 | 394 | muxer_upstream_msg_iter->msg_iter, &msgs, &count); |
d9693c64 | 395 | BT_COMP_LOGD("Upstream message iterator's \"next\" method returned: " |
fb25b9e3 PP |
396 | "status=%s", |
397 | bt_common_func_status_string(input_port_iter_status)); | |
ab11110e | 398 | |
3e3ea13e | 399 | switch (input_port_iter_status) { |
fb25b9e3 | 400 | case BT_MESSAGE_ITERATOR_NEXT_STATUS_OK: |
089717de | 401 | /* |
b09a5592 | 402 | * Message iterator's current message is |
3fd7b79d | 403 | * valid: it must be considered for muxing operations. |
089717de | 404 | */ |
d9693c64 | 405 | BT_COMP_LOGD_STR("Validated upstream message iterator wrapper."); |
3fd7b79d PP |
406 | BT_ASSERT(count > 0); |
407 | ||
b09a5592 | 408 | /* Move messages to our queue */ |
3fd7b79d PP |
409 | for (i = 0; i < count; i++) { |
410 | /* | |
411 | * Push to tail in order; other side | |
b09a5592 | 412 | * (muxer_msg_iter_do_next_one()) consumes |
3fd7b79d PP |
413 | * from the head first. |
414 | */ | |
b09a5592 PP |
415 | g_queue_push_tail(muxer_upstream_msg_iter->msgs, |
416 | (void *) msgs[i]); | |
3fd7b79d | 417 | } |
fb25b9e3 | 418 | status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK; |
ab11110e | 419 | break; |
fb25b9e3 | 420 | case BT_MESSAGE_ITERATOR_NEXT_STATUS_AGAIN: |
089717de | 421 | /* |
b09a5592 | 422 | * Message iterator's current message is not |
089717de | 423 | * valid anymore. Return |
fb25b9e3 | 424 | * BT_MESSAGE_ITERATOR_NEXT_STATUS_AGAIN immediately. |
089717de | 425 | */ |
fb25b9e3 | 426 | status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_AGAIN; |
ab11110e | 427 | break; |
fb25b9e3 | 428 | case BT_MESSAGE_ITERATOR_NEXT_STATUS_END: /* Fall-through. */ |
ab11110e | 429 | /* |
b09a5592 | 430 | * Message iterator reached the end: release it. It |
ab11110e | 431 | * won't be considered again to find the youngest |
b09a5592 | 432 | * message. |
ab11110e | 433 | */ |
a71ed05c | 434 | *is_ended = true; |
fb25b9e3 | 435 | status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK; |
089717de | 436 | break; |
ab11110e PP |
437 | default: |
438 | /* Error or unsupported status code */ | |
d9693c64 | 439 | BT_COMP_LOGE("Error or unsupported status code: " |
3e3ea13e | 440 | "status-code=%d", input_port_iter_status); |
fb25b9e3 | 441 | status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_ERROR; |
089717de | 442 | break; |
ab11110e PP |
443 | } |
444 | ||
089717de | 445 | return status; |
ab11110e PP |
446 | } |
447 | ||
958f7d11 | 448 | static |
b09a5592 PP |
449 | int get_msg_ts_ns(struct muxer_comp *muxer_comp, |
450 | struct muxer_msg_iter *muxer_msg_iter, | |
451 | const bt_message *msg, int64_t last_returned_ts_ns, | |
958f7d11 PP |
452 | int64_t *ts_ns) |
453 | { | |
ecbb78c0 | 454 | const bt_clock_snapshot *clock_snapshot = NULL; |
958f7d11 | 455 | int ret = 0; |
34e13c22 PP |
456 | const bt_stream_class *stream_class = NULL; |
457 | bt_message_type msg_type; | |
958f7d11 | 458 | |
b09a5592 | 459 | BT_ASSERT(msg); |
8b45963b | 460 | BT_ASSERT(ts_ns); |
d9693c64 | 461 | BT_COMP_LOGD("Getting message's timestamp: " |
b09a5592 | 462 | "muxer-msg-iter-addr=%p, msg-addr=%p, " |
318fe670 | 463 | "last-returned-ts=%" PRId64, |
b09a5592 | 464 | muxer_msg_iter, msg, last_returned_ts_ns); |
318fe670 | 465 | |
85e7137b | 466 | if (G_UNLIKELY(muxer_msg_iter->clock_class_expectation == |
37911c11 PP |
467 | MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_NONE)) { |
468 | *ts_ns = last_returned_ts_ns; | |
469 | goto end; | |
470 | } | |
471 | ||
34e13c22 PP |
472 | msg_type = bt_message_get_type(msg); |
473 | ||
85e7137b | 474 | if (G_UNLIKELY(msg_type == BT_MESSAGE_TYPE_PACKET_BEGINNING)) { |
34e13c22 PP |
475 | stream_class = bt_stream_borrow_class_const( |
476 | bt_packet_borrow_stream_const( | |
477 | bt_message_packet_beginning_borrow_packet_const( | |
478 | msg))); | |
85e7137b | 479 | } else if (G_UNLIKELY(msg_type == BT_MESSAGE_TYPE_PACKET_END)) { |
34e13c22 PP |
480 | stream_class = bt_stream_borrow_class_const( |
481 | bt_packet_borrow_stream_const( | |
482 | bt_message_packet_end_borrow_packet_const( | |
483 | msg))); | |
85e7137b | 484 | } else if (G_UNLIKELY(msg_type == BT_MESSAGE_TYPE_DISCARDED_EVENTS)) { |
77037b2b PP |
485 | stream_class = bt_stream_borrow_class_const( |
486 | bt_message_discarded_events_borrow_stream_const(msg)); | |
85e7137b | 487 | } else if (G_UNLIKELY(msg_type == BT_MESSAGE_TYPE_DISCARDED_PACKETS)) { |
77037b2b PP |
488 | stream_class = bt_stream_borrow_class_const( |
489 | bt_message_discarded_packets_borrow_stream_const(msg)); | |
34e13c22 PP |
490 | } |
491 | ||
492 | switch (msg_type) { | |
b09a5592 | 493 | case BT_MESSAGE_TYPE_EVENT: |
37911c11 PP |
494 | BT_ASSERT(bt_message_event_borrow_stream_class_default_clock_class_const( |
495 | msg)); | |
11ddb3ef PP |
496 | clock_snapshot = bt_message_event_borrow_default_clock_snapshot_const( |
497 | msg); | |
958f7d11 | 498 | break; |
4a4dd64f | 499 | case BT_MESSAGE_TYPE_PACKET_BEGINNING: |
5ef34326 | 500 | if (bt_stream_class_packets_have_beginning_default_clock_snapshot( |
34e13c22 PP |
501 | stream_class)) { |
502 | clock_snapshot = bt_message_packet_beginning_borrow_default_clock_snapshot_const( | |
503 | msg); | |
504 | } else { | |
505 | goto no_clock_snapshot; | |
506 | } | |
507 | ||
4a4dd64f PP |
508 | break; |
509 | case BT_MESSAGE_TYPE_PACKET_END: | |
5ef34326 | 510 | if (bt_stream_class_packets_have_end_default_clock_snapshot( |
34e13c22 PP |
511 | stream_class)) { |
512 | clock_snapshot = bt_message_packet_end_borrow_default_clock_snapshot_const( | |
513 | msg); | |
514 | } else { | |
515 | goto no_clock_snapshot; | |
516 | } | |
517 | ||
4a4dd64f | 518 | break; |
3ce73327 FD |
519 | case BT_MESSAGE_TYPE_STREAM_BEGINNING: |
520 | { | |
521 | enum bt_message_stream_clock_snapshot_state snapshot_state = | |
522 | bt_message_stream_beginning_borrow_default_clock_snapshot_const( | |
523 | msg, &clock_snapshot); | |
524 | if (snapshot_state == BT_MESSAGE_STREAM_CLOCK_SNAPSHOT_STATE_UNKNOWN) { | |
525 | goto no_clock_snapshot; | |
526 | } | |
527 | ||
528 | break; | |
529 | } | |
530 | case BT_MESSAGE_TYPE_STREAM_END: | |
531 | { | |
532 | enum bt_message_stream_clock_snapshot_state snapshot_state = | |
533 | bt_message_stream_end_borrow_default_clock_snapshot_const( | |
534 | msg, &clock_snapshot); | |
535 | if (snapshot_state == BT_MESSAGE_STREAM_CLOCK_SNAPSHOT_STATE_UNKNOWN) { | |
536 | goto no_clock_snapshot; | |
537 | } | |
538 | ||
539 | break; | |
540 | } | |
4a4dd64f | 541 | case BT_MESSAGE_TYPE_DISCARDED_EVENTS: |
77037b2b PP |
542 | if (bt_stream_class_discarded_events_have_default_clock_snapshots( |
543 | stream_class)) { | |
5ef34326 | 544 | clock_snapshot = bt_message_discarded_events_borrow_beginning_default_clock_snapshot_const( |
77037b2b PP |
545 | msg); |
546 | } else { | |
547 | goto no_clock_snapshot; | |
548 | } | |
549 | ||
4a4dd64f PP |
550 | break; |
551 | case BT_MESSAGE_TYPE_DISCARDED_PACKETS: | |
77037b2b PP |
552 | if (bt_stream_class_discarded_packets_have_default_clock_snapshots( |
553 | stream_class)) { | |
5ef34326 | 554 | clock_snapshot = bt_message_discarded_packets_borrow_beginning_default_clock_snapshot_const( |
77037b2b PP |
555 | msg); |
556 | } else { | |
557 | goto no_clock_snapshot; | |
558 | } | |
559 | ||
4a4dd64f | 560 | break; |
40bf6fd0 | 561 | case BT_MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY: |
11ddb3ef PP |
562 | clock_snapshot = bt_message_message_iterator_inactivity_borrow_default_clock_snapshot_const( |
563 | msg); | |
958f7d11 PP |
564 | break; |
565 | default: | |
b09a5592 | 566 | /* All the other messages have a higher priority */ |
d9693c64 | 567 | BT_COMP_LOGD_STR("Message has no timestamp: using the last returned timestamp."); |
958f7d11 PP |
568 | *ts_ns = last_returned_ts_ns; |
569 | goto end; | |
570 | } | |
571 | ||
37911c11 PP |
572 | ret = bt_clock_snapshot_get_ns_from_origin(clock_snapshot, ts_ns); |
573 | if (ret) { | |
d9693c64 | 574 | BT_COMP_LOGE("Cannot get nanoseconds from Epoch of clock snapshot: " |
37911c11 PP |
575 | "clock-snapshot-addr=%p", clock_snapshot); |
576 | goto error; | |
577 | } | |
578 | ||
579 | goto end; | |
580 | ||
581 | no_clock_snapshot: | |
d9693c64 | 582 | BT_COMP_LOGD_STR("Message's default clock snapshot is missing: " |
37911c11 PP |
583 | "using the last returned timestamp."); |
584 | *ts_ns = last_returned_ts_ns; | |
585 | goto end; | |
586 | ||
587 | error: | |
588 | ret = -1; | |
589 | ||
590 | end: | |
591 | if (ret == 0) { | |
d9693c64 | 592 | BT_COMP_LOGD("Found message's timestamp: " |
37911c11 PP |
593 | "muxer-msg-iter-addr=%p, msg-addr=%p, " |
594 | "last-returned-ts=%" PRId64 ", ts=%" PRId64, | |
595 | muxer_msg_iter, msg, last_returned_ts_ns, | |
596 | *ts_ns); | |
7b33a0e0 PP |
597 | } |
598 | ||
37911c11 PP |
599 | return ret; |
600 | } | |
601 | ||
602 | static inline | |
603 | int validate_clock_class(struct muxer_msg_iter *muxer_msg_iter, | |
604 | struct muxer_comp *muxer_comp, | |
605 | const bt_clock_class *clock_class) | |
606 | { | |
607 | int ret = 0; | |
d126826c | 608 | const uint8_t *cc_uuid; |
37911c11 PP |
609 | const char *cc_name; |
610 | ||
8fc063a2 | 611 | BT_ASSERT(clock_class); |
839d52a5 PP |
612 | cc_uuid = bt_clock_class_get_uuid(clock_class); |
613 | cc_name = bt_clock_class_get_name(clock_class); | |
282c8cd0 | 614 | |
b09a5592 PP |
615 | if (muxer_msg_iter->clock_class_expectation == |
616 | MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_ANY) { | |
282c8cd0 | 617 | /* |
9635b8a6 FD |
618 | * This is the first clock class that this muxer message |
619 | * iterator encounters. Its properties determine what to expect | |
620 | * for the whole lifetime of the iterator. | |
282c8cd0 | 621 | */ |
d608d675 | 622 | if (bt_clock_class_origin_is_unix_epoch(clock_class)) { |
282c8cd0 | 623 | /* Expect absolute clock classes */ |
b09a5592 PP |
624 | muxer_msg_iter->clock_class_expectation = |
625 | MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_ABSOLUTE; | |
282c8cd0 PP |
626 | } else { |
627 | if (cc_uuid) { | |
628 | /* | |
629 | * Expect non-absolute clock classes | |
630 | * with a specific UUID. | |
631 | */ | |
b09a5592 PP |
632 | muxer_msg_iter->clock_class_expectation = |
633 | MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_NOT_ABS_SPEC_UUID; | |
d126826c | 634 | bt_uuid_copy(muxer_msg_iter->expected_clock_class_uuid, cc_uuid); |
282c8cd0 PP |
635 | } else { |
636 | /* | |
637 | * Expect non-absolute clock classes | |
638 | * with no UUID. | |
639 | */ | |
b09a5592 PP |
640 | muxer_msg_iter->clock_class_expectation = |
641 | MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_NOT_ABS_NO_UUID; | |
282c8cd0 PP |
642 | } |
643 | } | |
644 | } | |
645 | ||
9635b8a6 FD |
646 | switch (muxer_msg_iter->clock_class_expectation) { |
647 | case MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_ABSOLUTE: | |
648 | if (!bt_clock_class_origin_is_unix_epoch(clock_class)) { | |
649 | BT_COMP_LOGE("Expecting an absolute clock class, " | |
650 | "but got a non-absolute one: " | |
651 | "clock-class-addr=%p, clock-class-name=\"%s\"", | |
652 | clock_class, cc_name); | |
653 | goto error; | |
654 | } | |
655 | break; | |
656 | case MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_NOT_ABS_NO_UUID: | |
657 | if (bt_clock_class_origin_is_unix_epoch(clock_class)) { | |
658 | BT_COMP_LOGE("Expecting a non-absolute clock class with no UUID, " | |
659 | "but got an absolute one: " | |
660 | "clock-class-addr=%p, clock-class-name=\"%s\"", | |
661 | clock_class, cc_name); | |
662 | goto error; | |
663 | } | |
282c8cd0 | 664 | |
9635b8a6 FD |
665 | if (cc_uuid) { |
666 | BT_COMP_LOGE("Expecting a non-absolute clock class with no UUID, " | |
667 | "but got one with a UUID: " | |
668 | "clock-class-addr=%p, clock-class-name=\"%s\", " | |
669 | "uuid=\"" BT_UUID_FMT "\"", | |
670 | clock_class, cc_name, BT_UUID_FMT_VALUES(cc_uuid)); | |
671 | goto error; | |
672 | } | |
673 | break; | |
674 | case MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_NOT_ABS_SPEC_UUID: | |
675 | if (bt_clock_class_origin_is_unix_epoch(clock_class)) { | |
676 | BT_COMP_LOGE("Expecting a non-absolute clock class with a specific UUID, " | |
677 | "but got an absolute one: " | |
678 | "clock-class-addr=%p, clock-class-name=\"%s\"", | |
679 | clock_class, cc_name); | |
680 | goto error; | |
681 | } | |
282c8cd0 | 682 | |
9635b8a6 FD |
683 | if (!cc_uuid) { |
684 | BT_COMP_LOGE("Expecting a non-absolute clock class with a specific UUID, " | |
685 | "but got one with no UUID: " | |
37911c11 PP |
686 | "clock-class-addr=%p, clock-class-name=\"%s\"", |
687 | clock_class, cc_name); | |
688 | goto error; | |
282c8cd0 | 689 | } |
9635b8a6 FD |
690 | |
691 | if (bt_uuid_compare(muxer_msg_iter->expected_clock_class_uuid, cc_uuid) != 0) { | |
692 | BT_COMP_LOGE("Expecting a non-absolute clock class with a specific UUID, " | |
693 | "but got one with different UUID: " | |
694 | "clock-class-addr=%p, clock-class-name=\"%s\", " | |
695 | "expected-uuid=\"" BT_UUID_FMT "\", " | |
696 | "uuid=\"" BT_UUID_FMT "\"", | |
697 | clock_class, cc_name, | |
698 | BT_UUID_FMT_VALUES(muxer_msg_iter->expected_clock_class_uuid), | |
699 | BT_UUID_FMT_VALUES(cc_uuid)); | |
700 | goto error; | |
701 | } | |
702 | break; | |
703 | case MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_NONE: | |
704 | BT_COMP_LOGE("Expecting no clock class, but got one: " | |
705 | "clock-class-addr=%p, clock-class-name=\"%s\"", | |
706 | clock_class, cc_name); | |
707 | goto error; | |
708 | default: | |
709 | /* Unexpected */ | |
710 | BT_COMP_LOGF("Unexpected clock class expectation: " | |
711 | "expectation-code=%d", | |
712 | muxer_msg_iter->clock_class_expectation); | |
713 | abort(); | |
958f7d11 PP |
714 | } |
715 | ||
2d42927b PP |
716 | goto end; |
717 | ||
958f7d11 PP |
718 | error: |
719 | ret = -1; | |
720 | ||
721 | end: | |
37911c11 PP |
722 | return ret; |
723 | } | |
724 | ||
725 | static inline | |
726 | int validate_new_stream_clock_class(struct muxer_msg_iter *muxer_msg_iter, | |
727 | struct muxer_comp *muxer_comp, const bt_stream *stream) | |
728 | { | |
729 | int ret = 0; | |
730 | const bt_stream_class *stream_class = | |
731 | bt_stream_borrow_class_const(stream); | |
732 | const bt_clock_class *clock_class = | |
733 | bt_stream_class_borrow_default_clock_class_const(stream_class); | |
734 | ||
735 | if (!clock_class) { | |
736 | if (muxer_msg_iter->clock_class_expectation == | |
737 | MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_ANY) { | |
738 | /* Expect no clock class */ | |
739 | muxer_msg_iter->clock_class_expectation = | |
740 | MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_NONE; | |
150a034f SM |
741 | } else if (muxer_msg_iter->clock_class_expectation != |
742 | MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_NONE) { | |
743 | BT_COMP_LOGE("Expecting stream class without a default clock class: " | |
37911c11 PP |
744 | "stream-class-addr=%p, stream-class-name=\"%s\", " |
745 | "stream-class-id=%" PRIu64, | |
746 | stream_class, bt_stream_class_get_name(stream_class), | |
747 | bt_stream_class_get_id(stream_class)); | |
748 | ret = -1; | |
749 | } | |
750 | ||
751 | goto end; | |
318fe670 PP |
752 | } |
753 | ||
37911c11 PP |
754 | ret = validate_clock_class(muxer_msg_iter, muxer_comp, clock_class); |
755 | ||
756 | end: | |
958f7d11 PP |
757 | return ret; |
758 | } | |
759 | ||
ab11110e | 760 | /* |
b09a5592 PP |
761 | * This function finds the youngest available message amongst the |
762 | * non-ended upstream message iterators and returns the upstream | |
763 | * message iterator which has it, or | |
764 | * BT_MESSAGE_ITERATOR_STATUS_END if there's no available | |
765 | * message. | |
ab11110e PP |
766 | * |
767 | * This function does NOT: | |
768 | * | |
b09a5592 | 769 | * * Update any upstream message iterator. |
b09a5592 | 770 | * * Check the upstream message iterators to retry. |
ab11110e | 771 | * |
b09a5592 PP |
772 | * On sucess, this function sets *muxer_upstream_msg_iter to the |
773 | * upstream message iterator of which the current message is | |
ab11110e PP |
774 | * the youngest, and sets *ts_ns to its time. |
775 | */ | |
958f7d11 | 776 | static |
fb25b9e3 | 777 | bt_component_class_message_iterator_next_method_status |
b09a5592 | 778 | muxer_msg_iter_youngest_upstream_msg_iter( |
958f7d11 | 779 | struct muxer_comp *muxer_comp, |
b09a5592 PP |
780 | struct muxer_msg_iter *muxer_msg_iter, |
781 | struct muxer_upstream_msg_iter **muxer_upstream_msg_iter, | |
958f7d11 PP |
782 | int64_t *ts_ns) |
783 | { | |
784 | size_t i; | |
785 | int ret; | |
786 | int64_t youngest_ts_ns = INT64_MAX; | |
fb25b9e3 PP |
787 | bt_component_class_message_iterator_next_method_status status = |
788 | BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK; | |
958f7d11 | 789 | |
8b45963b | 790 | BT_ASSERT(muxer_comp); |
b09a5592 PP |
791 | BT_ASSERT(muxer_msg_iter); |
792 | BT_ASSERT(muxer_upstream_msg_iter); | |
793 | *muxer_upstream_msg_iter = NULL; | |
794 | ||
a71ed05c PP |
795 | for (i = 0; i < muxer_msg_iter->active_muxer_upstream_msg_iters->len; |
796 | i++) { | |
b09a5592 PP |
797 | const bt_message *msg; |
798 | struct muxer_upstream_msg_iter *cur_muxer_upstream_msg_iter = | |
a71ed05c PP |
799 | g_ptr_array_index( |
800 | muxer_msg_iter->active_muxer_upstream_msg_iters, | |
801 | i); | |
b09a5592 PP |
802 | int64_t msg_ts_ns; |
803 | ||
804 | if (!cur_muxer_upstream_msg_iter->msg_iter) { | |
805 | /* This upstream message iterator is ended */ | |
c9ecaa78 | 806 | BT_COMP_LOGT("Skipping ended upstream message iterator: " |
b09a5592 PP |
807 | "muxer-upstream-msg-iter-wrap-addr=%p", |
808 | cur_muxer_upstream_msg_iter); | |
958f7d11 PP |
809 | continue; |
810 | } | |
811 | ||
b09a5592 PP |
812 | BT_ASSERT(cur_muxer_upstream_msg_iter->msgs->length > 0); |
813 | msg = g_queue_peek_head(cur_muxer_upstream_msg_iter->msgs); | |
814 | BT_ASSERT(msg); | |
37911c11 | 815 | |
85e7137b | 816 | if (G_UNLIKELY(bt_message_get_type(msg) == |
37911c11 PP |
817 | BT_MESSAGE_TYPE_STREAM_BEGINNING)) { |
818 | ret = validate_new_stream_clock_class( | |
819 | muxer_msg_iter, muxer_comp, | |
820 | bt_message_stream_beginning_borrow_stream_const( | |
821 | msg)); | |
822 | if (ret) { | |
823 | /* | |
824 | * validate_new_stream_clock_class() logs | |
825 | * errors. | |
826 | */ | |
fb25b9e3 | 827 | status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_ERROR; |
37911c11 PP |
828 | goto end; |
829 | } | |
85e7137b | 830 | } else if (G_UNLIKELY(bt_message_get_type(msg) == |
37911c11 PP |
831 | BT_MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY)) { |
832 | const bt_clock_snapshot *cs; | |
37911c11 | 833 | |
11ddb3ef PP |
834 | cs = bt_message_message_iterator_inactivity_borrow_default_clock_snapshot_const( |
835 | msg); | |
37911c11 PP |
836 | ret = validate_clock_class(muxer_msg_iter, muxer_comp, |
837 | bt_clock_snapshot_borrow_clock_class_const(cs)); | |
838 | if (ret) { | |
839 | /* validate_clock_class() logs errors */ | |
fb25b9e3 | 840 | status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_ERROR; |
37911c11 PP |
841 | goto end; |
842 | } | |
843 | } | |
844 | ||
b09a5592 PP |
845 | ret = get_msg_ts_ns(muxer_comp, muxer_msg_iter, msg, |
846 | muxer_msg_iter->last_returned_ts_ns, &msg_ts_ns); | |
958f7d11 | 847 | if (ret) { |
b09a5592 PP |
848 | /* get_msg_ts_ns() logs errors */ |
849 | *muxer_upstream_msg_iter = NULL; | |
fb25b9e3 | 850 | status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_ERROR; |
958f7d11 PP |
851 | goto end; |
852 | } | |
853 | ||
759bd1bf FD |
854 | /* |
855 | * Update the current message iterator if it has not been set | |
856 | * yet, or if its current message has a timestamp smaller than | |
857 | * the previously selected youngest message. | |
858 | */ | |
859 | if (G_UNLIKELY(*muxer_upstream_msg_iter == NULL) || | |
860 | msg_ts_ns < youngest_ts_ns) { | |
b09a5592 PP |
861 | *muxer_upstream_msg_iter = |
862 | cur_muxer_upstream_msg_iter; | |
863 | youngest_ts_ns = msg_ts_ns; | |
958f7d11 | 864 | *ts_ns = youngest_ts_ns; |
c3923e1f FD |
865 | } else if (msg_ts_ns == youngest_ts_ns) { |
866 | /* | |
867 | * The currently selected message to be sent downstream | |
868 | * next has the exact same timestamp that of the | |
869 | * current candidate message. We must break the tie | |
870 | * in a predictable manner. | |
871 | */ | |
872 | const bt_message *selected_msg = g_queue_peek_head( | |
873 | (*muxer_upstream_msg_iter)->msgs); | |
874 | BT_COMP_LOGD_STR("Two of the next message candidates have the same timestamps, pick one deterministically."); | |
875 | ||
876 | /* | |
877 | * Order the messages in an arbitrary but determinitic | |
878 | * way. | |
879 | */ | |
9240924b | 880 | ret = common_muxing_compare_messages(msg, selected_msg); |
c3923e1f FD |
881 | if (ret < 0) { |
882 | /* | |
883 | * The `msg` should go first. Update the next | |
884 | * iterator and the current timestamp. | |
885 | */ | |
886 | *muxer_upstream_msg_iter = | |
887 | cur_muxer_upstream_msg_iter; | |
888 | youngest_ts_ns = msg_ts_ns; | |
889 | *ts_ns = youngest_ts_ns; | |
890 | } else if (ret == 0) { | |
891 | /* Unable to pick which one should go first. */ | |
892 | BT_COMP_LOGW("Cannot deterministically pick next upstream message iterator because they have identical next messages: " | |
893 | "muxer-upstream-msg-iter-wrap-addr=%p" | |
894 | "cur-muxer-upstream-msg-iter-wrap-addr=%p", | |
895 | *muxer_upstream_msg_iter, | |
896 | cur_muxer_upstream_msg_iter); | |
897 | } | |
958f7d11 PP |
898 | } |
899 | } | |
900 | ||
b09a5592 | 901 | if (!*muxer_upstream_msg_iter) { |
fb25b9e3 | 902 | status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_END; |
958f7d11 PP |
903 | *ts_ns = INT64_MIN; |
904 | } | |
905 | ||
906 | end: | |
907 | return status; | |
908 | } | |
909 | ||
910 | static | |
fb25b9e3 PP |
911 | bt_component_class_message_iterator_next_method_status |
912 | validate_muxer_upstream_msg_iter( | |
a71ed05c PP |
913 | struct muxer_upstream_msg_iter *muxer_upstream_msg_iter, |
914 | bool *is_ended) | |
958f7d11 | 915 | { |
f5abbab4 PP |
916 | struct muxer_comp *muxer_comp = |
917 | muxer_upstream_msg_iter->muxer_comp; | |
fb25b9e3 PP |
918 | bt_component_class_message_iterator_next_method_status status = |
919 | BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK; | |
958f7d11 | 920 | |
d9693c64 | 921 | BT_COMP_LOGD("Validating muxer's upstream message iterator wrapper: " |
b09a5592 PP |
922 | "muxer-upstream-msg-iter-wrap-addr=%p", |
923 | muxer_upstream_msg_iter); | |
318fe670 | 924 | |
b09a5592 PP |
925 | if (muxer_upstream_msg_iter->msgs->length > 0 || |
926 | !muxer_upstream_msg_iter->msg_iter) { | |
d9693c64 | 927 | BT_COMP_LOGD("Already valid or not considered: " |
b09a5592 PP |
928 | "queue-len=%u, upstream-msg-iter-addr=%p", |
929 | muxer_upstream_msg_iter->msgs->length, | |
930 | muxer_upstream_msg_iter->msg_iter); | |
ab11110e PP |
931 | goto end; |
932 | } | |
933 | ||
b09a5592 | 934 | /* muxer_upstream_msg_iter_next() logs details/errors */ |
a71ed05c PP |
935 | status = muxer_upstream_msg_iter_next(muxer_upstream_msg_iter, |
936 | is_ended); | |
089717de PP |
937 | |
938 | end: | |
939 | return status; | |
940 | } | |
941 | ||
942 | static | |
fb25b9e3 PP |
943 | bt_component_class_message_iterator_next_method_status |
944 | validate_muxer_upstream_msg_iters( | |
b09a5592 | 945 | struct muxer_msg_iter *muxer_msg_iter) |
089717de | 946 | { |
f5abbab4 | 947 | struct muxer_comp *muxer_comp = muxer_msg_iter->muxer_comp; |
fb25b9e3 PP |
948 | bt_component_class_message_iterator_next_method_status status = |
949 | BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK; | |
089717de PP |
950 | size_t i; |
951 | ||
d9693c64 | 952 | BT_COMP_LOGD("Validating muxer's upstream message iterator wrappers: " |
b09a5592 | 953 | "muxer-msg-iter-addr=%p", muxer_msg_iter); |
318fe670 | 954 | |
a71ed05c PP |
955 | for (i = 0; i < muxer_msg_iter->active_muxer_upstream_msg_iters->len; |
956 | i++) { | |
90aed4f2 | 957 | bool is_ended = false; |
b09a5592 | 958 | struct muxer_upstream_msg_iter *muxer_upstream_msg_iter = |
089717de | 959 | g_ptr_array_index( |
a71ed05c | 960 | muxer_msg_iter->active_muxer_upstream_msg_iters, |
089717de PP |
961 | i); |
962 | ||
b09a5592 | 963 | status = validate_muxer_upstream_msg_iter( |
a71ed05c | 964 | muxer_upstream_msg_iter, &is_ended); |
fb25b9e3 | 965 | if (status != BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK) { |
318fe670 | 966 | if (status < 0) { |
d9693c64 | 967 | BT_COMP_LOGE("Cannot validate muxer's upstream message iterator wrapper: " |
b09a5592 PP |
968 | "muxer-msg-iter-addr=%p, " |
969 | "muxer-upstream-msg-iter-wrap-addr=%p", | |
970 | muxer_msg_iter, | |
971 | muxer_upstream_msg_iter); | |
318fe670 | 972 | } else { |
d9693c64 | 973 | BT_COMP_LOGD("Cannot validate muxer's upstream message iterator wrapper: " |
b09a5592 PP |
974 | "muxer-msg-iter-addr=%p, " |
975 | "muxer-upstream-msg-iter-wrap-addr=%p", | |
976 | muxer_msg_iter, | |
977 | muxer_upstream_msg_iter); | |
318fe670 PP |
978 | } |
979 | ||
089717de PP |
980 | goto end; |
981 | } | |
744ba28b PP |
982 | |
983 | /* | |
a71ed05c PP |
984 | * Move this muxer upstream message iterator to the |
985 | * array of ended iterators if it's ended. | |
744ba28b | 986 | */ |
85e7137b | 987 | if (G_UNLIKELY(is_ended)) { |
d9693c64 | 988 | BT_COMP_LOGD("Muxer's upstream message iterator wrapper: ended or canceled: " |
a71ed05c PP |
989 | "muxer-msg-iter-addr=%p, " |
990 | "muxer-upstream-msg-iter-wrap-addr=%p", | |
991 | muxer_msg_iter, muxer_upstream_msg_iter); | |
992 | g_ptr_array_add( | |
993 | muxer_msg_iter->ended_muxer_upstream_msg_iters, | |
994 | muxer_upstream_msg_iter); | |
995 | muxer_msg_iter->active_muxer_upstream_msg_iters->pdata[i] = NULL; | |
996 | ||
744ba28b PP |
997 | /* |
998 | * Use g_ptr_array_remove_fast() because the | |
999 | * order of those elements is not important. | |
1000 | */ | |
1001 | g_ptr_array_remove_index_fast( | |
a71ed05c | 1002 | muxer_msg_iter->active_muxer_upstream_msg_iters, |
744ba28b PP |
1003 | i); |
1004 | i--; | |
1005 | } | |
089717de PP |
1006 | } |
1007 | ||
1008 | end: | |
1009 | return status; | |
1010 | } | |
1011 | ||
3fd7b79d | 1012 | static inline |
fb25b9e3 | 1013 | bt_component_class_message_iterator_next_method_status muxer_msg_iter_do_next_one( |
089717de | 1014 | struct muxer_comp *muxer_comp, |
b09a5592 PP |
1015 | struct muxer_msg_iter *muxer_msg_iter, |
1016 | const bt_message **msg) | |
089717de | 1017 | { |
fb25b9e3 | 1018 | bt_component_class_message_iterator_next_method_status status; |
b09a5592 | 1019 | struct muxer_upstream_msg_iter *muxer_upstream_msg_iter = NULL; |
089717de PP |
1020 | int64_t next_return_ts; |
1021 | ||
1043fdea | 1022 | status = validate_muxer_upstream_msg_iters(muxer_msg_iter); |
fb25b9e3 | 1023 | if (status != BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK) { |
1043fdea PP |
1024 | /* validate_muxer_upstream_msg_iters() logs details */ |
1025 | goto end; | |
958f7d11 PP |
1026 | } |
1027 | ||
1028 | /* | |
089717de | 1029 | * At this point we know that all the existing upstream |
b09a5592 PP |
1030 | * message iterators are valid. We can find the one, |
1031 | * amongst those, of which the current message is the | |
089717de | 1032 | * youngest. |
958f7d11 | 1033 | */ |
b09a5592 PP |
1034 | status = muxer_msg_iter_youngest_upstream_msg_iter(muxer_comp, |
1035 | muxer_msg_iter, &muxer_upstream_msg_iter, | |
089717de | 1036 | &next_return_ts); |
fb25b9e3 | 1037 | if (status < 0 || status == BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_END) { |
3fd7b79d | 1038 | if (status < 0) { |
d9693c64 | 1039 | BT_COMP_LOGE("Cannot find the youngest upstream message iterator wrapper: " |
318fe670 | 1040 | "status=%s", |
fb25b9e3 | 1041 | bt_common_func_status_string(status)); |
318fe670 | 1042 | } else { |
d9693c64 | 1043 | BT_COMP_LOGD("Cannot find the youngest upstream message iterator wrapper: " |
318fe670 | 1044 | "status=%s", |
fb25b9e3 | 1045 | bt_common_func_status_string(status)); |
318fe670 PP |
1046 | } |
1047 | ||
958f7d11 PP |
1048 | goto end; |
1049 | } | |
1050 | ||
b09a5592 | 1051 | if (next_return_ts < muxer_msg_iter->last_returned_ts_ns) { |
d9693c64 | 1052 | BT_COMP_LOGE("Youngest upstream message iterator wrapper's timestamp is less than muxer's message iterator's last returned timestamp: " |
b09a5592 | 1053 | "muxer-msg-iter-addr=%p, ts=%" PRId64 ", " |
318fe670 | 1054 | "last-returned-ts=%" PRId64, |
b09a5592 PP |
1055 | muxer_msg_iter, next_return_ts, |
1056 | muxer_msg_iter->last_returned_ts_ns); | |
fb25b9e3 | 1057 | status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_ERROR; |
958f7d11 PP |
1058 | goto end; |
1059 | } | |
1060 | ||
d9693c64 | 1061 | BT_COMP_LOGD("Found youngest upstream message iterator wrapper: " |
b09a5592 PP |
1062 | "muxer-msg-iter-addr=%p, " |
1063 | "muxer-upstream-msg-iter-wrap-addr=%p, " | |
318fe670 | 1064 | "ts=%" PRId64, |
b09a5592 | 1065 | muxer_msg_iter, muxer_upstream_msg_iter, next_return_ts); |
fb25b9e3 | 1066 | BT_ASSERT(status == BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK); |
b09a5592 | 1067 | BT_ASSERT(muxer_upstream_msg_iter); |
958f7d11 PP |
1068 | |
1069 | /* | |
3fd7b79d | 1070 | * Consume from the queue's head: other side |
b09a5592 | 1071 | * (muxer_upstream_msg_iter_next()) writes to the tail. |
958f7d11 | 1072 | */ |
b09a5592 PP |
1073 | *msg = g_queue_pop_head(muxer_upstream_msg_iter->msgs); |
1074 | BT_ASSERT(*msg); | |
1075 | muxer_msg_iter->last_returned_ts_ns = next_return_ts; | |
958f7d11 PP |
1076 | |
1077 | end: | |
3fd7b79d PP |
1078 | return status; |
1079 | } | |
1080 | ||
1081 | static | |
fb25b9e3 | 1082 | bt_component_class_message_iterator_next_method_status muxer_msg_iter_do_next( |
3fd7b79d | 1083 | struct muxer_comp *muxer_comp, |
b09a5592 PP |
1084 | struct muxer_msg_iter *muxer_msg_iter, |
1085 | bt_message_array_const msgs, uint64_t capacity, | |
3fd7b79d PP |
1086 | uint64_t *count) |
1087 | { | |
fb25b9e3 PP |
1088 | bt_component_class_message_iterator_next_method_status status = |
1089 | BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK; | |
3fd7b79d PP |
1090 | uint64_t i = 0; |
1091 | ||
fb25b9e3 | 1092 | while (i < capacity && status == BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK) { |
b09a5592 PP |
1093 | status = muxer_msg_iter_do_next_one(muxer_comp, |
1094 | muxer_msg_iter, &msgs[i]); | |
fb25b9e3 | 1095 | if (status == BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK) { |
3fd7b79d PP |
1096 | i++; |
1097 | } | |
1098 | } | |
1099 | ||
1100 | if (i > 0) { | |
1101 | /* | |
b09a5592 | 1102 | * Even if muxer_msg_iter_do_next_one() returned |
3fd7b79d | 1103 | * something else than |
b09a5592 PP |
1104 | * BT_MESSAGE_ITERATOR_STATUS_OK, we accumulated |
1105 | * message objects in the output message | |
3fd7b79d | 1106 | * array, so we need to return |
b09a5592 | 1107 | * BT_MESSAGE_ITERATOR_STATUS_OK so that they are |
3fd7b79d | 1108 | * transfered to downstream. This other status occurs |
b09a5592 | 1109 | * again the next time muxer_msg_iter_do_next() is |
3fd7b79d | 1110 | * called, possibly without any accumulated |
b09a5592 | 1111 | * message, in which case we'll return it. |
3fd7b79d PP |
1112 | */ |
1113 | *count = i; | |
fb25b9e3 | 1114 | status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK; |
3fd7b79d PP |
1115 | } |
1116 | ||
1117 | return status; | |
958f7d11 PP |
1118 | } |
1119 | ||
1120 | static | |
b09a5592 | 1121 | void destroy_muxer_msg_iter(struct muxer_msg_iter *muxer_msg_iter) |
ab11110e | 1122 | { |
f5abbab4 PP |
1123 | struct muxer_comp *muxer_comp; |
1124 | ||
b09a5592 | 1125 | if (!muxer_msg_iter) { |
ab11110e PP |
1126 | return; |
1127 | } | |
1128 | ||
f5abbab4 | 1129 | muxer_comp = muxer_msg_iter->muxer_comp; |
d9693c64 | 1130 | BT_COMP_LOGD("Destroying muxer component's message iterator: " |
b09a5592 | 1131 | "muxer-msg-iter-addr=%p", muxer_msg_iter); |
318fe670 | 1132 | |
a71ed05c | 1133 | if (muxer_msg_iter->active_muxer_upstream_msg_iters) { |
d9693c64 | 1134 | BT_COMP_LOGD_STR("Destroying muxer's active upstream message iterator wrappers."); |
a71ed05c PP |
1135 | g_ptr_array_free( |
1136 | muxer_msg_iter->active_muxer_upstream_msg_iters, TRUE); | |
1137 | } | |
1138 | ||
1139 | if (muxer_msg_iter->ended_muxer_upstream_msg_iters) { | |
d9693c64 | 1140 | BT_COMP_LOGD_STR("Destroying muxer's ended upstream message iterator wrappers."); |
ab11110e | 1141 | g_ptr_array_free( |
a71ed05c | 1142 | muxer_msg_iter->ended_muxer_upstream_msg_iters, TRUE); |
ab11110e PP |
1143 | } |
1144 | ||
b09a5592 | 1145 | g_free(muxer_msg_iter); |
ab11110e PP |
1146 | } |
1147 | ||
1148 | static | |
4175c1d5 | 1149 | bt_component_class_message_iterator_initialize_method_status |
ab8b2b1b | 1150 | muxer_msg_iter_init_upstream_iterators(struct muxer_comp *muxer_comp, |
c49bf79b SM |
1151 | struct muxer_msg_iter *muxer_msg_iter, |
1152 | struct bt_self_message_iterator_configuration *config) | |
958f7d11 | 1153 | { |
544d0515 PP |
1154 | int64_t count; |
1155 | int64_t i; | |
4175c1d5 | 1156 | bt_component_class_message_iterator_initialize_method_status status; |
c49bf79b | 1157 | bool can_seek_forward = true; |
958f7d11 | 1158 | |
834e9996 | 1159 | count = bt_component_filter_get_input_port_count( |
bb61965b | 1160 | bt_self_component_filter_as_component_filter( |
d9693c64 | 1161 | muxer_comp->self_comp_flt)); |
544d0515 | 1162 | if (count < 0) { |
d9693c64 | 1163 | BT_COMP_LOGD("No input port to initialize for muxer component's message iterator: " |
b09a5592 PP |
1164 | "muxer-comp-addr=%p, muxer-msg-iter-addr=%p", |
1165 | muxer_comp, muxer_msg_iter); | |
4175c1d5 | 1166 | status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_INITIALIZE_METHOD_STATUS_OK; |
ab11110e PP |
1167 | goto end; |
1168 | } | |
958f7d11 PP |
1169 | |
1170 | for (i = 0; i < count; i++) { | |
1043fdea | 1171 | bt_self_component_port_input_message_iterator *upstream_msg_iter; |
8eee8ea2 | 1172 | bt_self_component_port_input *self_port = |
834e9996 | 1173 | bt_self_component_filter_borrow_input_port_by_index( |
d9693c64 | 1174 | muxer_comp->self_comp_flt, i); |
8eee8ea2 | 1175 | const bt_port *port; |
ab8b2b1b SM |
1176 | bt_self_component_port_input_message_iterator_create_from_message_iterator_status |
1177 | msg_iter_status; | |
1178 | int int_status; | |
958f7d11 | 1179 | |
834e9996 | 1180 | BT_ASSERT(self_port); |
bb61965b PP |
1181 | port = bt_self_component_port_as_port( |
1182 | bt_self_component_port_input_as_self_component_port( | |
834e9996 | 1183 | self_port)); |
8b45963b | 1184 | BT_ASSERT(port); |
958f7d11 PP |
1185 | |
1186 | if (!bt_port_is_connected(port)) { | |
1043fdea | 1187 | /* Skip non-connected port */ |
958f7d11 PP |
1188 | continue; |
1189 | } | |
1190 | ||
ab8b2b1b SM |
1191 | msg_iter_status = create_msg_iter_on_input_port(muxer_comp, |
1192 | muxer_msg_iter, self_port, &upstream_msg_iter); | |
1193 | if (msg_iter_status != BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_CREATE_FROM_MESSAGE_ITERATOR_STATUS_OK) { | |
1043fdea | 1194 | /* create_msg_iter_on_input_port() logs errors */ |
ab8b2b1b | 1195 | status = (int) msg_iter_status; |
ab11110e | 1196 | goto end; |
958f7d11 | 1197 | } |
318fe670 | 1198 | |
ab8b2b1b | 1199 | int_status = muxer_msg_iter_add_upstream_msg_iter(muxer_msg_iter, |
ae644e59 | 1200 | upstream_msg_iter); |
1043fdea PP |
1201 | bt_self_component_port_input_message_iterator_put_ref( |
1202 | upstream_msg_iter); | |
ab8b2b1b | 1203 | if (int_status) { |
4175c1d5 | 1204 | status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_INITIALIZE_METHOD_STATUS_ERROR; |
1043fdea | 1205 | /* muxer_msg_iter_add_upstream_msg_iter() logs errors */ |
1043fdea PP |
1206 | goto end; |
1207 | } | |
c49bf79b SM |
1208 | |
1209 | can_seek_forward = can_seek_forward && | |
1210 | bt_self_component_port_input_message_iterator_can_seek_forward( | |
1211 | upstream_msg_iter); | |
958f7d11 PP |
1212 | } |
1213 | ||
c49bf79b SM |
1214 | /* |
1215 | * This iterator can seek forward if all of its iterators can seek | |
1216 | * forward. | |
1217 | */ | |
1218 | bt_self_message_iterator_configuration_set_can_seek_forward( | |
1219 | config, can_seek_forward); | |
1220 | ||
4175c1d5 | 1221 | status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_INITIALIZE_METHOD_STATUS_OK; |
ab8b2b1b | 1222 | |
958f7d11 | 1223 | end: |
ab8b2b1b | 1224 | return status; |
958f7d11 PP |
1225 | } |
1226 | ||
958f7d11 | 1227 | BT_HIDDEN |
4175c1d5 | 1228 | bt_component_class_message_iterator_initialize_method_status muxer_msg_iter_init( |
b09a5592 | 1229 | bt_self_message_iterator *self_msg_iter, |
9415de1c | 1230 | bt_self_message_iterator_configuration *config, |
8eee8ea2 PP |
1231 | bt_self_component_filter *self_comp, |
1232 | bt_self_component_port_output *port) | |
958f7d11 PP |
1233 | { |
1234 | struct muxer_comp *muxer_comp = NULL; | |
b09a5592 | 1235 | struct muxer_msg_iter *muxer_msg_iter = NULL; |
4175c1d5 | 1236 | bt_component_class_message_iterator_initialize_method_status status; |
958f7d11 | 1237 | |
834e9996 | 1238 | muxer_comp = bt_self_component_get_data( |
bb61965b | 1239 | bt_self_component_filter_as_self_component(self_comp)); |
8b45963b | 1240 | BT_ASSERT(muxer_comp); |
d9693c64 | 1241 | BT_COMP_LOGD("Initializing muxer component's message iterator: " |
b09a5592 PP |
1242 | "comp-addr=%p, muxer-comp-addr=%p, msg-iter-addr=%p", |
1243 | self_comp, muxer_comp, self_msg_iter); | |
a09c6b95 | 1244 | |
b09a5592 | 1245 | if (muxer_comp->initializing_muxer_msg_iter) { |
a09c6b95 | 1246 | /* |
089717de | 1247 | * Weird, unhandled situation detected: downstream |
b09a5592 PP |
1248 | * creates a muxer message iterator while creating |
1249 | * another muxer message iterator (same component). | |
a09c6b95 | 1250 | */ |
d9693c64 | 1251 | BT_COMP_LOGE("Recursive initialization of muxer component's message iterator: " |
b09a5592 PP |
1252 | "comp-addr=%p, muxer-comp-addr=%p, msg-iter-addr=%p", |
1253 | self_comp, muxer_comp, self_msg_iter); | |
4175c1d5 | 1254 | status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_INITIALIZE_METHOD_STATUS_ERROR; |
958f7d11 PP |
1255 | goto error; |
1256 | } | |
1257 | ||
b09a5592 PP |
1258 | muxer_comp->initializing_muxer_msg_iter = true; |
1259 | muxer_msg_iter = g_new0(struct muxer_msg_iter, 1); | |
1260 | if (!muxer_msg_iter) { | |
d9693c64 | 1261 | BT_COMP_LOGE_STR("Failed to allocate one muxer component's message iterator."); |
4175c1d5 | 1262 | status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_INITIALIZE_METHOD_STATUS_MEMORY_ERROR; |
ab11110e PP |
1263 | goto error; |
1264 | } | |
1265 | ||
f5abbab4 | 1266 | muxer_msg_iter->muxer_comp = muxer_comp; |
692f1a01 | 1267 | muxer_msg_iter->self_msg_iter = self_msg_iter; |
b09a5592 | 1268 | muxer_msg_iter->last_returned_ts_ns = INT64_MIN; |
a71ed05c | 1269 | muxer_msg_iter->active_muxer_upstream_msg_iters = |
958f7d11 | 1270 | g_ptr_array_new_with_free_func( |
b09a5592 | 1271 | (GDestroyNotify) destroy_muxer_upstream_msg_iter); |
a71ed05c | 1272 | if (!muxer_msg_iter->active_muxer_upstream_msg_iters) { |
d9693c64 | 1273 | BT_COMP_LOGE_STR("Failed to allocate a GPtrArray."); |
4175c1d5 | 1274 | status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_INITIALIZE_METHOD_STATUS_MEMORY_ERROR; |
a71ed05c PP |
1275 | goto error; |
1276 | } | |
1277 | ||
1278 | muxer_msg_iter->ended_muxer_upstream_msg_iters = | |
1279 | g_ptr_array_new_with_free_func( | |
1280 | (GDestroyNotify) destroy_muxer_upstream_msg_iter); | |
1281 | if (!muxer_msg_iter->ended_muxer_upstream_msg_iters) { | |
d9693c64 | 1282 | BT_COMP_LOGE_STR("Failed to allocate a GPtrArray."); |
4175c1d5 | 1283 | status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_INITIALIZE_METHOD_STATUS_MEMORY_ERROR; |
958f7d11 PP |
1284 | goto error; |
1285 | } | |
1286 | ||
ab8b2b1b | 1287 | status = muxer_msg_iter_init_upstream_iterators(muxer_comp, |
c49bf79b | 1288 | muxer_msg_iter, config); |
ab8b2b1b | 1289 | if (status) { |
d9693c64 | 1290 | BT_COMP_LOGE("Cannot initialize connected input ports for muxer component's message iterator: " |
318fe670 | 1291 | "comp-addr=%p, muxer-comp-addr=%p, " |
b09a5592 PP |
1292 | "muxer-msg-iter-addr=%p, msg-iter-addr=%p, ret=%d", |
1293 | self_comp, muxer_comp, muxer_msg_iter, | |
ab8b2b1b | 1294 | self_msg_iter, status); |
a09c6b95 PP |
1295 | goto error; |
1296 | } | |
1297 | ||
1043fdea | 1298 | bt_self_message_iterator_set_data(self_msg_iter, muxer_msg_iter); |
d9693c64 | 1299 | BT_COMP_LOGD("Initialized muxer component's message iterator: " |
b09a5592 PP |
1300 | "comp-addr=%p, muxer-comp-addr=%p, muxer-msg-iter-addr=%p, " |
1301 | "msg-iter-addr=%p", | |
1302 | self_comp, muxer_comp, muxer_msg_iter, self_msg_iter); | |
958f7d11 PP |
1303 | goto end; |
1304 | ||
1305 | error: | |
b09a5592 | 1306 | destroy_muxer_msg_iter(muxer_msg_iter); |
1043fdea | 1307 | bt_self_message_iterator_set_data(self_msg_iter, NULL); |
958f7d11 PP |
1308 | |
1309 | end: | |
b09a5592 | 1310 | muxer_comp->initializing_muxer_msg_iter = false; |
958f7d11 PP |
1311 | return status; |
1312 | } | |
1313 | ||
1314 | BT_HIDDEN | |
a71ed05c | 1315 | void muxer_msg_iter_finalize(bt_self_message_iterator *self_msg_iter) |
958f7d11 | 1316 | { |
b09a5592 PP |
1317 | struct muxer_msg_iter *muxer_msg_iter = |
1318 | bt_self_message_iterator_get_data(self_msg_iter); | |
8eee8ea2 | 1319 | bt_self_component *self_comp = NULL; |
958f7d11 PP |
1320 | struct muxer_comp *muxer_comp = NULL; |
1321 | ||
b09a5592 PP |
1322 | self_comp = bt_self_message_iterator_borrow_component( |
1323 | self_msg_iter); | |
834e9996 PP |
1324 | BT_ASSERT(self_comp); |
1325 | muxer_comp = bt_self_component_get_data(self_comp); | |
d9693c64 | 1326 | BT_COMP_LOGD("Finalizing muxer component's message iterator: " |
b09a5592 PP |
1327 | "comp-addr=%p, muxer-comp-addr=%p, muxer-msg-iter-addr=%p, " |
1328 | "msg-iter-addr=%p", | |
1329 | self_comp, muxer_comp, muxer_msg_iter, self_msg_iter); | |
958f7d11 | 1330 | |
1043fdea | 1331 | if (muxer_msg_iter) { |
b09a5592 | 1332 | destroy_muxer_msg_iter(muxer_msg_iter); |
958f7d11 | 1333 | } |
958f7d11 PP |
1334 | } |
1335 | ||
1336 | BT_HIDDEN | |
fb25b9e3 | 1337 | bt_component_class_message_iterator_next_method_status muxer_msg_iter_next( |
b09a5592 PP |
1338 | bt_self_message_iterator *self_msg_iter, |
1339 | bt_message_array_const msgs, uint64_t capacity, | |
3fd7b79d | 1340 | uint64_t *count) |
958f7d11 | 1341 | { |
fb25b9e3 | 1342 | bt_component_class_message_iterator_next_method_status status; |
b09a5592 PP |
1343 | struct muxer_msg_iter *muxer_msg_iter = |
1344 | bt_self_message_iterator_get_data(self_msg_iter); | |
8eee8ea2 | 1345 | bt_self_component *self_comp = NULL; |
958f7d11 | 1346 | struct muxer_comp *muxer_comp = NULL; |
958f7d11 | 1347 | |
b09a5592 PP |
1348 | BT_ASSERT(muxer_msg_iter); |
1349 | self_comp = bt_self_message_iterator_borrow_component( | |
1350 | self_msg_iter); | |
834e9996 PP |
1351 | BT_ASSERT(self_comp); |
1352 | muxer_comp = bt_self_component_get_data(self_comp); | |
8b45963b | 1353 | BT_ASSERT(muxer_comp); |
c9ecaa78 | 1354 | BT_COMP_LOGT("Muxer component's message iterator's \"next\" method called: " |
b09a5592 PP |
1355 | "comp-addr=%p, muxer-comp-addr=%p, muxer-msg-iter-addr=%p, " |
1356 | "msg-iter-addr=%p", | |
1357 | self_comp, muxer_comp, muxer_msg_iter, self_msg_iter); | |
318fe670 | 1358 | |
b09a5592 PP |
1359 | status = muxer_msg_iter_do_next(muxer_comp, muxer_msg_iter, |
1360 | msgs, capacity, count); | |
3fd7b79d | 1361 | if (status < 0) { |
d9693c64 | 1362 | BT_COMP_LOGE("Cannot get next message: " |
b09a5592 PP |
1363 | "comp-addr=%p, muxer-comp-addr=%p, muxer-msg-iter-addr=%p, " |
1364 | "msg-iter-addr=%p, status=%s", | |
1365 | self_comp, muxer_comp, muxer_msg_iter, self_msg_iter, | |
fb25b9e3 | 1366 | bt_common_func_status_string(status)); |
318fe670 | 1367 | } else { |
c9ecaa78 | 1368 | BT_COMP_LOGT("Returning from muxer component's message iterator's \"next\" method: " |
3fd7b79d | 1369 | "status=%s", |
fb25b9e3 | 1370 | bt_common_func_status_string(status)); |
318fe670 | 1371 | } |
958f7d11 | 1372 | |
3fd7b79d | 1373 | return status; |
958f7d11 PP |
1374 | } |
1375 | ||
1376 | BT_HIDDEN | |
fb25b9e3 | 1377 | bt_component_class_port_connected_method_status muxer_input_port_connected( |
8eee8ea2 PP |
1378 | bt_self_component_filter *self_comp, |
1379 | bt_self_component_port_input *self_port, | |
1380 | const bt_port_output *other_port) | |
958f7d11 | 1381 | { |
fb25b9e3 PP |
1382 | bt_component_class_port_connected_method_status status = |
1383 | BT_COMPONENT_CLASS_PORT_CONNECTED_METHOD_STATUS_OK; | |
1384 | bt_self_component_add_port_status add_port_status; | |
f5abbab4 PP |
1385 | struct muxer_comp *muxer_comp = bt_self_component_get_data( |
1386 | bt_self_component_filter_as_self_component(self_comp)); | |
958f7d11 | 1387 | |
fb25b9e3 PP |
1388 | add_port_status = add_available_input_port(self_comp); |
1389 | if (add_port_status) { | |
d9693c64 | 1390 | BT_COMP_LOGE("Cannot add one muxer component's input port: " |
1043fdea | 1391 | "status=%s", |
fb25b9e3 PP |
1392 | bt_common_func_status_string(status)); |
1393 | ||
1394 | if (add_port_status == | |
1395 | BT_SELF_COMPONENT_ADD_PORT_STATUS_MEMORY_ERROR) { | |
1396 | status = BT_COMPONENT_CLASS_PORT_CONNECTED_METHOD_STATUS_MEMORY_ERROR; | |
1397 | } else { | |
1398 | status = BT_COMPONENT_CLASS_PORT_CONNECTED_METHOD_STATUS_ERROR; | |
1399 | } | |
1400 | ||
06a2cb0d PP |
1401 | goto end; |
1402 | } | |
1403 | ||
958f7d11 | 1404 | end: |
634f394c | 1405 | return status; |
958f7d11 | 1406 | } |
c9d3ff42 | 1407 | |
a71ed05c | 1408 | static inline |
9e8e8b43 SM |
1409 | bt_component_class_message_iterator_can_seek_beginning_method_status |
1410 | muxer_upstream_msg_iters_can_all_seek_beginning( | |
1411 | GPtrArray *muxer_upstream_msg_iters, bt_bool *can_seek) | |
c9d3ff42 | 1412 | { |
2f355c38 PP |
1413 | bt_component_class_message_iterator_can_seek_beginning_method_status status = |
1414 | BT_COMPONENT_CLASS_MESSAGE_ITERATOR_CAN_SEEK_BEGINNING_METHOD_STATUS_OK; | |
c9d3ff42 | 1415 | uint64_t i; |
c9d3ff42 | 1416 | |
a71ed05c | 1417 | for (i = 0; i < muxer_upstream_msg_iters->len; i++) { |
c9d3ff42 | 1418 | struct muxer_upstream_msg_iter *upstream_msg_iter = |
a71ed05c | 1419 | muxer_upstream_msg_iters->pdata[i]; |
9e8e8b43 SM |
1420 | status = (int) bt_self_component_port_input_message_iterator_can_seek_beginning( |
1421 | upstream_msg_iter->msg_iter, can_seek); | |
1422 | if (status != BT_COMPONENT_CLASS_MESSAGE_ITERATOR_CAN_SEEK_BEGINNING_METHOD_STATUS_OK) { | |
1423 | goto end; | |
1424 | } | |
c9d3ff42 | 1425 | |
9e8e8b43 | 1426 | if (!*can_seek) { |
c9d3ff42 PP |
1427 | goto end; |
1428 | } | |
1429 | } | |
1430 | ||
9e8e8b43 SM |
1431 | *can_seek = BT_TRUE; |
1432 | ||
c9d3ff42 | 1433 | end: |
9e8e8b43 | 1434 | return status; |
c9d3ff42 PP |
1435 | } |
1436 | ||
a71ed05c | 1437 | BT_HIDDEN |
9e8e8b43 SM |
1438 | bt_component_class_message_iterator_can_seek_beginning_method_status |
1439 | muxer_msg_iter_can_seek_beginning( | |
1440 | bt_self_message_iterator *self_msg_iter, bt_bool *can_seek) | |
a71ed05c PP |
1441 | { |
1442 | struct muxer_msg_iter *muxer_msg_iter = | |
1443 | bt_self_message_iterator_get_data(self_msg_iter); | |
9e8e8b43 | 1444 | bt_component_class_message_iterator_can_seek_beginning_method_status status; |
a71ed05c | 1445 | |
9e8e8b43 SM |
1446 | status = muxer_upstream_msg_iters_can_all_seek_beginning( |
1447 | muxer_msg_iter->active_muxer_upstream_msg_iters, can_seek); | |
1448 | if (status != BT_COMPONENT_CLASS_MESSAGE_ITERATOR_CAN_SEEK_BEGINNING_METHOD_STATUS_OK) { | |
a71ed05c PP |
1449 | goto end; |
1450 | } | |
1451 | ||
9e8e8b43 | 1452 | if (!*can_seek) { |
a71ed05c PP |
1453 | goto end; |
1454 | } | |
1455 | ||
9e8e8b43 SM |
1456 | status = muxer_upstream_msg_iters_can_all_seek_beginning( |
1457 | muxer_msg_iter->ended_muxer_upstream_msg_iters, can_seek); | |
1458 | ||
a71ed05c | 1459 | end: |
9e8e8b43 | 1460 | return status; |
a71ed05c PP |
1461 | } |
1462 | ||
c9d3ff42 | 1463 | BT_HIDDEN |
fb25b9e3 | 1464 | bt_component_class_message_iterator_seek_beginning_method_status muxer_msg_iter_seek_beginning( |
c9d3ff42 PP |
1465 | bt_self_message_iterator *self_msg_iter) |
1466 | { | |
1467 | struct muxer_msg_iter *muxer_msg_iter = | |
1468 | bt_self_message_iterator_get_data(self_msg_iter); | |
fb25b9e3 PP |
1469 | bt_component_class_message_iterator_seek_beginning_method_status status = |
1470 | BT_COMPONENT_CLASS_MESSAGE_ITERATOR_SEEK_BEGINNING_METHOD_STATUS_OK; | |
1471 | bt_message_iterator_seek_beginning_status seek_beg_status; | |
c9d3ff42 PP |
1472 | uint64_t i; |
1473 | ||
a71ed05c PP |
1474 | /* Seek all ended upstream iterators first */ |
1475 | for (i = 0; i < muxer_msg_iter->ended_muxer_upstream_msg_iters->len; | |
1476 | i++) { | |
c9d3ff42 | 1477 | struct muxer_upstream_msg_iter *upstream_msg_iter = |
a71ed05c | 1478 | muxer_msg_iter->ended_muxer_upstream_msg_iters->pdata[i]; |
c9d3ff42 | 1479 | |
fb25b9e3 | 1480 | seek_beg_status = bt_self_component_port_input_message_iterator_seek_beginning( |
c9d3ff42 | 1481 | upstream_msg_iter->msg_iter); |
fb25b9e3 PP |
1482 | if (seek_beg_status != BT_MESSAGE_ITERATOR_SEEK_BEGINNING_STATUS_OK) { |
1483 | status = (int) seek_beg_status; | |
c9d3ff42 PP |
1484 | goto end; |
1485 | } | |
a71ed05c PP |
1486 | |
1487 | empty_message_queue(upstream_msg_iter); | |
1488 | } | |
1489 | ||
1490 | /* Seek all previously active upstream iterators */ | |
1491 | for (i = 0; i < muxer_msg_iter->active_muxer_upstream_msg_iters->len; | |
1492 | i++) { | |
1493 | struct muxer_upstream_msg_iter *upstream_msg_iter = | |
1494 | muxer_msg_iter->active_muxer_upstream_msg_iters->pdata[i]; | |
1495 | ||
fb25b9e3 | 1496 | seek_beg_status = bt_self_component_port_input_message_iterator_seek_beginning( |
a71ed05c | 1497 | upstream_msg_iter->msg_iter); |
fb25b9e3 PP |
1498 | if (seek_beg_status != BT_MESSAGE_ITERATOR_SEEK_BEGINNING_STATUS_OK) { |
1499 | status = (int) seek_beg_status; | |
a71ed05c PP |
1500 | goto end; |
1501 | } | |
1502 | ||
1503 | empty_message_queue(upstream_msg_iter); | |
1504 | } | |
1505 | ||
1506 | /* Make them all active */ | |
1507 | for (i = 0; i < muxer_msg_iter->ended_muxer_upstream_msg_iters->len; | |
1508 | i++) { | |
1509 | struct muxer_upstream_msg_iter *upstream_msg_iter = | |
1510 | muxer_msg_iter->ended_muxer_upstream_msg_iters->pdata[i]; | |
1511 | ||
1512 | g_ptr_array_add(muxer_msg_iter->active_muxer_upstream_msg_iters, | |
1513 | upstream_msg_iter); | |
1514 | muxer_msg_iter->ended_muxer_upstream_msg_iters->pdata[i] = NULL; | |
c9d3ff42 PP |
1515 | } |
1516 | ||
0e4ccfb9 MJ |
1517 | /* |
1518 | * GLib < 2.48.0 asserts when g_ptr_array_remove_range() is | |
1519 | * called on an empty array. | |
1520 | */ | |
1521 | if (muxer_msg_iter->ended_muxer_upstream_msg_iters->len > 0) { | |
1522 | g_ptr_array_remove_range(muxer_msg_iter->ended_muxer_upstream_msg_iters, | |
1523 | 0, muxer_msg_iter->ended_muxer_upstream_msg_iters->len); | |
1524 | } | |
c9d3ff42 | 1525 | muxer_msg_iter->last_returned_ts_ns = INT64_MIN; |
a71ed05c PP |
1526 | muxer_msg_iter->clock_class_expectation = |
1527 | MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_ANY; | |
c9d3ff42 PP |
1528 | |
1529 | end: | |
fb25b9e3 | 1530 | return status; |
c9d3ff42 | 1531 | } |