Replace assert() -> BT_ASSERT() and some preconditions with BT_ASSERT_PRE()
[babeltrace.git] / lib / graph / iterator.c
... / ...
CommitLineData
1/*
2 * iterator.c
3 *
4 * Babeltrace Notification Iterator
5 *
6 * Copyright 2015 Jérémie Galarneau <jeremie.galarneau@efficios.com>
7 * Copyright 2017 Philippe Proulx <pproulx@efficios.com>
8 *
9 * Permission is hereby granted, free of charge, to any person obtaining a copy
10 * of this software and associated documentation files (the "Software"), to deal
11 * in the Software without restriction, including without limitation the rights
12 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
13 * copies of the Software, and to permit persons to whom the Software is
14 * furnished to do so, subject to the following conditions:
15 *
16 * The above copyright notice and this permission notice shall be included in
17 * all copies or substantial portions of the Software.
18 *
19 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
20 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
21 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
22 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
23 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
24 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
25 * SOFTWARE.
26 */
27
28#define BT_LOG_TAG "NOTIF-ITER"
29#include <babeltrace/lib-logging-internal.h>
30
31#include <babeltrace/compiler-internal.h>
32#include <babeltrace/ref.h>
33#include <babeltrace/ctf-ir/fields.h>
34#include <babeltrace/ctf-ir/field-types.h>
35#include <babeltrace/ctf-ir/field-types-internal.h>
36#include <babeltrace/ctf-ir/event-internal.h>
37#include <babeltrace/ctf-ir/packet-internal.h>
38#include <babeltrace/ctf-ir/stream-internal.h>
39#include <babeltrace/graph/connection.h>
40#include <babeltrace/graph/connection-internal.h>
41#include <babeltrace/graph/component.h>
42#include <babeltrace/graph/component-source-internal.h>
43#include <babeltrace/graph/component-class-internal.h>
44#include <babeltrace/graph/component-class-sink-colander-internal.h>
45#include <babeltrace/graph/component-sink.h>
46#include <babeltrace/graph/notification.h>
47#include <babeltrace/graph/notification-iterator.h>
48#include <babeltrace/graph/notification-iterator-internal.h>
49#include <babeltrace/graph/notification-internal.h>
50#include <babeltrace/graph/notification-event.h>
51#include <babeltrace/graph/notification-event-internal.h>
52#include <babeltrace/graph/notification-packet.h>
53#include <babeltrace/graph/notification-packet-internal.h>
54#include <babeltrace/graph/notification-stream.h>
55#include <babeltrace/graph/notification-stream-internal.h>
56#include <babeltrace/graph/notification-discarded-elements-internal.h>
57#include <babeltrace/graph/port.h>
58#include <babeltrace/graph/graph-internal.h>
59#include <babeltrace/types.h>
60#include <babeltrace/assert-internal.h>
61#include <stdint.h>
62#include <inttypes.h>
63#include <stdlib.h>
64
65struct discarded_elements_state {
66 struct bt_clock_value *cur_begin;
67 uint64_t cur_count;
68};
69
70struct stream_state {
71 struct bt_stream *stream; /* owned by this */
72 struct bt_packet *cur_packet; /* owned by this */
73 struct discarded_elements_state discarded_packets_state;
74 struct discarded_elements_state discarded_events_state;
75 bt_bool is_ended;
76};
77
78enum action_type {
79 ACTION_TYPE_PUSH_NOTIF,
80 ACTION_TYPE_MAP_PORT_TO_COMP_IN_STREAM,
81 ACTION_TYPE_ADD_STREAM_STATE,
82 ACTION_TYPE_SET_STREAM_STATE_IS_ENDED,
83 ACTION_TYPE_SET_STREAM_STATE_CUR_PACKET,
84 ACTION_TYPE_UPDATE_STREAM_STATE_DISCARDED_PACKETS,
85 ACTION_TYPE_UPDATE_STREAM_STATE_DISCARDED_EVENTS,
86};
87
88struct action {
89 enum action_type type;
90 union {
91 /* ACTION_TYPE_PUSH_NOTIF */
92 struct {
93 struct bt_notification *notif; /* owned by this */
94 } push_notif;
95
96 /* ACTION_TYPE_MAP_PORT_TO_COMP_IN_STREAM */
97 struct {
98 struct bt_stream *stream; /* owned by this */
99 struct bt_component *component; /* owned by this */
100 struct bt_port *port; /* owned by this */
101 } map_port_to_comp_in_stream;
102
103 /* ACTION_TYPE_ADD_STREAM_STATE */
104 struct {
105 struct bt_stream *stream; /* owned by this */
106 struct stream_state *stream_state; /* owned by this */
107 } add_stream_state;
108
109 /* ACTION_TYPE_SET_STREAM_STATE_IS_ENDED */
110 struct {
111 struct stream_state *stream_state; /* weak */
112 } set_stream_state_is_ended;
113
114 /* ACTION_TYPE_SET_STREAM_STATE_CUR_PACKET */
115 struct {
116 struct stream_state *stream_state; /* weak */
117 struct bt_packet *packet; /* owned by this */
118 } set_stream_state_cur_packet;
119
120 /* ACTION_TYPE_UPDATE_STREAM_STATE_DISCARDED_PACKETS */
121 /* ACTION_TYPE_UPDATE_STREAM_STATE_DISCARDED_EVENTS */
122 struct {
123 struct stream_state *stream_state; /* weak */
124 struct bt_clock_value *cur_begin; /* owned by this */
125 uint64_t cur_count;
126 } update_stream_state_discarded_elements;
127 } payload;
128};
129
130static
131void stream_destroy_listener(struct bt_stream *stream, void *data)
132{
133 struct bt_notification_iterator_private_connection *iterator = data;
134
135 /* Remove associated stream state */
136 g_hash_table_remove(iterator->stream_states, stream);
137}
138
139static
140void destroy_stream_state(struct stream_state *stream_state)
141{
142 if (!stream_state) {
143 return;
144 }
145
146 BT_LOGV("Destroying stream state: stream-state-addr=%p", stream_state);
147 BT_LOGV_STR("Putting stream state's current packet.");
148 bt_put(stream_state->cur_packet);
149 BT_LOGV_STR("Putting stream state's stream.");
150 bt_put(stream_state->stream);
151 bt_put(stream_state->discarded_packets_state.cur_begin);
152 bt_put(stream_state->discarded_events_state.cur_begin);
153 g_free(stream_state);
154}
155
156static
157void destroy_action(struct action *action)
158{
159 BT_ASSERT(action);
160
161 switch (action->type) {
162 case ACTION_TYPE_PUSH_NOTIF:
163 BT_PUT(action->payload.push_notif.notif);
164 break;
165 case ACTION_TYPE_MAP_PORT_TO_COMP_IN_STREAM:
166 BT_PUT(action->payload.map_port_to_comp_in_stream.stream);
167 BT_PUT(action->payload.map_port_to_comp_in_stream.component);
168 BT_PUT(action->payload.map_port_to_comp_in_stream.port);
169 break;
170 case ACTION_TYPE_ADD_STREAM_STATE:
171 BT_PUT(action->payload.add_stream_state.stream);
172 destroy_stream_state(
173 action->payload.add_stream_state.stream_state);
174 action->payload.add_stream_state.stream_state = NULL;
175 break;
176 case ACTION_TYPE_SET_STREAM_STATE_CUR_PACKET:
177 BT_PUT(action->payload.set_stream_state_cur_packet.packet);
178 break;
179 case ACTION_TYPE_SET_STREAM_STATE_IS_ENDED:
180 break;
181 case ACTION_TYPE_UPDATE_STREAM_STATE_DISCARDED_PACKETS:
182 case ACTION_TYPE_UPDATE_STREAM_STATE_DISCARDED_EVENTS:
183 BT_PUT(action->payload.update_stream_state_discarded_elements.cur_begin);
184 break;
185 default:
186 BT_LOGF("Unexpected action's type: type=%d", action->type);
187 abort();
188 }
189}
190
191static
192void add_action(struct bt_notification_iterator_private_connection *iterator,
193 struct action *action)
194{
195 g_array_append_val(iterator->actions, *action);
196}
197
198static
199void clear_actions(struct bt_notification_iterator_private_connection *iterator)
200{
201 size_t i;
202
203 for (i = 0; i < iterator->actions->len; i++) {
204 struct action *action = &g_array_index(iterator->actions,
205 struct action, i);
206
207 destroy_action(action);
208 }
209
210 g_array_set_size(iterator->actions, 0);
211}
212
213static inline
214const char *action_type_string(enum action_type type)
215{
216 switch (type) {
217 case ACTION_TYPE_PUSH_NOTIF:
218 return "ACTION_TYPE_PUSH_NOTIF";
219 case ACTION_TYPE_MAP_PORT_TO_COMP_IN_STREAM:
220 return "ACTION_TYPE_MAP_PORT_TO_COMP_IN_STREAM";
221 case ACTION_TYPE_ADD_STREAM_STATE:
222 return "ACTION_TYPE_ADD_STREAM_STATE";
223 case ACTION_TYPE_SET_STREAM_STATE_IS_ENDED:
224 return "ACTION_TYPE_SET_STREAM_STATE_IS_ENDED";
225 case ACTION_TYPE_SET_STREAM_STATE_CUR_PACKET:
226 return "ACTION_TYPE_SET_STREAM_STATE_CUR_PACKET";
227 case ACTION_TYPE_UPDATE_STREAM_STATE_DISCARDED_PACKETS:
228 return "ACTION_TYPE_UPDATE_STREAM_STATE_DISCARDED_PACKETS";
229 case ACTION_TYPE_UPDATE_STREAM_STATE_DISCARDED_EVENTS:
230 return "ACTION_TYPE_UPDATE_STREAM_STATE_DISCARDED_EVENTS";
231 default:
232 return "(unknown)";
233 }
234}
235
236static
237void apply_actions(struct bt_notification_iterator_private_connection *iterator)
238{
239 size_t i;
240
241 BT_LOGV("Applying notification's iterator current actions: "
242 "count=%u", iterator->actions->len);
243
244 for (i = 0; i < iterator->actions->len; i++) {
245 struct action *action = &g_array_index(iterator->actions,
246 struct action, i);
247
248 BT_LOGV("Applying action: index=%zu, type=%s",
249 i, action_type_string(action->type));
250
251 switch (action->type) {
252 case ACTION_TYPE_PUSH_NOTIF:
253 /* Move notification to queue */
254 g_queue_push_head(iterator->queue,
255 action->payload.push_notif.notif);
256 bt_notification_freeze(
257 action->payload.push_notif.notif);
258 action->payload.push_notif.notif = NULL;
259 break;
260 case ACTION_TYPE_MAP_PORT_TO_COMP_IN_STREAM:
261 bt_stream_map_component_to_port(
262 action->payload.map_port_to_comp_in_stream.stream,
263 action->payload.map_port_to_comp_in_stream.component,
264 action->payload.map_port_to_comp_in_stream.port);
265 break;
266 case ACTION_TYPE_ADD_STREAM_STATE:
267 /* Move stream state to hash table */
268 g_hash_table_insert(iterator->stream_states,
269 action->payload.add_stream_state.stream,
270 action->payload.add_stream_state.stream_state);
271
272 action->payload.add_stream_state.stream_state = NULL;
273 break;
274 case ACTION_TYPE_SET_STREAM_STATE_IS_ENDED:
275 /*
276 * We know that this stream is ended. We need to
277 * remember this as long as the stream exists to
278 * enforce that the same stream does not end
279 * twice.
280 *
281 * Here we add a destroy listener to the stream
282 * which we put after (becomes weak as the hash
283 * table key). If we were the last object to own
284 * this stream, the destroy listener is called
285 * when we call bt_put() which removes this
286 * stream state completely. This is important
287 * because the memory used by this stream object
288 * could be reused for another stream, and they
289 * must have different states.
290 */
291 bt_stream_add_destroy_listener(
292 action->payload.set_stream_state_is_ended.stream_state->stream,
293 stream_destroy_listener, iterator);
294 action->payload.set_stream_state_is_ended.stream_state->is_ended = BT_TRUE;
295 BT_PUT(action->payload.set_stream_state_is_ended.stream_state->stream);
296 break;
297 case ACTION_TYPE_SET_STREAM_STATE_CUR_PACKET:
298 /* Move packet to stream state's current packet */
299 BT_MOVE(action->payload.set_stream_state_cur_packet.stream_state->cur_packet,
300 action->payload.set_stream_state_cur_packet.packet);
301 break;
302 case ACTION_TYPE_UPDATE_STREAM_STATE_DISCARDED_PACKETS:
303 case ACTION_TYPE_UPDATE_STREAM_STATE_DISCARDED_EVENTS:
304 {
305 struct discarded_elements_state *state;
306
307 if (action->type == ACTION_TYPE_UPDATE_STREAM_STATE_DISCARDED_PACKETS) {
308 state = &action->payload.update_stream_state_discarded_elements.stream_state->discarded_packets_state;
309 } else {
310 state = &action->payload.update_stream_state_discarded_elements.stream_state->discarded_events_state;
311 }
312
313 BT_MOVE(state->cur_begin,
314 action->payload.update_stream_state_discarded_elements.cur_begin);
315 state->cur_count = action->payload.update_stream_state_discarded_elements.cur_count;
316 break;
317 }
318 default:
319 BT_LOGF("Unexpected action's type: type=%d",
320 action->type);
321 abort();
322 }
323 }
324
325 clear_actions(iterator);
326}
327
328static
329struct stream_state *create_stream_state(struct bt_stream *stream)
330{
331 struct stream_state *stream_state = g_new0(struct stream_state, 1);
332
333 if (!stream_state) {
334 BT_LOGE_STR("Failed to allocate one stream state.");
335 goto end;
336 }
337
338 /*
339 * The packet index is a monotonic counter which may not start
340 * at 0 at the beginning of the stream. We therefore need to
341 * have an internal object initial state of -1ULL to distinguish
342 * between initial state and having seen a packet with
343 * seqnum = 0.
344 */
345 stream_state->discarded_packets_state.cur_count = -1ULL;
346
347 /*
348 * We keep a reference to the stream until we know it's ended
349 * because we need to be able to create an automatic "stream
350 * end" notification when the user's "next" method returns
351 * BT_NOTIFICATION_ITERATOR_STATUS_END.
352 *
353 * We put this reference when the stream is marked as ended.
354 */
355 stream_state->stream = bt_get(stream);
356 BT_LOGV("Created stream state: stream-addr=%p, stream-name=\"%s\", "
357 "stream-state-addr=%p",
358 stream, bt_stream_get_name(stream), stream_state);
359
360end:
361 return stream_state;
362}
363
364static
365void destroy_base_notification_iterator(struct bt_object *obj)
366{
367 struct bt_notification_iterator *iterator =
368 container_of(obj, struct bt_notification_iterator, base);
369
370 BT_LOGD_STR("Putting current notification.");
371 bt_put(iterator->current_notification);
372 g_free(iterator);
373}
374
375static
376void bt_private_connection_notification_iterator_destroy(struct bt_object *obj)
377{
378 struct bt_notification_iterator_private_connection *iterator;
379
380 BT_ASSERT(obj);
381
382 /*
383 * The notification iterator's reference count is 0 if we're
384 * here. Increment it to avoid a double-destroy (possibly
385 * infinitely recursive). This could happen for example if the
386 * notification iterator's finalization function does bt_get()
387 * (or anything that causes bt_get() to be called) on itself
388 * (ref. count goes from 0 to 1), and then bt_put(): the
389 * reference count would go from 1 to 0 again and this function
390 * would be called again.
391 */
392 obj->ref_count.count++;
393 iterator = (void *) container_of(obj, struct bt_notification_iterator, base);
394 BT_LOGD("Destroying private connection notification iterator object: addr=%p",
395 iterator);
396 bt_private_connection_notification_iterator_finalize(iterator);
397
398 if (iterator->queue) {
399 struct bt_notification *notif;
400
401 BT_LOGD("Putting notifications in queue.");
402
403 while ((notif = g_queue_pop_tail(iterator->queue))) {
404 bt_put(notif);
405 }
406
407 g_queue_free(iterator->queue);
408 }
409
410 if (iterator->stream_states) {
411 /*
412 * Remove our destroy listener from each stream which
413 * has a state in this iterator. Otherwise the destroy
414 * listener would be called with an invalid/other
415 * notification iterator object.
416 */
417 GHashTableIter ht_iter;
418 gpointer stream_gptr, stream_state_gptr;
419
420 g_hash_table_iter_init(&ht_iter, iterator->stream_states);
421
422 while (g_hash_table_iter_next(&ht_iter, &stream_gptr, &stream_state_gptr)) {
423 BT_ASSERT(stream_gptr);
424
425 BT_LOGD_STR("Removing stream's destroy listener for notification iterator.");
426 bt_stream_remove_destroy_listener(
427 (void *) stream_gptr, stream_destroy_listener,
428 iterator);
429 }
430
431 g_hash_table_destroy(iterator->stream_states);
432 }
433
434 if (iterator->actions) {
435 g_array_free(iterator->actions, TRUE);
436 }
437
438 if (iterator->connection) {
439 /*
440 * Remove ourself from the originating connection so
441 * that it does not try to finalize a dangling pointer
442 * later.
443 */
444 bt_connection_remove_iterator(iterator->connection, iterator);
445 }
446
447 destroy_base_notification_iterator(obj);
448}
449
450BT_HIDDEN
451void bt_private_connection_notification_iterator_finalize(
452 struct bt_notification_iterator_private_connection *iterator)
453{
454 struct bt_component_class *comp_class = NULL;
455 bt_component_class_notification_iterator_finalize_method
456 finalize_method = NULL;
457
458 BT_ASSERT(iterator);
459
460 switch (iterator->state) {
461 case BT_PRIVATE_CONNECTION_NOTIFICATION_ITERATOR_STATE_NON_INITIALIZED:
462 /* Skip user finalization if user initialization failed */
463 BT_LOGD("Not finalizing non-initialized notification iterator: "
464 "addr=%p", iterator);
465 return;
466 case BT_PRIVATE_CONNECTION_NOTIFICATION_ITERATOR_STATE_FINALIZED:
467 case BT_PRIVATE_CONNECTION_NOTIFICATION_ITERATOR_STATE_FINALIZED_AND_ENDED:
468 /* Already finalized */
469 BT_LOGD("Not finalizing notification iterator: already finalized: "
470 "addr=%p", iterator);
471 return;
472 default:
473 break;
474 }
475
476 BT_LOGD("Finalizing notification iterator: addr=%p", iterator);
477
478 if (iterator->state == BT_PRIVATE_CONNECTION_NOTIFICATION_ITERATOR_STATE_ENDED) {
479 BT_LOGD("Updating notification iterator's state: "
480 "new-state=BT_PRIVATE_CONNECTION_NOTIFICATION_ITERATOR_STATE_FINALIZED_AND_ENDED");
481 iterator->state = BT_PRIVATE_CONNECTION_NOTIFICATION_ITERATOR_STATE_FINALIZED_AND_ENDED;
482 } else {
483 BT_LOGD("Updating notification iterator's state: "
484 "new-state=BT_PRIVATE_CONNECTION_NOTIFICATION_ITERATOR_STATE_FINALIZED");
485 iterator->state = BT_PRIVATE_CONNECTION_NOTIFICATION_ITERATOR_STATE_FINALIZED;
486 }
487
488 BT_ASSERT(iterator->upstream_component);
489 comp_class = iterator->upstream_component->class;
490
491 /* Call user-defined destroy method */
492 switch (comp_class->type) {
493 case BT_COMPONENT_CLASS_TYPE_SOURCE:
494 {
495 struct bt_component_class_source *source_class;
496
497 source_class = container_of(comp_class, struct bt_component_class_source, parent);
498 finalize_method = source_class->methods.iterator.finalize;
499 break;
500 }
501 case BT_COMPONENT_CLASS_TYPE_FILTER:
502 {
503 struct bt_component_class_filter *filter_class;
504
505 filter_class = container_of(comp_class, struct bt_component_class_filter, parent);
506 finalize_method = filter_class->methods.iterator.finalize;
507 break;
508 }
509 default:
510 /* Unreachable */
511 abort();
512 }
513
514 if (finalize_method) {
515 BT_LOGD("Calling user's finalization method: addr=%p",
516 iterator);
517 finalize_method(
518 bt_private_connection_private_notification_iterator_from_notification_iterator(iterator));
519 }
520
521 iterator->upstream_component = NULL;
522 iterator->upstream_port = NULL;
523 BT_LOGD("Finalized notification iterator: addr=%p", iterator);
524}
525
526BT_HIDDEN
527void bt_private_connection_notification_iterator_set_connection(
528 struct bt_notification_iterator_private_connection *iterator,
529 struct bt_connection *connection)
530{
531 BT_ASSERT(iterator);
532 iterator->connection = connection;
533 BT_LOGV("Set notification iterator's connection: "
534 "iter-addr=%p, conn-addr=%p", iterator, connection);
535}
536
537static
538int create_subscription_mask_from_notification_types(
539 struct bt_notification_iterator_private_connection *iterator,
540 const enum bt_notification_type *notif_types)
541{
542 const enum bt_notification_type *notif_type;
543 int ret = 0;
544
545 BT_ASSERT(notif_types);
546 iterator->subscription_mask = 0;
547
548 for (notif_type = notif_types;
549 *notif_type != BT_NOTIFICATION_TYPE_SENTINEL;
550 notif_type++) {
551 switch (*notif_type) {
552 case BT_NOTIFICATION_TYPE_ALL:
553 iterator->subscription_mask |=
554 BT_PRIVATE_CONNECTION_NOTIFICATION_ITERATOR_NOTIF_TYPE_EVENT |
555 BT_PRIVATE_CONNECTION_NOTIFICATION_ITERATOR_NOTIF_TYPE_INACTIVITY |
556 BT_PRIVATE_CONNECTION_NOTIFICATION_ITERATOR_NOTIF_TYPE_STREAM_BEGIN |
557 BT_PRIVATE_CONNECTION_NOTIFICATION_ITERATOR_NOTIF_TYPE_STREAM_END |
558 BT_PRIVATE_CONNECTION_NOTIFICATION_ITERATOR_NOTIF_TYPE_PACKET_BEGIN |
559 BT_PRIVATE_CONNECTION_NOTIFICATION_ITERATOR_NOTIF_TYPE_PACKET_END |
560 BT_PRIVATE_CONNECTION_NOTIFICATION_ITERATOR_NOTIF_TYPE_DISCARDED_EVENTS |
561 BT_PRIVATE_CONNECTION_NOTIFICATION_ITERATOR_NOTIF_TYPE_DISCARDED_PACKETS;
562 break;
563 case BT_NOTIFICATION_TYPE_EVENT:
564 iterator->subscription_mask |= BT_PRIVATE_CONNECTION_NOTIFICATION_ITERATOR_NOTIF_TYPE_EVENT;
565 break;
566 case BT_NOTIFICATION_TYPE_INACTIVITY:
567 iterator->subscription_mask |= BT_PRIVATE_CONNECTION_NOTIFICATION_ITERATOR_NOTIF_TYPE_INACTIVITY;
568 break;
569 case BT_NOTIFICATION_TYPE_STREAM_BEGIN:
570 iterator->subscription_mask |= BT_PRIVATE_CONNECTION_NOTIFICATION_ITERATOR_NOTIF_TYPE_STREAM_BEGIN;
571 break;
572 case BT_NOTIFICATION_TYPE_STREAM_END:
573 iterator->subscription_mask |= BT_PRIVATE_CONNECTION_NOTIFICATION_ITERATOR_NOTIF_TYPE_STREAM_END;
574 break;
575 case BT_NOTIFICATION_TYPE_PACKET_BEGIN:
576 iterator->subscription_mask |= BT_PRIVATE_CONNECTION_NOTIFICATION_ITERATOR_NOTIF_TYPE_PACKET_BEGIN;
577 break;
578 case BT_NOTIFICATION_TYPE_PACKET_END:
579 iterator->subscription_mask |= BT_PRIVATE_CONNECTION_NOTIFICATION_ITERATOR_NOTIF_TYPE_PACKET_END;
580 break;
581 case BT_NOTIFICATION_TYPE_DISCARDED_EVENTS:
582 iterator->subscription_mask |= BT_PRIVATE_CONNECTION_NOTIFICATION_ITERATOR_NOTIF_TYPE_DISCARDED_EVENTS;
583 break;
584 case BT_NOTIFICATION_TYPE_DISCARDED_PACKETS:
585 iterator->subscription_mask |= BT_PRIVATE_CONNECTION_NOTIFICATION_ITERATOR_NOTIF_TYPE_DISCARDED_PACKETS;
586 break;
587 default:
588 ret = -1;
589 goto end;
590 }
591
592 BT_LOGV("Added notification type to subscription mask: "
593 "type=%s, mask=%x",
594 bt_notification_type_string(*notif_type),
595 iterator->subscription_mask);
596 }
597
598end:
599 return ret;
600}
601
602static
603void init_notification_iterator(struct bt_notification_iterator *iterator,
604 enum bt_notification_iterator_type type,
605 bt_object_release_func destroy)
606{
607 bt_object_init(iterator, destroy);
608 iterator->type = type;
609}
610
611BT_HIDDEN
612enum bt_connection_status bt_private_connection_notification_iterator_create(
613 struct bt_component *upstream_comp,
614 struct bt_port *upstream_port,
615 const enum bt_notification_type *notification_types,
616 struct bt_connection *connection,
617 struct bt_notification_iterator_private_connection **user_iterator)
618{
619 enum bt_connection_status status = BT_CONNECTION_STATUS_OK;
620 enum bt_component_class_type type;
621 struct bt_notification_iterator_private_connection *iterator = NULL;
622
623 BT_ASSERT(upstream_comp);
624 BT_ASSERT(upstream_port);
625 BT_ASSERT(notification_types);
626 BT_ASSERT(bt_port_is_connected(upstream_port));
627 BT_ASSERT(user_iterator);
628 BT_LOGD("Creating notification iterator on private connection: "
629 "upstream-comp-addr=%p, upstream-comp-name=\"%s\", "
630 "upstream-port-addr=%p, upstream-port-name=\"%s\", "
631 "conn-addr=%p",
632 upstream_comp, bt_component_get_name(upstream_comp),
633 upstream_port, bt_port_get_name(upstream_port),
634 connection);
635 type = bt_component_get_class_type(upstream_comp);
636 BT_ASSERT(type == BT_COMPONENT_CLASS_TYPE_SOURCE ||
637 type == BT_COMPONENT_CLASS_TYPE_FILTER);
638 iterator = g_new0(struct bt_notification_iterator_private_connection, 1);
639 if (!iterator) {
640 BT_LOGE_STR("Failed to allocate one private connection notification iterator.");
641 status = BT_CONNECTION_STATUS_NOMEM;
642 goto end;
643 }
644
645 init_notification_iterator((void *) iterator,
646 BT_NOTIFICATION_ITERATOR_TYPE_PRIVATE_CONNECTION,
647 bt_private_connection_notification_iterator_destroy);
648
649 if (create_subscription_mask_from_notification_types(iterator,
650 notification_types)) {
651 BT_LOGW_STR("Cannot create subscription mask from notification types.");
652 status = BT_CONNECTION_STATUS_INVALID;
653 goto end;
654 }
655
656 iterator->stream_states = g_hash_table_new_full(g_direct_hash,
657 g_direct_equal, NULL, (GDestroyNotify) destroy_stream_state);
658 if (!iterator->stream_states) {
659 BT_LOGE_STR("Failed to allocate a GHashTable.");
660 status = BT_CONNECTION_STATUS_NOMEM;
661 goto end;
662 }
663
664 iterator->queue = g_queue_new();
665 if (!iterator->queue) {
666 BT_LOGE_STR("Failed to allocate a GQueue.");
667 status = BT_CONNECTION_STATUS_NOMEM;
668 goto end;
669 }
670
671 iterator->actions = g_array_new(FALSE, FALSE, sizeof(struct action));
672 if (!iterator->actions) {
673 BT_LOGE_STR("Failed to allocate a GArray.");
674 status = BT_CONNECTION_STATUS_NOMEM;
675 goto end;
676 }
677
678 iterator->upstream_component = upstream_comp;
679 iterator->upstream_port = upstream_port;
680 iterator->connection = connection;
681 iterator->state = BT_PRIVATE_CONNECTION_NOTIFICATION_ITERATOR_STATE_NON_INITIALIZED;
682 BT_LOGD("Created notification iterator: "
683 "upstream-comp-addr=%p, upstream-comp-name=\"%s\", "
684 "upstream-port-addr=%p, upstream-port-name=\"%s\", "
685 "conn-addr=%p, iter-addr=%p",
686 upstream_comp, bt_component_get_name(upstream_comp),
687 upstream_port, bt_port_get_name(upstream_port),
688 connection, iterator);
689
690 /* Move reference to user */
691 *user_iterator = iterator;
692 iterator = NULL;
693
694end:
695 bt_put(iterator);
696 return status;
697}
698
699void *bt_private_connection_private_notification_iterator_get_user_data(
700 struct bt_private_connection_private_notification_iterator *private_iterator)
701{
702 struct bt_notification_iterator_private_connection *iterator =
703 bt_private_connection_notification_iterator_borrow_from_private(private_iterator);
704
705 return iterator ? iterator->user_data : NULL;
706}
707
708enum bt_notification_iterator_status
709bt_private_connection_private_notification_iterator_set_user_data(
710 struct bt_private_connection_private_notification_iterator *private_iterator,
711 void *data)
712{
713 enum bt_notification_iterator_status ret =
714 BT_NOTIFICATION_ITERATOR_STATUS_OK;
715 struct bt_notification_iterator_private_connection *iterator =
716 bt_private_connection_notification_iterator_borrow_from_private(private_iterator);
717
718 if (!iterator) {
719 BT_LOGW_STR("Invalid parameter: notification iterator is NULL.");
720 ret = BT_NOTIFICATION_ITERATOR_STATUS_INVALID;
721 goto end;
722 }
723
724 iterator->user_data = data;
725 BT_LOGV("Set notification iterator's user data: "
726 "iter-addr=%p, user-data-addr=%p", iterator, data);
727
728end:
729 return ret;
730}
731
732struct bt_notification *bt_notification_iterator_get_notification(
733 struct bt_notification_iterator *iterator)
734{
735 struct bt_notification *notification = NULL;
736
737 if (!iterator) {
738 BT_LOGW_STR("Invalid parameter: notification iterator is NULL.");
739 goto end;
740 }
741
742 notification = bt_get(
743 bt_notification_iterator_borrow_current_notification(iterator));
744
745end:
746 return notification;
747}
748
749static
750enum bt_private_connection_notification_iterator_notif_type
751bt_notification_iterator_notif_type_from_notif_type(
752 enum bt_notification_type notif_type)
753{
754 enum bt_private_connection_notification_iterator_notif_type iter_notif_type;
755
756 switch (notif_type) {
757 case BT_NOTIFICATION_TYPE_EVENT:
758 iter_notif_type = BT_PRIVATE_CONNECTION_NOTIFICATION_ITERATOR_NOTIF_TYPE_EVENT;
759 break;
760 case BT_NOTIFICATION_TYPE_INACTIVITY:
761 iter_notif_type = BT_PRIVATE_CONNECTION_NOTIFICATION_ITERATOR_NOTIF_TYPE_INACTIVITY;
762 break;
763 case BT_NOTIFICATION_TYPE_STREAM_BEGIN:
764 iter_notif_type = BT_PRIVATE_CONNECTION_NOTIFICATION_ITERATOR_NOTIF_TYPE_STREAM_BEGIN;
765 break;
766 case BT_NOTIFICATION_TYPE_STREAM_END:
767 iter_notif_type = BT_PRIVATE_CONNECTION_NOTIFICATION_ITERATOR_NOTIF_TYPE_STREAM_END;
768 break;
769 case BT_NOTIFICATION_TYPE_PACKET_BEGIN:
770 iter_notif_type = BT_PRIVATE_CONNECTION_NOTIFICATION_ITERATOR_NOTIF_TYPE_PACKET_BEGIN;
771 break;
772 case BT_NOTIFICATION_TYPE_PACKET_END:
773 iter_notif_type = BT_PRIVATE_CONNECTION_NOTIFICATION_ITERATOR_NOTIF_TYPE_PACKET_END;
774 break;
775 case BT_NOTIFICATION_TYPE_DISCARDED_EVENTS:
776 iter_notif_type = BT_PRIVATE_CONNECTION_NOTIFICATION_ITERATOR_NOTIF_TYPE_DISCARDED_EVENTS;
777 break;
778 case BT_NOTIFICATION_TYPE_DISCARDED_PACKETS:
779 iter_notif_type = BT_PRIVATE_CONNECTION_NOTIFICATION_ITERATOR_NOTIF_TYPE_DISCARDED_PACKETS;
780 break;
781 default:
782 abort();
783 }
784
785 return iter_notif_type;
786}
787
788static
789bt_bool validate_notification(
790 struct bt_notification_iterator_private_connection *iterator,
791 struct bt_notification *notif,
792 struct bt_stream *notif_stream,
793 struct bt_packet *notif_packet)
794{
795 bt_bool is_valid = BT_TRUE;
796 struct stream_state *stream_state;
797 struct bt_port *stream_comp_cur_port;
798
799 BT_ASSERT(notif_stream);
800 stream_comp_cur_port =
801 bt_stream_port_for_component(notif_stream,
802 iterator->upstream_component);
803 if (!stream_comp_cur_port) {
804 /*
805 * This is the first time this notification iterator
806 * bumps into this stream. Add an action to map the
807 * iterator's upstream component to the iterator's
808 * upstream port in this stream.
809 */
810 struct action action = {
811 .type = ACTION_TYPE_MAP_PORT_TO_COMP_IN_STREAM,
812 .payload.map_port_to_comp_in_stream = {
813 .stream = bt_get(notif_stream),
814 .component = bt_get(iterator->upstream_component),
815 .port = bt_get(iterator->upstream_port),
816 },
817 };
818
819 add_action(iterator, &action);
820 } else {
821 if (stream_comp_cur_port != iterator->upstream_port) {
822 /*
823 * It looks like two different ports of the same
824 * component are emitting notifications which
825 * have references to the same stream. This is
826 * bad: the API guarantees that it can never
827 * happen.
828 */
829 BT_LOGW("Two different ports of the same component are emitting notifications which refer to the same stream: "
830 "stream-addr=%p, stream-name=\"%s\", "
831 "stream-comp-cur-port-addr=%p, "
832 "stream-comp-cur-port-name=%p, "
833 "iter-upstream-port-addr=%p, "
834 "iter-upstream-port-name=%s",
835 notif_stream,
836 bt_stream_get_name(notif_stream),
837 stream_comp_cur_port,
838 bt_port_get_name(stream_comp_cur_port),
839 iterator->upstream_port,
840 bt_port_get_name(iterator->upstream_port));
841 is_valid = BT_FALSE;
842 goto end;
843 }
844
845 }
846
847 stream_state = g_hash_table_lookup(iterator->stream_states,
848 notif_stream);
849 if (stream_state) {
850 BT_LOGV("Stream state already exists: "
851 "stream-addr=%p, stream-name=\"%s\", "
852 "stream-state-addr=%p",
853 notif_stream,
854 bt_stream_get_name(notif_stream), stream_state);
855
856 if (stream_state->is_ended) {
857 /*
858 * There's a new notification which has a
859 * reference to a stream which, from this
860 * iterator's point of view, is ended ("end of
861 * stream" notification was returned). This is
862 * bad: the API guarantees that it can never
863 * happen.
864 */
865 BT_LOGW("Stream is already ended: "
866 "stream-addr=%p, stream-name=\"%s\"",
867 notif_stream,
868 bt_stream_get_name(notif_stream));
869 is_valid = BT_FALSE;
870 goto end;
871 }
872
873 switch (notif->type) {
874 case BT_NOTIFICATION_TYPE_STREAM_BEGIN:
875 /*
876 * We already have a stream state, which means
877 * we already returned a "stream begin"
878 * notification: this is an invalid duplicate.
879 */
880 BT_LOGW("Duplicate stream beginning notification: "
881 "stream-addr=%p, stream-name=\"%s\"",
882 notif_stream,
883 bt_stream_get_name(notif_stream));
884 is_valid = BT_FALSE;
885 goto end;
886 case BT_NOTIFICATION_TYPE_PACKET_BEGIN:
887 if (notif_packet == stream_state->cur_packet) {
888 /* Duplicate "packet begin" notification */
889 BT_LOGW("Duplicate stream beginning notification: "
890 "stream-addr=%p, stream-name=\"%s\", "
891 "packet-addr=%p",
892 notif_stream,
893 bt_stream_get_name(notif_stream),
894 notif_packet);
895 is_valid = BT_FALSE;
896 goto end;
897 }
898 break;
899 default:
900 break;
901 }
902 }
903
904end:
905 return is_valid;
906}
907
908static
909bt_bool is_subscribed_to_notification_type(
910 struct bt_notification_iterator_private_connection *iterator,
911 enum bt_notification_type notif_type)
912{
913 uint32_t iter_notif_type =
914 (uint32_t) bt_notification_iterator_notif_type_from_notif_type(
915 notif_type);
916
917 return (iter_notif_type & iterator->subscription_mask) ? BT_TRUE : BT_FALSE;
918}
919
920static
921void add_action_push_notif(
922 struct bt_notification_iterator_private_connection *iterator,
923 struct bt_notification *notif)
924{
925 struct action action = {
926 .type = ACTION_TYPE_PUSH_NOTIF,
927 };
928
929 BT_ASSERT(notif);
930
931 if (!is_subscribed_to_notification_type(iterator, notif->type)) {
932 return;
933 }
934
935 action.payload.push_notif.notif = bt_get(notif);
936 add_action(iterator, &action);
937 BT_LOGV("Added \"push notification\" action: notif-addr=%p", notif);
938}
939
940static
941int add_action_push_notif_stream_begin(
942 struct bt_notification_iterator_private_connection *iterator,
943 struct bt_stream *stream)
944{
945 int ret = 0;
946 struct bt_notification *stream_begin_notif = NULL;
947
948 if (!is_subscribed_to_notification_type(iterator,
949 BT_NOTIFICATION_TYPE_STREAM_BEGIN)) {
950 BT_LOGV("Not adding \"push stream beginning notification\" action: "
951 "notification iterator is not subscribed: addr=%p",
952 iterator);
953 goto end;
954 }
955
956 BT_ASSERT(stream);
957 stream_begin_notif = bt_notification_stream_begin_create(stream);
958 if (!stream_begin_notif) {
959 BT_LOGE_STR("Cannot create stream beginning notification.");
960 goto error;
961 }
962
963 add_action_push_notif(iterator, stream_begin_notif);
964 BT_LOGV("Added \"push stream beginning notification\" action: "
965 "stream-addr=%p, stream-name=\"%s\"",
966 stream, bt_stream_get_name(stream));
967 goto end;
968
969error:
970 ret = -1;
971
972end:
973 bt_put(stream_begin_notif);
974 return ret;
975}
976
977static
978int add_action_push_notif_stream_end(
979 struct bt_notification_iterator_private_connection *iterator,
980 struct bt_stream *stream)
981{
982 int ret = 0;
983 struct bt_notification *stream_end_notif = NULL;
984
985 if (!is_subscribed_to_notification_type(iterator,
986 BT_NOTIFICATION_TYPE_STREAM_END)) {
987 BT_LOGV("Not adding \"push stream end notification\" action: "
988 "notification iterator is not subscribed: addr=%p",
989 iterator);
990 goto end;
991 }
992
993 BT_ASSERT(stream);
994 stream_end_notif = bt_notification_stream_end_create(stream);
995 if (!stream_end_notif) {
996 BT_LOGE_STR("Cannot create stream end notification.");
997 goto error;
998 }
999
1000 add_action_push_notif(iterator, stream_end_notif);
1001 BT_LOGV("Added \"push stream end notification\" action: "
1002 "stream-addr=%p, stream-name=\"%s\"",
1003 stream, bt_stream_get_name(stream));
1004 goto end;
1005
1006error:
1007 ret = -1;
1008
1009end:
1010 bt_put(stream_end_notif);
1011 return ret;
1012}
1013
1014static
1015int add_action_push_notif_packet_begin(
1016 struct bt_notification_iterator_private_connection *iterator,
1017 struct bt_packet *packet)
1018{
1019 int ret = 0;
1020 struct bt_notification *packet_begin_notif = NULL;
1021
1022 if (!is_subscribed_to_notification_type(iterator,
1023 BT_NOTIFICATION_TYPE_PACKET_BEGIN)) {
1024 BT_LOGV("Not adding \"push packet beginning notification\" action: "
1025 "notification iterator is not subscribed: addr=%p",
1026 iterator);
1027 goto end;
1028 }
1029
1030 BT_ASSERT(packet);
1031 packet_begin_notif = bt_notification_packet_begin_create(packet);
1032 if (!packet_begin_notif) {
1033 BT_LOGE_STR("Cannot create packet beginning notification.");
1034 goto error;
1035 }
1036
1037 add_action_push_notif(iterator, packet_begin_notif);
1038 BT_LOGV("Added \"push packet beginning notification\" action: "
1039 "packet-addr=%p", packet);
1040 goto end;
1041
1042error:
1043 ret = -1;
1044
1045end:
1046 bt_put(packet_begin_notif);
1047 return ret;
1048}
1049
1050static
1051int add_action_push_notif_packet_end(
1052 struct bt_notification_iterator_private_connection *iterator,
1053 struct bt_packet *packet)
1054{
1055 int ret = 0;
1056 struct bt_notification *packet_end_notif = NULL;
1057
1058 if (!is_subscribed_to_notification_type(iterator,
1059 BT_NOTIFICATION_TYPE_PACKET_END)) {
1060 BT_LOGV("Not adding \"push packet end notification\" action: "
1061 "notification iterator is not subscribed: addr=%p",
1062 iterator);
1063 goto end;
1064 }
1065
1066 BT_ASSERT(packet);
1067 packet_end_notif = bt_notification_packet_end_create(packet);
1068 if (!packet_end_notif) {
1069 BT_LOGE_STR("Cannot create packet end notification.");
1070 goto error;
1071 }
1072
1073 add_action_push_notif(iterator, packet_end_notif);
1074 BT_LOGV("Added \"push packet end notification\" action: "
1075 "packet-addr=%p", packet);
1076 goto end;
1077
1078error:
1079 ret = -1;
1080
1081end:
1082 bt_put(packet_end_notif);
1083 return ret;
1084}
1085
1086static
1087void add_action_set_stream_state_is_ended(
1088 struct bt_notification_iterator_private_connection *iterator,
1089 struct stream_state *stream_state)
1090{
1091 struct action action = {
1092 .type = ACTION_TYPE_SET_STREAM_STATE_IS_ENDED,
1093 .payload.set_stream_state_is_ended = {
1094 .stream_state = stream_state,
1095 },
1096 };
1097
1098 BT_ASSERT(stream_state);
1099 add_action(iterator, &action);
1100 BT_LOGV("Added \"set stream state's ended\" action: "
1101 "stream-state-addr=%p", stream_state);
1102}
1103
1104static
1105void add_action_set_stream_state_cur_packet(
1106 struct bt_notification_iterator_private_connection *iterator,
1107 struct stream_state *stream_state,
1108 struct bt_packet *packet)
1109{
1110 struct action action = {
1111 .type = ACTION_TYPE_SET_STREAM_STATE_CUR_PACKET,
1112 .payload.set_stream_state_cur_packet = {
1113 .stream_state = stream_state,
1114 .packet = bt_get(packet),
1115 },
1116 };
1117
1118 BT_ASSERT(stream_state);
1119 add_action(iterator, &action);
1120 BT_LOGV("Added \"set stream state's current packet\" action: "
1121 "stream-state-addr=%p, packet-addr=%p",
1122 stream_state, packet);
1123}
1124
1125static
1126void add_action_update_stream_state_discarded_elements(
1127 struct bt_notification_iterator_private_connection *iterator,
1128 enum action_type type,
1129 struct stream_state *stream_state,
1130 struct bt_clock_value *cur_begin,
1131 uint64_t cur_count)
1132{
1133 struct action action = {
1134 .type = type,
1135 .payload.update_stream_state_discarded_elements = {
1136 .stream_state = stream_state,
1137 .cur_begin = bt_get(cur_begin),
1138 .cur_count = cur_count,
1139 },
1140 };
1141
1142 BT_ASSERT(stream_state);
1143 BT_ASSERT(type == ACTION_TYPE_UPDATE_STREAM_STATE_DISCARDED_PACKETS ||
1144 type == ACTION_TYPE_UPDATE_STREAM_STATE_DISCARDED_EVENTS);
1145 add_action(iterator, &action);
1146 if (type == ACTION_TYPE_UPDATE_STREAM_STATE_DISCARDED_PACKETS) {
1147 BT_LOGV("Added \"update stream state's discarded packets\" action: "
1148 "stream-state-addr=%p, cur-begin-addr=%p, cur-count=%" PRIu64,
1149 stream_state, cur_begin, cur_count);
1150 } else if (type == ACTION_TYPE_UPDATE_STREAM_STATE_DISCARDED_EVENTS) {
1151 BT_LOGV("Added \"update stream state's discarded events\" action: "
1152 "stream-state-addr=%p, cur-begin-addr=%p, cur-count=%" PRIu64,
1153 stream_state, cur_begin, cur_count);
1154 }
1155}
1156
1157static
1158int ensure_stream_state_exists(
1159 struct bt_notification_iterator_private_connection *iterator,
1160 struct bt_notification *stream_begin_notif,
1161 struct bt_stream *notif_stream,
1162 struct stream_state **_stream_state)
1163{
1164 int ret = 0;
1165 struct stream_state *stream_state = NULL;
1166
1167 if (!notif_stream) {
1168 /*
1169 * The notification does not reference any stream: no
1170 * need to get or create a stream state.
1171 */
1172 goto end;
1173 }
1174
1175 stream_state = g_hash_table_lookup(iterator->stream_states,
1176 notif_stream);
1177 if (!stream_state) {
1178 /*
1179 * This iterator did not bump into this stream yet:
1180 * create a stream state and a "stream begin"
1181 * notification.
1182 */
1183 struct action action = {
1184 .type = ACTION_TYPE_ADD_STREAM_STATE,
1185 .payload.add_stream_state = {
1186 .stream = bt_get(notif_stream),
1187 .stream_state = NULL,
1188 },
1189 };
1190
1191 stream_state = create_stream_state(notif_stream);
1192 if (!stream_state) {
1193 BT_LOGE_STR("Cannot create stream state.");
1194 goto error;
1195 }
1196
1197 action.payload.add_stream_state.stream_state = stream_state;
1198 add_action(iterator, &action);
1199
1200 if (stream_begin_notif) {
1201 add_action_push_notif(iterator, stream_begin_notif);
1202 } else {
1203 ret = add_action_push_notif_stream_begin(iterator,
1204 notif_stream);
1205 if (ret) {
1206 BT_LOGE_STR("Cannot add \"push stream beginning notification\" action.");
1207 goto error;
1208 }
1209 }
1210 }
1211 goto end;
1212
1213error:
1214 destroy_stream_state(stream_state);
1215 stream_state = NULL;
1216 ret = -1;
1217
1218end:
1219 *_stream_state = stream_state;
1220 return ret;
1221}
1222
1223static
1224struct bt_field *get_struct_field_uint(struct bt_field *struct_field,
1225 const char *field_name)
1226{
1227 struct bt_field *field = NULL;
1228 struct bt_field_type *ft = NULL;
1229
1230 field = bt_field_structure_get_field_by_name(struct_field,
1231 field_name);
1232 if (!field) {
1233 BT_LOGV_STR("`%s` field does not exist.");
1234 goto end;
1235 }
1236
1237 if (!bt_field_is_integer(field)) {
1238 BT_LOGV("Skipping `%s` field because its type is not an integer field type: "
1239 "field-addr=%p, ft-addr=%p, ft-id=%s", field_name,
1240 field, ft, bt_field_type_id_string(
1241 bt_field_type_get_type_id(ft)));
1242 BT_PUT(field);
1243 goto end;
1244 }
1245
1246 ft = bt_field_get_type(field);
1247 BT_ASSERT(ft);
1248
1249 if (bt_field_type_integer_is_signed(ft)) {
1250 BT_LOGV("Skipping `%s` integer field because its type is signed: "
1251 "field-addr=%p, ft-addr=%p", field_name, field, ft);
1252 BT_PUT(field);
1253 goto end;
1254 }
1255
1256end:
1257 bt_put(ft);
1258 return field;
1259}
1260
1261static
1262uint64_t get_packet_context_events_discarded(struct bt_packet *packet)
1263{
1264 struct bt_field *packet_context = NULL;
1265 struct bt_field *field = NULL;
1266 uint64_t retval = -1ULL;
1267 int ret;
1268
1269 packet_context = bt_packet_get_context(packet);
1270 if (!packet_context) {
1271 goto end;
1272 }
1273
1274 field = get_struct_field_uint(packet_context, "events_discarded");
1275 if (!field) {
1276 BT_LOGV("`events_discarded` field does not exist in packet's context field: "
1277 "packet-addr=%p, packet-context-field-addr=%p",
1278 packet, packet_context);
1279 goto end;
1280 }
1281
1282 BT_ASSERT(bt_field_is_integer(field));
1283 ret = bt_field_unsigned_integer_get_value(field, &retval);
1284 if (ret) {
1285 BT_LOGV("Cannot get raw value of packet's context field's `events_discarded` integer field: "
1286 "packet-addr=%p, field-addr=%p",
1287 packet, field);
1288 retval = -1ULL;
1289 goto end;
1290 }
1291
1292end:
1293 bt_put(packet_context);
1294 bt_put(field);
1295 return retval;
1296}
1297
1298static
1299uint64_t get_packet_context_packet_seq_num(struct bt_packet *packet)
1300{
1301 struct bt_field *packet_context = NULL;
1302 struct bt_field *field = NULL;
1303 uint64_t retval = -1ULL;
1304 int ret;
1305
1306 packet_context = bt_packet_get_context(packet);
1307 if (!packet_context) {
1308 goto end;
1309 }
1310
1311 field = get_struct_field_uint(packet_context, "packet_seq_num");
1312 if (!field) {
1313 BT_LOGV("`packet_seq_num` field does not exist in packet's context field: "
1314 "packet-addr=%p, packet-context-field-addr=%p",
1315 packet, packet_context);
1316 goto end;
1317 }
1318
1319 BT_ASSERT(bt_field_is_integer(field));
1320 ret = bt_field_unsigned_integer_get_value(field, &retval);
1321 if (ret) {
1322 BT_LOGV("Cannot get raw value of packet's context field's `packet_seq_num` integer field: "
1323 "packet-addr=%p, field-addr=%p",
1324 packet, field);
1325 retval = -1ULL;
1326 goto end;
1327 }
1328
1329end:
1330 bt_put(packet_context);
1331 bt_put(field);
1332 return retval;
1333}
1334
1335static
1336int handle_discarded_packets(
1337 struct bt_notification_iterator_private_connection *iterator,
1338 struct bt_packet *packet,
1339 struct bt_clock_value *ts_begin,
1340 struct bt_clock_value *ts_end,
1341 struct stream_state *stream_state)
1342{
1343 struct bt_notification *notif = NULL;
1344 uint64_t diff;
1345 uint64_t prev_count, next_count;
1346 int ret = 0;
1347
1348 next_count = get_packet_context_packet_seq_num(packet);
1349 if (next_count == -1ULL) {
1350 /*
1351 * Stream does not have seqnum field, skip discarded
1352 * packets feature.
1353 */
1354 goto end;
1355 }
1356 prev_count = stream_state->discarded_packets_state.cur_count;
1357
1358 if (prev_count != -1ULL) {
1359 if (next_count < prev_count) {
1360 BT_LOGW("Current value of packet's context field's `packet_seq_num` field is lesser than the previous value for the same stream: "
1361 "not updating the stream state's current value: "
1362 "packet-addr=%p, prev-count=%" PRIu64 ", "
1363 "cur-count=%" PRIu64,
1364 packet, prev_count, next_count);
1365 goto end;
1366 }
1367 if (next_count == prev_count) {
1368 BT_LOGW("Current value of packet's context field's `packet_seq_num` field is equal to the previous value for the same stream: "
1369 "not updating the stream state's current value: "
1370 "packet-addr=%p, prev-count=%" PRIu64 ", "
1371 "cur-count=%" PRIu64,
1372 packet, prev_count, next_count);
1373 goto end;
1374 }
1375
1376 diff = next_count - prev_count;
1377 if (diff > 1) {
1378 /*
1379 * Add a discarded packets notification. The packets
1380 * are considered to be lost between the state's last time
1381 * and the current packet's beginning time.
1382 * The counter is expected to monotonically increase of
1383 * 1 for each packet. Therefore, the number of missing
1384 * packets is 'diff - 1'.
1385 */
1386 notif = bt_notification_discarded_elements_create(
1387 BT_NOTIFICATION_TYPE_DISCARDED_PACKETS,
1388 stream_state->stream,
1389 stream_state->discarded_packets_state.cur_begin,
1390 ts_begin, diff - 1);
1391 if (!notif) {
1392 BT_LOGE_STR("Cannot create discarded packets notification.");
1393 ret = -1;
1394 goto end;
1395 }
1396
1397 add_action_push_notif(iterator, notif);
1398 }
1399 }
1400
1401 add_action_update_stream_state_discarded_elements(iterator,
1402 ACTION_TYPE_UPDATE_STREAM_STATE_DISCARDED_PACKETS,
1403 stream_state, ts_end, next_count);
1404
1405end:
1406 bt_put(notif);
1407 return ret;
1408}
1409
1410static
1411int handle_discarded_events(
1412 struct bt_notification_iterator_private_connection *iterator,
1413 struct bt_packet *packet,
1414 struct bt_clock_value *ts_begin,
1415 struct bt_clock_value *ts_end,
1416 struct stream_state *stream_state)
1417{
1418 struct bt_notification *notif = NULL;
1419 uint64_t diff;
1420 uint64_t next_count;
1421 int ret = 0;
1422
1423 next_count = get_packet_context_events_discarded(packet);
1424 if (next_count == -1ULL) {
1425 next_count = stream_state->discarded_events_state.cur_count;
1426 goto update_state;
1427 }
1428
1429 if (next_count < stream_state->discarded_events_state.cur_count) {
1430 BT_LOGW("Current value of packet's context field's `events_discarded` field is lesser than the previous value for the same stream: "
1431 "not updating the stream state's current value: "
1432 "packet-addr=%p, prev-count=%" PRIu64 ", "
1433 "cur-count=%" PRIu64,
1434 packet, stream_state->discarded_events_state.cur_count,
1435 next_count);
1436 goto end;
1437 }
1438
1439 diff = next_count - stream_state->discarded_events_state.cur_count;
1440 if (diff > 0) {
1441 /*
1442 * Add a discarded events notification. The events are
1443 * considered to be lost betweem the state's last time
1444 * and the current packet's end time.
1445 */
1446 notif = bt_notification_discarded_elements_create(
1447 BT_NOTIFICATION_TYPE_DISCARDED_EVENTS,
1448 stream_state->stream,
1449 stream_state->discarded_events_state.cur_begin,
1450 ts_end, diff);
1451 if (!notif) {
1452 BT_LOGE_STR("Cannot create discarded events notification.");
1453 ret = -1;
1454 goto end;
1455 }
1456
1457 add_action_push_notif(iterator, notif);
1458 }
1459
1460update_state:
1461 add_action_update_stream_state_discarded_elements(iterator,
1462 ACTION_TYPE_UPDATE_STREAM_STATE_DISCARDED_EVENTS,
1463 stream_state, ts_end, next_count);
1464
1465end:
1466 bt_put(notif);
1467 return ret;
1468}
1469
1470static
1471int get_field_clock_value(struct bt_field *root_field,
1472 const char *field_name,
1473 struct bt_clock_value **user_clock_val)
1474{
1475 struct bt_field *field;
1476 struct bt_field_type *ft = NULL;
1477 struct bt_clock_class *clock_class = NULL;
1478 struct bt_clock_value *clock_value = NULL;
1479 uint64_t val;
1480 int ret = 0;
1481
1482 field = get_struct_field_uint(root_field, field_name);
1483 if (!field) {
1484 /* Not an error: skip this */
1485 goto end;
1486 }
1487
1488 ft = bt_field_get_type(field);
1489 BT_ASSERT(ft);
1490 clock_class = bt_field_type_integer_get_mapped_clock_class(ft);
1491 if (!clock_class) {
1492 BT_LOGW("Integer field type has no mapped clock class but it's expected to have one: "
1493 "ft-addr=%p", ft);
1494 ret = -1;
1495 goto end;
1496 }
1497
1498 ret = bt_field_unsigned_integer_get_value(field, &val);
1499 if (ret) {
1500 BT_LOGW("Cannot get integer field's raw value: "
1501 "field-addr=%p", field);
1502 ret = -1;
1503 goto end;
1504 }
1505
1506 clock_value = bt_clock_value_create(clock_class, val);
1507 if (!clock_value) {
1508 BT_LOGE_STR("Cannot create clock value from clock class.");
1509 ret = -1;
1510 goto end;
1511 }
1512
1513 /* Move clock value to user */
1514 *user_clock_val = clock_value;
1515 clock_value = NULL;
1516
1517end:
1518 bt_put(field);
1519 bt_put(ft);
1520 bt_put(clock_class);
1521 bt_put(clock_value);
1522 return ret;
1523}
1524
1525static
1526int get_ts_begin_ts_end_from_packet(struct bt_packet *packet,
1527 struct bt_clock_value **user_ts_begin,
1528 struct bt_clock_value **user_ts_end)
1529{
1530 struct bt_field *packet_context = NULL;
1531 struct bt_clock_value *ts_begin = NULL;
1532 struct bt_clock_value *ts_end = NULL;
1533 int ret = 0;
1534
1535 packet_context = bt_packet_get_context(packet);
1536 if (!packet_context) {
1537 goto end;
1538 }
1539
1540 ret = get_field_clock_value(packet_context, "timestamp_begin",
1541 &ts_begin);
1542 if (ret) {
1543 BT_LOGW("Cannot create clock value for packet context's `timestamp_begin` field: "
1544 "packet-addr=%p, packet-context-field-addr=%p",
1545 packet, packet_context);
1546 goto end;
1547 }
1548
1549 ret = get_field_clock_value(packet_context, "timestamp_end",
1550 &ts_end);
1551 if (ret) {
1552 BT_LOGW("Cannot create clock value for packet context's `timestamp_begin` field: "
1553 "packet-addr=%p, packet-context-field-addr=%p",
1554 packet, packet_context);
1555 goto end;
1556 }
1557
1558 /* Move clock values to user */
1559 *user_ts_begin = ts_begin;
1560 ts_begin = NULL;
1561 *user_ts_end = ts_end;
1562 ts_end = NULL;
1563
1564end:
1565 bt_put(packet_context);
1566 bt_put(ts_begin);
1567 bt_put(ts_end);
1568 return ret;
1569}
1570
1571static
1572int handle_discarded_elements(
1573 struct bt_notification_iterator_private_connection *iterator,
1574 struct bt_packet *packet, struct stream_state *stream_state)
1575{
1576 struct bt_clock_value *ts_begin = NULL;
1577 struct bt_clock_value *ts_end = NULL;
1578 int ret;
1579
1580 ret = get_ts_begin_ts_end_from_packet(packet, &ts_begin, &ts_end);
1581 if (ret) {
1582 BT_LOGW("Cannot get packet's beginning or end clock values: "
1583 "packet-addr=%p, ret=%d", packet, ret);
1584 ret = -1;
1585 goto end;
1586 }
1587
1588 ret = handle_discarded_packets(iterator, packet, ts_begin, ts_end,
1589 stream_state);
1590 if (ret) {
1591 BT_LOGW("Cannot handle discarded packets for packet: "
1592 "packet-addr=%p, ret=%d", packet, ret);
1593 ret = -1;
1594 goto end;
1595 }
1596
1597 ret = handle_discarded_events(iterator, packet, ts_begin, ts_end,
1598 stream_state);
1599 if (ret) {
1600 BT_LOGW("Cannot handle discarded events for packet: "
1601 "packet-addr=%p, ret=%d", packet, ret);
1602 ret = -1;
1603 goto end;
1604 }
1605
1606end:
1607 bt_put(ts_begin);
1608 bt_put(ts_end);
1609 return ret;
1610}
1611
1612static
1613int handle_packet_switch(
1614 struct bt_notification_iterator_private_connection *iterator,
1615 struct bt_notification *packet_begin_notif,
1616 struct bt_packet *new_packet,
1617 struct stream_state *stream_state)
1618{
1619 int ret = 0;
1620
1621 if (stream_state->cur_packet == new_packet) {
1622 goto end;
1623 }
1624
1625 BT_LOGV("Handling packet switch: "
1626 "cur-packet-addr=%p, new-packet-addr=%p",
1627 stream_state->cur_packet, new_packet);
1628
1629 if (stream_state->cur_packet) {
1630 /* End of the current packet */
1631 ret = add_action_push_notif_packet_end(iterator,
1632 stream_state->cur_packet);
1633 if (ret) {
1634 BT_LOGE_STR("Cannot add \"push packet end notification\" action.");
1635 goto error;
1636 }
1637 }
1638
1639 /*
1640 * Check the new packet's context fields for discarded packets
1641 * and events to emit those automatic notifications.
1642 */
1643 ret = handle_discarded_elements(iterator, new_packet, stream_state);
1644 if (ret) {
1645 BT_LOGE_STR("Cannot handle discarded elements for new packet.");
1646 goto error;
1647 }
1648
1649 /* Beginning of the new packet */
1650 if (packet_begin_notif) {
1651 add_action_push_notif(iterator, packet_begin_notif);
1652 } else if (new_packet) {
1653 ret = add_action_push_notif_packet_begin(iterator,
1654 new_packet);
1655 if (ret) {
1656 BT_LOGE_STR("Cannot add \"push packet beginning notification\" action.");
1657 goto error;
1658 }
1659 }
1660
1661 add_action_set_stream_state_cur_packet(iterator, stream_state,
1662 new_packet);
1663 goto end;
1664
1665error:
1666 ret = -1;
1667
1668end:
1669 return ret;
1670}
1671
1672static
1673int handle_notif_stream_begin(
1674 struct bt_notification_iterator_private_connection *iterator,
1675 struct bt_notification *notif,
1676 struct bt_stream *notif_stream)
1677{
1678 int ret = 0;
1679 struct stream_state *stream_state;
1680
1681 BT_ASSERT(notif->type == BT_NOTIFICATION_TYPE_STREAM_BEGIN);
1682 BT_ASSERT(notif_stream);
1683 ret = ensure_stream_state_exists(iterator, notif, notif_stream,
1684 &stream_state);
1685 if (ret) {
1686 BT_LOGE_STR("Cannot ensure that stream state exists.");
1687 goto error;
1688 }
1689
1690 goto end;
1691
1692error:
1693 ret = -1;
1694
1695end:
1696 return ret;
1697}
1698
1699static
1700int handle_notif_stream_end(
1701 struct bt_notification_iterator_private_connection *iterator,
1702 struct bt_notification *notif,
1703 struct bt_stream *notif_stream)
1704{
1705 int ret = 0;
1706 struct stream_state *stream_state;
1707
1708 BT_ASSERT(notif->type == BT_NOTIFICATION_TYPE_STREAM_END);
1709 BT_ASSERT(notif_stream);
1710 ret = ensure_stream_state_exists(iterator, NULL, notif_stream,
1711 &stream_state);
1712 if (ret) {
1713 BT_LOGE_STR("Cannot ensure that stream state exists.");
1714 goto error;
1715 }
1716
1717 ret = handle_packet_switch(iterator, NULL, NULL, stream_state);
1718 if (ret) {
1719 BT_LOGE_STR("Cannot handle packet switch.");
1720 goto error;
1721 }
1722
1723 add_action_push_notif(iterator, notif);
1724 add_action_set_stream_state_is_ended(iterator, stream_state);
1725 goto end;
1726
1727error:
1728 ret = -1;
1729
1730end:
1731 return ret;
1732}
1733
1734static
1735int handle_notif_discarded_elements(
1736 struct bt_notification_iterator_private_connection *iterator,
1737 struct bt_notification *notif,
1738 struct bt_stream *notif_stream)
1739{
1740 int ret = 0;
1741 struct stream_state *stream_state;
1742
1743 BT_ASSERT(notif->type == BT_NOTIFICATION_TYPE_DISCARDED_EVENTS ||
1744 notif->type == BT_NOTIFICATION_TYPE_DISCARDED_PACKETS);
1745 BT_ASSERT(notif_stream);
1746 ret = ensure_stream_state_exists(iterator, NULL, notif_stream,
1747 &stream_state);
1748 if (ret) {
1749 BT_LOGE_STR("Cannot ensure that stream state exists.");
1750 goto error;
1751 }
1752
1753 add_action_push_notif(iterator, notif);
1754 goto end;
1755
1756error:
1757 ret = -1;
1758
1759end:
1760 return ret;
1761}
1762
1763static
1764int handle_notif_packet_begin(
1765 struct bt_notification_iterator_private_connection *iterator,
1766 struct bt_notification *notif,
1767 struct bt_stream *notif_stream,
1768 struct bt_packet *notif_packet)
1769{
1770 int ret = 0;
1771 struct stream_state *stream_state;
1772
1773 BT_ASSERT(notif->type == BT_NOTIFICATION_TYPE_PACKET_BEGIN);
1774 BT_ASSERT(notif_packet);
1775 ret = ensure_stream_state_exists(iterator, NULL, notif_stream,
1776 &stream_state);
1777 if (ret) {
1778 BT_LOGE_STR("Cannot ensure that stream state exists.");
1779 goto error;
1780 }
1781
1782 ret = handle_packet_switch(iterator, notif, notif_packet, stream_state);
1783 if (ret) {
1784 BT_LOGE_STR("Cannot handle packet switch.");
1785 goto error;
1786 }
1787
1788 goto end;
1789
1790error:
1791 ret = -1;
1792
1793end:
1794 return ret;
1795}
1796
1797static
1798int handle_notif_packet_end(
1799 struct bt_notification_iterator_private_connection *iterator,
1800 struct bt_notification *notif,
1801 struct bt_stream *notif_stream,
1802 struct bt_packet *notif_packet)
1803{
1804 int ret = 0;
1805 struct stream_state *stream_state;
1806
1807 BT_ASSERT(notif->type == BT_NOTIFICATION_TYPE_PACKET_END);
1808 BT_ASSERT(notif_packet);
1809 ret = ensure_stream_state_exists(iterator, NULL, notif_stream,
1810 &stream_state);
1811 if (ret) {
1812 BT_LOGE_STR("Cannot ensure that stream state exists.");
1813 goto error;
1814 }
1815
1816 ret = handle_packet_switch(iterator, NULL, notif_packet, stream_state);
1817 if (ret) {
1818 BT_LOGE_STR("Cannot handle packet switch.");
1819 goto error;
1820 }
1821
1822 /* End of the current packet */
1823 add_action_push_notif(iterator, notif);
1824 add_action_set_stream_state_cur_packet(iterator, stream_state, NULL);
1825 goto end;
1826
1827error:
1828 ret = -1;
1829
1830end:
1831 return ret;
1832}
1833
1834static
1835int handle_notif_event(
1836 struct bt_notification_iterator_private_connection *iterator,
1837 struct bt_notification *notif,
1838 struct bt_stream *notif_stream,
1839 struct bt_packet *notif_packet)
1840{
1841 int ret = 0;
1842 struct stream_state *stream_state;
1843
1844 BT_ASSERT(notif->type == BT_NOTIFICATION_TYPE_EVENT);
1845 BT_ASSERT(notif_packet);
1846 ret = ensure_stream_state_exists(iterator, NULL, notif_stream,
1847 &stream_state);
1848 if (ret) {
1849 BT_LOGE_STR("Cannot ensure that stream state exists.");
1850 goto error;
1851 }
1852
1853 ret = handle_packet_switch(iterator, NULL, notif_packet, stream_state);
1854 if (ret) {
1855 BT_LOGE_STR("Cannot handle packet switch.");
1856 goto error;
1857 }
1858
1859 add_action_push_notif(iterator, notif);
1860 goto end;
1861
1862error:
1863 ret = -1;
1864
1865end:
1866 return ret;
1867}
1868
1869static
1870int enqueue_notification_and_automatic(
1871 struct bt_notification_iterator_private_connection *iterator,
1872 struct bt_notification *notif)
1873{
1874 int ret = 0;
1875 struct bt_event *notif_event = NULL;
1876 struct bt_stream *notif_stream = NULL;
1877 struct bt_packet *notif_packet = NULL;
1878
1879 BT_ASSERT(notif);
1880
1881 BT_LOGV("Enqueuing user notification and automatic notifications: "
1882 "iter-addr=%p, notif-addr=%p", iterator, notif);
1883
1884 // TODO: Skip most of this if the iterator is only subscribed
1885 // to event/inactivity notifications.
1886
1887 /* Get the stream and packet referred by the notification */
1888 switch (notif->type) {
1889 case BT_NOTIFICATION_TYPE_EVENT:
1890 notif_event = bt_notification_event_borrow_event(notif);
1891 BT_ASSERT(notif_event);
1892 notif_packet = bt_event_borrow_packet(notif_event);
1893 BT_ASSERT(notif_packet);
1894 break;
1895 case BT_NOTIFICATION_TYPE_STREAM_BEGIN:
1896 notif_stream =
1897 bt_notification_stream_begin_borrow_stream(notif);
1898 BT_ASSERT(notif_stream);
1899 break;
1900 case BT_NOTIFICATION_TYPE_STREAM_END:
1901 notif_stream = bt_notification_stream_end_borrow_stream(notif);
1902 BT_ASSERT(notif_stream);
1903 break;
1904 case BT_NOTIFICATION_TYPE_PACKET_BEGIN:
1905 notif_packet =
1906 bt_notification_packet_begin_borrow_packet(notif);
1907 BT_ASSERT(notif_packet);
1908 break;
1909 case BT_NOTIFICATION_TYPE_PACKET_END:
1910 notif_packet = bt_notification_packet_end_borrow_packet(notif);
1911 BT_ASSERT(notif_packet);
1912 break;
1913 case BT_NOTIFICATION_TYPE_INACTIVITY:
1914 /* Always valid */
1915 goto handle_notif;
1916 default:
1917 /*
1918 * Invalid type of notification. Only the notification
1919 * types above are allowed to be returned by a user
1920 * component.
1921 */
1922 BT_LOGF("Unexpected notification type at this point: "
1923 "notif-addr=%p, notif-type=%s", notif,
1924 bt_notification_type_string(notif->type));
1925 abort();
1926 }
1927
1928 if (notif_packet) {
1929 notif_stream = bt_packet_borrow_stream(notif_packet);
1930 BT_ASSERT(notif_stream);
1931 }
1932
1933 if (!notif_stream) {
1934 /*
1935 * The notification has no reference to a stream: it
1936 * cannot cause the creation of automatic notifications.
1937 */
1938 BT_LOGV_STR("Notification has no reference to any stream: skipping automatic notification generation.");
1939 goto end;
1940 }
1941
1942 if (!validate_notification(iterator, notif, notif_stream,
1943 notif_packet)) {
1944 BT_LOGW_STR("Invalid notification.");
1945 goto error;
1946 }
1947
1948handle_notif:
1949 switch (notif->type) {
1950 case BT_NOTIFICATION_TYPE_EVENT:
1951 ret = handle_notif_event(iterator, notif, notif_stream,
1952 notif_packet);
1953 break;
1954 case BT_NOTIFICATION_TYPE_STREAM_BEGIN:
1955 ret = handle_notif_stream_begin(iterator, notif, notif_stream);
1956 break;
1957 case BT_NOTIFICATION_TYPE_STREAM_END:
1958 ret = handle_notif_stream_end(iterator, notif, notif_stream);
1959 break;
1960 case BT_NOTIFICATION_TYPE_PACKET_BEGIN:
1961 ret = handle_notif_packet_begin(iterator, notif, notif_stream,
1962 notif_packet);
1963 break;
1964 case BT_NOTIFICATION_TYPE_PACKET_END:
1965 ret = handle_notif_packet_end(iterator, notif, notif_stream,
1966 notif_packet);
1967 break;
1968 case BT_NOTIFICATION_TYPE_DISCARDED_EVENTS:
1969 case BT_NOTIFICATION_TYPE_DISCARDED_PACKETS:
1970 ret = handle_notif_discarded_elements(iterator, notif,
1971 notif_stream);
1972 break;
1973 case BT_NOTIFICATION_TYPE_INACTIVITY:
1974 add_action_push_notif(iterator, notif);
1975 break;
1976 default:
1977 break;
1978 }
1979
1980 if (ret) {
1981 BT_LOGW_STR("Failed to handle notification for automatic notification generation.");
1982 goto error;
1983 }
1984
1985 apply_actions(iterator);
1986 BT_LOGV("Enqueued user notification and automatic notifications: "
1987 "iter-addr=%p, notif-addr=%p", iterator, notif);
1988 goto end;
1989
1990error:
1991 ret = -1;
1992
1993end:
1994 return ret;
1995}
1996
1997static
1998int handle_end(struct bt_notification_iterator_private_connection *iterator)
1999{
2000 GHashTableIter stream_state_iter;
2001 gpointer stream_gptr, stream_state_gptr;
2002 int ret = 0;
2003
2004 BT_LOGV("Handling end of iteration: addr=%p", iterator);
2005
2006 /*
2007 * Emit a "stream end" notification for each non-ended stream
2008 * known by this iterator and mark them as ended.
2009 */
2010 g_hash_table_iter_init(&stream_state_iter, iterator->stream_states);
2011
2012 while (g_hash_table_iter_next(&stream_state_iter, &stream_gptr,
2013 &stream_state_gptr)) {
2014 struct stream_state *stream_state = stream_state_gptr;
2015
2016 BT_ASSERT(stream_state_gptr);
2017
2018 if (stream_state->is_ended) {
2019 continue;
2020 }
2021
2022 ret = handle_packet_switch(iterator, NULL, NULL, stream_state);
2023 if (ret) {
2024 BT_LOGE_STR("Cannot handle packet switch.");
2025 goto error;
2026 }
2027
2028 ret = add_action_push_notif_stream_end(iterator, stream_gptr);
2029 if (ret) {
2030 BT_LOGE_STR("Cannot add \"push stream end notification\" action.");
2031 goto error;
2032 }
2033
2034 add_action_set_stream_state_is_ended(iterator, stream_state);
2035 }
2036
2037 apply_actions(iterator);
2038 BT_LOGV("Handled end of iteration: addr=%p", iterator);
2039 goto end;
2040
2041error:
2042 ret = -1;
2043
2044end:
2045 return ret;
2046}
2047
2048static
2049enum bt_notification_iterator_status ensure_queue_has_notifications(
2050 struct bt_notification_iterator_private_connection *iterator)
2051{
2052 struct bt_private_connection_private_notification_iterator *priv_iterator =
2053 bt_private_connection_private_notification_iterator_from_notification_iterator(iterator);
2054 bt_component_class_notification_iterator_next_method next_method = NULL;
2055 struct bt_notification_iterator_next_method_return next_return = {
2056 .status = BT_NOTIFICATION_ITERATOR_STATUS_OK,
2057 .notification = NULL,
2058 };
2059 enum bt_notification_iterator_status status =
2060 BT_NOTIFICATION_ITERATOR_STATUS_OK;
2061 int ret;
2062
2063 BT_ASSERT(iterator);
2064 BT_LOGD("Ensuring that notification iterator's queue has at least one notification: "
2065 "iter-addr=%p, queue-size=%u, iter-state=%s",
2066 iterator, iterator->queue->length,
2067 bt_private_connection_notification_iterator_state_string(iterator->state));
2068
2069 if (iterator->queue->length > 0) {
2070 /*
2071 * We already have enough. Even if this notification
2072 * iterator is finalized, its user can still flush its
2073 * current queue's content by calling its "next" method
2074 * since this content is local and has no impact on what
2075 * used to be the iterator's upstream component.
2076 */
2077 BT_LOGD_STR("Queue already has at least one notification.");
2078 goto end;
2079 }
2080
2081 switch (iterator->state) {
2082 case BT_PRIVATE_CONNECTION_NOTIFICATION_ITERATOR_STATE_FINALIZED_AND_ENDED:
2083 case BT_PRIVATE_CONNECTION_NOTIFICATION_ITERATOR_STATE_FINALIZED:
2084 BT_LOGD_STR("Notification iterator's \"next\" called, but it is finalized.");
2085 status = BT_NOTIFICATION_ITERATOR_STATUS_CANCELED;
2086 goto end;
2087 case BT_PRIVATE_CONNECTION_NOTIFICATION_ITERATOR_STATE_ENDED:
2088 BT_LOGD_STR("Notification iterator is ended.");
2089 status = BT_NOTIFICATION_ITERATOR_STATUS_END;
2090 goto end;
2091 default:
2092 break;
2093 }
2094
2095 BT_ASSERT(iterator->upstream_component);
2096 BT_ASSERT(iterator->upstream_component->class);
2097
2098 /* Pick the appropriate "next" method */
2099 switch (iterator->upstream_component->class->type) {
2100 case BT_COMPONENT_CLASS_TYPE_SOURCE:
2101 {
2102 struct bt_component_class_source *source_class =
2103 container_of(iterator->upstream_component->class,
2104 struct bt_component_class_source, parent);
2105
2106 BT_ASSERT(source_class->methods.iterator.next);
2107 next_method = source_class->methods.iterator.next;
2108 break;
2109 }
2110 case BT_COMPONENT_CLASS_TYPE_FILTER:
2111 {
2112 struct bt_component_class_filter *filter_class =
2113 container_of(iterator->upstream_component->class,
2114 struct bt_component_class_filter, parent);
2115
2116 BT_ASSERT(filter_class->methods.iterator.next);
2117 next_method = filter_class->methods.iterator.next;
2118 break;
2119 }
2120 default:
2121 abort();
2122 }
2123
2124 /*
2125 * Call the user's "next" method to get the next notification
2126 * and status.
2127 */
2128 BT_ASSERT(next_method);
2129
2130 while (iterator->queue->length == 0) {
2131 BT_LOGD_STR("Calling user's \"next\" method.");
2132 next_return = next_method(priv_iterator);
2133 BT_LOGD("User method returned: status=%s",
2134 bt_notification_iterator_status_string(next_return.status));
2135 if (next_return.status < 0) {
2136 BT_LOGW_STR("User method failed.");
2137 status = next_return.status;
2138 goto end;
2139 }
2140
2141 if (iterator->state == BT_PRIVATE_CONNECTION_NOTIFICATION_ITERATOR_STATE_FINALIZED ||
2142 iterator->state == BT_PRIVATE_CONNECTION_NOTIFICATION_ITERATOR_STATE_FINALIZED_AND_ENDED) {
2143 /*
2144 * The user's "next" method, somehow, cancelled
2145 * its own notification iterator. This can
2146 * happen, for example, when the user's method
2147 * removes the port on which there's the
2148 * connection from which the iterator was
2149 * created. In this case, said connection is
2150 * ended, and all its notification iterators are
2151 * finalized.
2152 *
2153 * Only bt_put() the returned notification if
2154 * the status is
2155 * BT_NOTIFICATION_ITERATOR_STATUS_OK because
2156 * otherwise this field could be garbage.
2157 */
2158 if (next_return.status ==
2159 BT_NOTIFICATION_ITERATOR_STATUS_OK) {
2160 bt_put(next_return.notification);
2161 }
2162
2163 status = BT_NOTIFICATION_ITERATOR_STATUS_CANCELED;
2164 goto end;
2165 }
2166
2167 switch (next_return.status) {
2168 case BT_NOTIFICATION_ITERATOR_STATUS_END:
2169 ret = handle_end(iterator);
2170 if (ret) {
2171 BT_LOGW_STR("Cannot handle end of iteration.");
2172 status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
2173 goto end;
2174 }
2175
2176 BT_ASSERT(iterator->state ==
2177 BT_PRIVATE_CONNECTION_NOTIFICATION_ITERATOR_STATE_ACTIVE);
2178 iterator->state = BT_PRIVATE_CONNECTION_NOTIFICATION_ITERATOR_STATE_ENDED;
2179
2180 if (iterator->queue->length == 0) {
2181 status = BT_NOTIFICATION_ITERATOR_STATUS_END;
2182 }
2183
2184 BT_LOGD("Set new status: status=%s",
2185 bt_notification_iterator_status_string(status));
2186 goto end;
2187 case BT_NOTIFICATION_ITERATOR_STATUS_AGAIN:
2188 status = BT_NOTIFICATION_ITERATOR_STATUS_AGAIN;
2189 goto end;
2190 case BT_NOTIFICATION_ITERATOR_STATUS_OK:
2191 if (!next_return.notification) {
2192 BT_LOGW_STR("User method returned BT_NOTIFICATION_ITERATOR_STATUS_OK, but notification is NULL.");
2193 status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
2194 goto end;
2195 }
2196
2197 /*
2198 * Ignore some notifications which are always
2199 * automatically generated by the notification
2200 * iterator to make sure they have valid values.
2201 */
2202 switch (next_return.notification->type) {
2203 case BT_NOTIFICATION_TYPE_DISCARDED_PACKETS:
2204 case BT_NOTIFICATION_TYPE_DISCARDED_EVENTS:
2205 BT_LOGV("Ignoring discarded elements notification returned by notification iterator's \"next\" method: "
2206 "notif-type=%s",
2207 bt_notification_type_string(next_return.notification->type));
2208 BT_PUT(next_return.notification);
2209 continue;
2210 default:
2211 break;
2212 }
2213
2214 /*
2215 * We know the notification is valid. Before we
2216 * push it to the head of the queue, push the
2217 * appropriate automatic notifications if any.
2218 */
2219 ret = enqueue_notification_and_automatic(iterator,
2220 next_return.notification);
2221 BT_PUT(next_return.notification);
2222 if (ret) {
2223 BT_LOGW("Cannot enqueue notification and automatic notifications.");
2224 status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
2225 goto end;
2226 }
2227 break;
2228 default:
2229 /* Unknown non-error status */
2230 abort();
2231 }
2232 }
2233
2234end:
2235 return status;
2236}
2237
2238enum bt_notification_iterator_status
2239bt_notification_iterator_next(struct bt_notification_iterator *iterator)
2240{
2241 enum bt_notification_iterator_status status;
2242
2243 if (!iterator) {
2244 BT_LOGW_STR("Invalid parameter: notification iterator is NULL.");
2245 status = BT_NOTIFICATION_ITERATOR_STATUS_INVALID;
2246 goto end;
2247 }
2248
2249 BT_LOGD("Notification iterator's \"next\": iter-addr=%p", iterator);
2250
2251 switch (iterator->type) {
2252 case BT_NOTIFICATION_ITERATOR_TYPE_PRIVATE_CONNECTION:
2253 {
2254 struct bt_notification_iterator_private_connection *priv_conn_iter =
2255 (void *) iterator;
2256 struct bt_notification *notif;
2257
2258 /*
2259 * Make sure that the iterator's queue contains at least
2260 * one notification.
2261 */
2262 status = ensure_queue_has_notifications(priv_conn_iter);
2263 if (status != BT_NOTIFICATION_ITERATOR_STATUS_OK) {
2264 goto end;
2265 }
2266
2267 /*
2268 * Move the notification at the tail of the queue to the
2269 * iterator's current notification.
2270 */
2271 BT_ASSERT(priv_conn_iter->queue->length > 0);
2272 notif = g_queue_pop_tail(priv_conn_iter->queue);
2273 bt_notification_iterator_replace_current_notification(
2274 iterator, notif);
2275 bt_put(notif);
2276 break;
2277 }
2278 case BT_NOTIFICATION_ITERATOR_TYPE_OUTPUT_PORT:
2279 {
2280 struct bt_notification_iterator_output_port *out_port_iter =
2281 (void *) iterator;
2282
2283 /*
2284 * Keep current notification in case there's an error:
2285 * restore this notification so that the current
2286 * notification is not changed from the user's point of
2287 * view.
2288 */
2289 struct bt_notification *old_notif =
2290 bt_get(bt_notification_iterator_borrow_current_notification(iterator));
2291 enum bt_graph_status graph_status;
2292
2293 /*
2294 * Put current notification since it's possibly
2295 * about to be replaced by a new one by the
2296 * colander sink.
2297 */
2298 bt_notification_iterator_replace_current_notification(
2299 iterator, NULL);
2300 graph_status = bt_graph_consume_sink_no_check(
2301 out_port_iter->graph,
2302 out_port_iter->colander);
2303 switch (graph_status) {
2304 case BT_GRAPH_STATUS_CANCELED:
2305 status = BT_NOTIFICATION_ITERATOR_STATUS_CANCELED;
2306 break;
2307 case BT_GRAPH_STATUS_AGAIN:
2308 status = BT_NOTIFICATION_ITERATOR_STATUS_AGAIN;
2309 break;
2310 case BT_GRAPH_STATUS_END:
2311 status = BT_NOTIFICATION_ITERATOR_STATUS_END;
2312 break;
2313 case BT_GRAPH_STATUS_NOMEM:
2314 status = BT_NOTIFICATION_ITERATOR_STATUS_NOMEM;
2315 break;
2316 case BT_GRAPH_STATUS_OK:
2317 status = BT_NOTIFICATION_ITERATOR_STATUS_OK;
2318 BT_ASSERT(bt_notification_iterator_borrow_current_notification(iterator));
2319 break;
2320 default:
2321 /* Other errors */
2322 status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
2323 }
2324
2325 if (status != BT_NOTIFICATION_ITERATOR_STATUS_OK) {
2326 /* Error/exception: restore old notification */
2327 bt_notification_iterator_replace_current_notification(
2328 iterator, old_notif);
2329 }
2330
2331 bt_put(old_notif);
2332 break;
2333 }
2334 default:
2335 BT_LOGF("Unknown notification iterator type: addr=%p, type=%d",
2336 iterator, iterator->type);
2337 abort();
2338 }
2339
2340end:
2341 return status;
2342}
2343
2344struct bt_component *bt_private_connection_notification_iterator_get_component(
2345 struct bt_notification_iterator *iterator)
2346{
2347 struct bt_component *comp = NULL;
2348 struct bt_notification_iterator_private_connection *iter_priv_conn;
2349
2350 if (!iterator) {
2351 BT_LOGW_STR("Invalid parameter: notification iterator is NULL.");
2352 goto end;
2353 }
2354
2355 if (iterator->type != BT_NOTIFICATION_ITERATOR_TYPE_PRIVATE_CONNECTION) {
2356 BT_LOGW_STR("Invalid parameter: notification iterator was not created from a private connection.");
2357 goto end;
2358 }
2359
2360 iter_priv_conn = (void *) iterator;
2361 comp = bt_get(iter_priv_conn->upstream_component);
2362
2363end:
2364 return comp;
2365}
2366
2367struct bt_private_component *
2368bt_private_connection_private_notification_iterator_get_private_component(
2369 struct bt_private_connection_private_notification_iterator *private_iterator)
2370{
2371 return bt_private_component_from_component(
2372 bt_private_connection_notification_iterator_get_component(
2373 (void *) bt_private_connection_notification_iterator_borrow_from_private(private_iterator)));
2374}
2375
2376static
2377void bt_output_port_notification_iterator_destroy(struct bt_object *obj)
2378{
2379 struct bt_notification_iterator_output_port *iterator =
2380 (void *) container_of(obj, struct bt_notification_iterator, base);
2381
2382 BT_LOGD("Destroying output port notification iterator object: addr=%p",
2383 iterator);
2384 BT_LOGD_STR("Putting graph.");
2385 bt_put(iterator->graph);
2386 BT_LOGD_STR("Putting output port.");
2387 bt_put(iterator->output_port);
2388 BT_LOGD_STR("Putting colander sink component.");
2389 bt_put(iterator->colander);
2390 destroy_base_notification_iterator(obj);
2391}
2392
2393struct bt_notification_iterator *bt_output_port_notification_iterator_create(
2394 struct bt_port *output_port,
2395 const char *colander_component_name,
2396 const enum bt_notification_type *notification_types)
2397{
2398 struct bt_notification_iterator *iterator_base = NULL;
2399 struct bt_notification_iterator_output_port *iterator = NULL;
2400 struct bt_component_class *colander_comp_cls = NULL;
2401 struct bt_component *output_port_comp = NULL;
2402 struct bt_component *colander_comp;
2403 struct bt_graph *graph = NULL;
2404 enum bt_graph_status graph_status;
2405 const char *colander_comp_name;
2406 struct bt_port *colander_in_port = NULL;
2407 struct bt_component_class_sink_colander_data colander_data;
2408
2409 if (!output_port) {
2410 BT_LOGW_STR("Invalid parameter: port is NULL.");
2411 goto error;
2412 }
2413
2414 if (bt_port_get_type(output_port) != BT_PORT_TYPE_OUTPUT) {
2415 BT_LOGW_STR("Invalid parameter: port is not an output port.");
2416 goto error;
2417 }
2418
2419 output_port_comp = bt_port_get_component(output_port);
2420 if (!output_port_comp) {
2421 BT_LOGW("Cannot get output port's component: "
2422 "port-addr=%p, port-name=\"%s\"",
2423 output_port, bt_port_get_name(output_port));
2424 goto error;
2425 }
2426
2427 graph = bt_component_get_graph(output_port_comp);
2428 BT_ASSERT(graph);
2429
2430 /* Create notification iterator */
2431 BT_LOGD("Creating notification iterator on output port: "
2432 "comp-addr=%p, comp-name\"%s\", port-addr=%p, port-name=\"%s\"",
2433 output_port_comp, bt_component_get_name(output_port_comp),
2434 output_port, bt_port_get_name(output_port));
2435 iterator = g_new0(struct bt_notification_iterator_output_port, 1);
2436 if (!iterator) {
2437 BT_LOGE_STR("Failed to allocate one output port notification iterator.");
2438 goto error;
2439 }
2440
2441 init_notification_iterator((void *) iterator,
2442 BT_NOTIFICATION_ITERATOR_TYPE_OUTPUT_PORT,
2443 bt_output_port_notification_iterator_destroy);
2444
2445 /* Create colander component */
2446 colander_comp_cls = bt_component_class_sink_colander_get();
2447 if (!colander_comp_cls) {
2448 BT_LOGW("Cannot get colander sink component class.");
2449 goto error;
2450 }
2451
2452 BT_MOVE(iterator->graph, graph);
2453 iterator_base = (void *) iterator;
2454 colander_comp_name =
2455 colander_component_name ? colander_component_name : "colander";
2456 colander_data.notification = &iterator_base->current_notification;
2457 colander_data.notification_types = notification_types;
2458 graph_status = bt_graph_add_component_with_init_method_data(
2459 iterator->graph, colander_comp_cls, colander_comp_name,
2460 NULL, &colander_data, &iterator->colander);
2461 if (graph_status != BT_GRAPH_STATUS_OK) {
2462 BT_LOGW("Cannot add colander sink component to graph: "
2463 "graph-addr=%p, name=\"%s\", graph-status=%s",
2464 iterator->graph, colander_comp_name,
2465 bt_graph_status_string(graph_status));
2466 goto error;
2467 }
2468
2469 /*
2470 * Connect provided output port to the colander component's
2471 * input port.
2472 */
2473 colander_in_port = bt_component_sink_get_input_port_by_index(
2474 iterator->colander, 0);
2475 BT_ASSERT(colander_in_port);
2476 graph_status = bt_graph_connect_ports(iterator->graph,
2477 output_port, colander_in_port, NULL);
2478 if (graph_status != BT_GRAPH_STATUS_OK) {
2479 BT_LOGW("Cannot add colander sink component to graph: "
2480 "graph-addr=%p, name=\"%s\", graph-status=%s",
2481 iterator->graph, colander_comp_name,
2482 bt_graph_status_string(graph_status));
2483 goto error;
2484 }
2485
2486 /*
2487 * At this point everything went fine. Make the graph
2488 * nonconsumable forever so that only this notification iterator
2489 * can consume (thanks to bt_graph_consume_sink_no_check()).
2490 * This avoids leaking the notification created by the colander
2491 * sink and moved to the base notification iterator's current
2492 * notification member.
2493 */
2494 bt_graph_set_can_consume(iterator->graph, BT_FALSE);
2495 goto end;
2496
2497error:
2498 if (iterator && iterator->graph && iterator->colander) {
2499 int ret;
2500
2501 /* Remove created colander component from graph if any */
2502 colander_comp = iterator->colander;
2503 BT_PUT(iterator->colander);
2504
2505 /*
2506 * At this point the colander component's reference
2507 * count is 0 because iterator->colander was the only
2508 * owner. We also know that it is not connected because
2509 * this is the last operation before this function
2510 * succeeds.
2511 *
2512 * Since we honor the preconditions here,
2513 * bt_graph_remove_unconnected_component() always
2514 * succeeds.
2515 */
2516 ret = bt_graph_remove_unconnected_component(iterator->graph,
2517 colander_comp);
2518 BT_ASSERT(ret == 0);
2519 }
2520
2521 BT_PUT(iterator);
2522
2523end:
2524 bt_put(colander_in_port);
2525 bt_put(colander_comp_cls);
2526 bt_put(output_port_comp);
2527 bt_put(graph);
2528 return (void *) iterator;
2529}
2530
2531struct bt_notification_iterator *
2532bt_private_connection_notification_iterator_from_private(
2533 struct bt_private_connection_private_notification_iterator *private_notification_iterator)
2534{
2535 return bt_get(
2536 bt_private_connection_notification_iterator_borrow_from_private(
2537 private_notification_iterator));
2538}
This page took 0.034325 seconds and 4 git commands to generate.