Fix: don't call iterator finalize more than once
[babeltrace.git] / lib / graph / iterator.c
1 /*
2 * iterator.c
3 *
4 * Babeltrace Notification Iterator
5 *
6 * Copyright 2015 Jérémie Galarneau <jeremie.galarneau@efficios.com>
7 * Copyright 2017 Philippe Proulx <pproulx@efficios.com>
8 *
9 * Permission is hereby granted, free of charge, to any person obtaining a copy
10 * of this software and associated documentation files (the "Software"), to deal
11 * in the Software without restriction, including without limitation the rights
12 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
13 * copies of the Software, and to permit persons to whom the Software is
14 * furnished to do so, subject to the following conditions:
15 *
16 * The above copyright notice and this permission notice shall be included in
17 * all copies or substantial portions of the Software.
18 *
19 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
20 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
21 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
22 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
23 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
24 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
25 * SOFTWARE.
26 */
27
28 #include <babeltrace/compiler-internal.h>
29 #include <babeltrace/ref.h>
30 #include <babeltrace/ctf-ir/event-internal.h>
31 #include <babeltrace/ctf-ir/packet-internal.h>
32 #include <babeltrace/ctf-ir/stream-internal.h>
33 #include <babeltrace/graph/connection-internal.h>
34 #include <babeltrace/graph/component.h>
35 #include <babeltrace/graph/component-source-internal.h>
36 #include <babeltrace/graph/component-class-internal.h>
37 #include <babeltrace/graph/notification.h>
38 #include <babeltrace/graph/notification-iterator.h>
39 #include <babeltrace/graph/notification-iterator-internal.h>
40 #include <babeltrace/graph/notification-internal.h>
41 #include <babeltrace/graph/notification-event.h>
42 #include <babeltrace/graph/notification-event-internal.h>
43 #include <babeltrace/graph/notification-packet.h>
44 #include <babeltrace/graph/notification-packet-internal.h>
45 #include <babeltrace/graph/notification-stream.h>
46 #include <babeltrace/graph/notification-stream-internal.h>
47 #include <babeltrace/graph/port.h>
48 #include <babeltrace/types.h>
49 #include <stdint.h>
50
51 struct stream_state {
52 struct bt_ctf_stream *stream; /* owned by this */
53 struct bt_ctf_packet *cur_packet; /* owned by this */
54 bt_bool is_ended;
55 };
56
/*
 * Kinds of deferred actions which can be appended to a notification
 * iterator's action array.
 */
enum action_type {
	/* Push a notification to the iterator's queue */
	ACTION_TYPE_PUSH_NOTIF = 0,

	/* Map a component to a port within a stream */
	ACTION_TYPE_MAP_PORT_TO_COMP_IN_STREAM,

	/* Add a stream state to the iterator's stream states */
	ACTION_TYPE_ADD_STREAM_STATE,

	/* Mark a stream state as ended */
	ACTION_TYPE_SET_STREAM_STATE_IS_ENDED,

	/* Set the current packet of a stream state */
	ACTION_TYPE_SET_STREAM_STATE_CUR_PACKET,
};
64
65 struct action {
66 enum action_type type;
67 union {
68 /* ACTION_TYPE_PUSH_NOTIF */
69 struct {
70 struct bt_notification *notif; /* owned by this */
71 } push_notif;
72
73 /* ACTION_TYPE_MAP_PORT_TO_COMP_IN_STREAM */
74 struct {
75 struct bt_ctf_stream *stream; /* owned by this */
76 struct bt_component *component; /* owned by this */
77 struct bt_port *port; /* owned by this */
78 } map_port_to_comp_in_stream;
79
80 /* ACTION_TYPE_ADD_STREAM_STATE */
81 struct {
82 struct bt_ctf_stream *stream; /* owned by this */
83 struct stream_state *stream_state; /* owned by this */
84 } add_stream_state;
85
86 /* ACTION_TYPE_SET_STREAM_STATE_IS_ENDED */
87 struct {
88 struct stream_state *stream_state; /* weak */
89 } set_stream_state_is_ended;
90
91 /* ACTION_TYPE_SET_STREAM_STATE_CUR_PACKET */
92 struct {
93 struct stream_state *stream_state; /* weak */
94 struct bt_ctf_packet *packet; /* owned by this */
95 } set_stream_state_cur_packet;
96 } payload;
97 };
98
99 static
100 void stream_destroy_listener(struct bt_ctf_stream *stream, void *data)
101 {
102 struct bt_notification_iterator *iterator = data;
103
104 /* Remove associated stream state */
105 g_hash_table_remove(iterator->stream_states, stream);
106 }
107
108 static
109 void destroy_stream_state(struct stream_state *stream_state)
110 {
111 if (!stream_state) {
112 return;
113 }
114
115 bt_put(stream_state->cur_packet);
116 bt_put(stream_state->stream);
117 g_free(stream_state);
118 }
119
120 static
121 void destroy_action(struct action *action)
122 {
123 assert(action);
124
125 switch (action->type) {
126 case ACTION_TYPE_PUSH_NOTIF:
127 BT_PUT(action->payload.push_notif.notif);
128 break;
129 case ACTION_TYPE_MAP_PORT_TO_COMP_IN_STREAM:
130 BT_PUT(action->payload.map_port_to_comp_in_stream.stream);
131 BT_PUT(action->payload.map_port_to_comp_in_stream.component);
132 BT_PUT(action->payload.map_port_to_comp_in_stream.port);
133 break;
134 case ACTION_TYPE_ADD_STREAM_STATE:
135 BT_PUT(action->payload.add_stream_state.stream);
136 destroy_stream_state(
137 action->payload.add_stream_state.stream_state);
138 action->payload.add_stream_state.stream_state = NULL;
139 break;
140 case ACTION_TYPE_SET_STREAM_STATE_CUR_PACKET:
141 BT_PUT(action->payload.set_stream_state_cur_packet.packet);
142 break;
143 case ACTION_TYPE_SET_STREAM_STATE_IS_ENDED:
144 break;
145 default:
146 assert(BT_FALSE);
147 }
148 }
149
150 static
151 void add_action(struct bt_notification_iterator *iterator,
152 struct action *action)
153 {
154 g_array_append_val(iterator->actions, *action);
155 }
156
157 static
158 void clear_actions(struct bt_notification_iterator *iterator)
159 {
160 size_t i;
161
162 for (i = 0; i < iterator->actions->len; i++) {
163 struct action *action = &g_array_index(iterator->actions,
164 struct action, i);
165
166 destroy_action(action);
167 }
168
169 g_array_set_size(iterator->actions, 0);
170 }
171
172 static
173 void apply_actions(struct bt_notification_iterator *iterator)
174 {
175 size_t i;
176
177 for (i = 0; i < iterator->actions->len; i++) {
178 struct action *action = &g_array_index(iterator->actions,
179 struct action, i);
180
181 switch (action->type) {
182 case ACTION_TYPE_PUSH_NOTIF:
183 /* Move notification to queue */
184 g_queue_push_head(iterator->queue,
185 action->payload.push_notif.notif);
186 bt_notification_freeze(
187 action->payload.push_notif.notif);
188 action->payload.push_notif.notif = NULL;
189 break;
190 case ACTION_TYPE_MAP_PORT_TO_COMP_IN_STREAM:
191 bt_ctf_stream_map_component_to_port(
192 action->payload.map_port_to_comp_in_stream.stream,
193 action->payload.map_port_to_comp_in_stream.component,
194 action->payload.map_port_to_comp_in_stream.port);
195 break;
196 case ACTION_TYPE_ADD_STREAM_STATE:
197 /* Move stream state to hash table */
198 g_hash_table_insert(iterator->stream_states,
199 action->payload.add_stream_state.stream,
200 action->payload.add_stream_state.stream_state);
201
202 action->payload.add_stream_state.stream_state = NULL;
203 break;
204 case ACTION_TYPE_SET_STREAM_STATE_IS_ENDED:
205 /*
206 * We know that this stream is ended. We need to
207 * remember this as long as the stream exists to
208 * enforce that the same stream does not end
209 * twice.
210 *
211 * Here we add a destroy listener to the stream
212 * which we put after (becomes weak as the hash
213 * table key). If we were the last object to own
214 * this stream, the destroy listener is called
215 * when we call bt_put() which removes this
216 * stream state completely. This is important
217 * because the memory used by this stream object
218 * could be reused for another stream, and they
219 * must have different states.
220 */
221 bt_ctf_stream_add_destroy_listener(
222 action->payload.set_stream_state_is_ended.stream_state->stream,
223 stream_destroy_listener, iterator);
224 action->payload.set_stream_state_is_ended.stream_state->is_ended = BT_TRUE;
225 BT_PUT(action->payload.set_stream_state_is_ended.stream_state->stream);
226 break;
227 case ACTION_TYPE_SET_STREAM_STATE_CUR_PACKET:
228 /* Move packet to stream state's current packet */
229 BT_MOVE(action->payload.set_stream_state_cur_packet.stream_state->cur_packet,
230 action->payload.set_stream_state_cur_packet.packet);
231 break;
232 default:
233 assert(BT_FALSE);
234 }
235 }
236
237 clear_actions(iterator);
238 }
239
240 static
241 struct stream_state *create_stream_state(struct bt_ctf_stream *stream)
242 {
243 struct stream_state *stream_state = g_new0(struct stream_state, 1);
244
245 if (!stream_state) {
246 goto end;
247 }
248
249 /*
250 * We keep a reference to the stream until we know it's ended
251 * because we need to be able to create an automatic "stream
252 * end" notification when the user's "next" method returns
253 * BT_NOTIFICATION_ITERATOR_STATUS_END.
254 *
255 * We put this reference when the stream is marked as ended.
256 */
257 stream_state->stream = bt_get(stream);
258
259 end:
260 return stream_state;
261 }
262
263 static
264 void bt_notification_iterator_destroy(struct bt_object *obj)
265 {
266 struct bt_notification_iterator *iterator;
267
268 assert(obj);
269
270 /*
271 * The notification iterator's reference count is 0 if we're
272 * here. Increment it to avoid a double-destroy (possibly
273 * infinitely recursive). This could happen for example if the
274 * notification iterator's finalization function does bt_get()
275 * (or anything that causes bt_get() to be called) on itself
276 * (ref. count goes from 0 to 1), and then bt_put(): the
277 * reference count would go from 1 to 0 again and this function
278 * would be called again.
279 */
280 obj->ref_count.count++;
281 iterator = container_of(obj, struct bt_notification_iterator, base);
282 bt_notification_iterator_finalize(iterator);
283
284 if (iterator->queue) {
285 struct bt_notification *notif;
286
287 while ((notif = g_queue_pop_tail(iterator->queue))) {
288 bt_put(notif);
289 }
290
291 g_queue_free(iterator->queue);
292 }
293
294 if (iterator->stream_states) {
295 /*
296 * Remove our destroy listener from each stream which
297 * has a state in this iterator. Otherwise the destroy
298 * listener would be called with an invalid/other
299 * notification iterator object.
300 */
301 GHashTableIter ht_iter;
302 gpointer stream_gptr, stream_state_gptr;
303
304 g_hash_table_iter_init(&ht_iter, iterator->stream_states);
305
306 while (g_hash_table_iter_next(&ht_iter, &stream_gptr, &stream_state_gptr)) {
307 assert(stream_gptr);
308 bt_ctf_stream_remove_destroy_listener(
309 (void *) stream_gptr, stream_destroy_listener,
310 iterator);
311 }
312
313 g_hash_table_destroy(iterator->stream_states);
314 }
315
316 if (iterator->actions) {
317 g_array_free(iterator->actions, TRUE);
318 }
319
320 if (iterator->connection) {
321 /*
322 * Remove ourself from the originating connection so
323 * that it does not try to finalize a dangling pointer
324 * later.
325 */
326 bt_connection_remove_iterator(iterator->connection, iterator);
327 }
328
329 bt_put(iterator->current_notification);
330 g_free(iterator);
331 }
332
333 BT_HIDDEN
334 void bt_notification_iterator_finalize(
335 struct bt_notification_iterator *iterator)
336 {
337 struct bt_component_class *comp_class = NULL;
338 bt_component_class_notification_iterator_finalize_method
339 finalize_method = NULL;
340
341 assert(iterator);
342
343 switch (iterator->state) {
344 case BT_NOTIFICATION_ITERATOR_STATE_FINALIZED:
345 case BT_NOTIFICATION_ITERATOR_STATE_FINALIZED_AND_ENDED:
346 /* Already finalized */
347 return;
348 default:
349 break;
350 }
351
352 if (iterator->state == BT_NOTIFICATION_ITERATOR_STATE_ENDED) {
353 iterator->state = BT_NOTIFICATION_ITERATOR_STATE_FINALIZED_AND_ENDED;
354 } else {
355 iterator->state = BT_NOTIFICATION_ITERATOR_STATE_FINALIZED;
356 }
357
358 assert(iterator->upstream_component);
359 comp_class = iterator->upstream_component->class;
360
361 /* Call user-defined destroy method */
362 switch (comp_class->type) {
363 case BT_COMPONENT_CLASS_TYPE_SOURCE:
364 {
365 struct bt_component_class_source *source_class;
366
367 source_class = container_of(comp_class, struct bt_component_class_source, parent);
368 finalize_method = source_class->methods.iterator.finalize;
369 break;
370 }
371 case BT_COMPONENT_CLASS_TYPE_FILTER:
372 {
373 struct bt_component_class_filter *filter_class;
374
375 filter_class = container_of(comp_class, struct bt_component_class_filter, parent);
376 finalize_method = filter_class->methods.iterator.finalize;
377 break;
378 }
379 default:
380 /* Unreachable */
381 assert(0);
382 }
383
384 if (finalize_method) {
385 finalize_method(
386 bt_private_notification_iterator_from_notification_iterator(iterator));
387 }
388
389 iterator->upstream_component = NULL;
390 iterator->upstream_port = NULL;
391 }
392
393 BT_HIDDEN
394 void bt_notification_iterator_set_connection(
395 struct bt_notification_iterator *iterator,
396 struct bt_connection *connection)
397 {
398 assert(iterator);
399 iterator->connection = connection;
400 }
401
402 static
403 int create_subscription_mask_from_notification_types(
404 struct bt_notification_iterator *iterator,
405 const enum bt_notification_type *notif_types)
406 {
407 const enum bt_notification_type *notif_type;
408 int ret = 0;
409
410 assert(notif_types);
411 iterator->subscription_mask = 0;
412
413 for (notif_type = notif_types;
414 *notif_type != BT_NOTIFICATION_TYPE_SENTINEL;
415 notif_type++) {
416 switch (*notif_type) {
417 case BT_NOTIFICATION_TYPE_ALL:
418 iterator->subscription_mask |=
419 BT_NOTIFICATION_ITERATOR_NOTIF_TYPE_EVENT |
420 BT_NOTIFICATION_ITERATOR_NOTIF_TYPE_INACTIVITY |
421 BT_NOTIFICATION_ITERATOR_NOTIF_TYPE_STREAM_BEGIN |
422 BT_NOTIFICATION_ITERATOR_NOTIF_TYPE_STREAM_END |
423 BT_NOTIFICATION_ITERATOR_NOTIF_TYPE_PACKET_BEGIN |
424 BT_NOTIFICATION_ITERATOR_NOTIF_TYPE_PACKET_END;
425 break;
426 case BT_NOTIFICATION_TYPE_EVENT:
427 iterator->subscription_mask |= BT_NOTIFICATION_ITERATOR_NOTIF_TYPE_EVENT;
428 break;
429 case BT_NOTIFICATION_TYPE_INACTIVITY:
430 iterator->subscription_mask |= BT_NOTIFICATION_ITERATOR_NOTIF_TYPE_INACTIVITY;
431 break;
432 case BT_NOTIFICATION_TYPE_STREAM_BEGIN:
433 iterator->subscription_mask |= BT_NOTIFICATION_ITERATOR_NOTIF_TYPE_STREAM_BEGIN;
434 break;
435 case BT_NOTIFICATION_TYPE_STREAM_END:
436 iterator->subscription_mask |= BT_NOTIFICATION_ITERATOR_NOTIF_TYPE_STREAM_END;
437 break;
438 case BT_NOTIFICATION_TYPE_PACKET_BEGIN:
439 iterator->subscription_mask |= BT_NOTIFICATION_ITERATOR_NOTIF_TYPE_PACKET_BEGIN;
440 break;
441 case BT_NOTIFICATION_TYPE_PACKET_END:
442 iterator->subscription_mask |= BT_NOTIFICATION_ITERATOR_NOTIF_TYPE_PACKET_END;
443 break;
444 default:
445 ret = -1;
446 goto end;
447 }
448 }
449
450 end:
451 return ret;
452 }
453
454 BT_HIDDEN
455 struct bt_notification_iterator *bt_notification_iterator_create(
456 struct bt_component *upstream_comp,
457 struct bt_port *upstream_port,
458 const enum bt_notification_type *notification_types,
459 struct bt_connection *connection)
460 {
461 enum bt_component_class_type type;
462 struct bt_notification_iterator *iterator = NULL;
463
464 assert(upstream_comp);
465 assert(upstream_port);
466 assert(notification_types);
467 assert(bt_port_is_connected(upstream_port));
468
469 type = bt_component_get_class_type(upstream_comp);
470 switch (type) {
471 case BT_COMPONENT_CLASS_TYPE_SOURCE:
472 case BT_COMPONENT_CLASS_TYPE_FILTER:
473 break;
474 default:
475 goto error;
476 }
477
478 iterator = g_new0(struct bt_notification_iterator, 1);
479 if (!iterator) {
480 goto error;
481 }
482
483 bt_object_init(iterator, bt_notification_iterator_destroy);
484
485 if (create_subscription_mask_from_notification_types(iterator,
486 notification_types)) {
487 goto error;
488 }
489
490 iterator->stream_states = g_hash_table_new_full(g_direct_hash,
491 g_direct_equal, NULL, (GDestroyNotify) destroy_stream_state);
492 if (!iterator->stream_states) {
493 goto error;
494 }
495
496 iterator->queue = g_queue_new();
497 if (!iterator->queue) {
498 goto error;
499 }
500
501 iterator->actions = g_array_new(FALSE, FALSE, sizeof(struct action));
502 if (!iterator->actions) {
503 goto error;
504 }
505
506 iterator->upstream_component = upstream_comp;
507 iterator->upstream_port = upstream_port;
508 iterator->connection = connection;
509 iterator->state = BT_NOTIFICATION_ITERATOR_STATE_ACTIVE;
510 goto end;
511
512 error:
513 BT_PUT(iterator);
514
515 end:
516 return iterator;
517 }
518
519 BT_HIDDEN
520 enum bt_notification_iterator_status bt_notification_iterator_validate(
521 struct bt_notification_iterator *iterator)
522 {
523 enum bt_notification_iterator_status ret =
524 BT_NOTIFICATION_ITERATOR_STATUS_OK;
525
526 if (!iterator) {
527 ret = BT_NOTIFICATION_ITERATOR_STATUS_INVALID;
528 goto end;
529 }
530 end:
531 return ret;
532 }
533
534 void *bt_private_notification_iterator_get_user_data(
535 struct bt_private_notification_iterator *private_iterator)
536 {
537 struct bt_notification_iterator *iterator =
538 bt_notification_iterator_from_private(private_iterator);
539
540 return iterator ? iterator->user_data : NULL;
541 }
542
543 enum bt_notification_iterator_status
544 bt_private_notification_iterator_set_user_data(
545 struct bt_private_notification_iterator *private_iterator,
546 void *data)
547 {
548 enum bt_notification_iterator_status ret =
549 BT_NOTIFICATION_ITERATOR_STATUS_OK;
550 struct bt_notification_iterator *iterator =
551 bt_notification_iterator_from_private(private_iterator);
552
553 if (!iterator) {
554 ret = BT_NOTIFICATION_ITERATOR_STATUS_INVALID;
555 goto end;
556 }
557
558 iterator->user_data = data;
559 end:
560 return ret;
561 }
562
563 struct bt_notification *bt_notification_iterator_get_notification(
564 struct bt_notification_iterator *iterator)
565 {
566 struct bt_notification *notification = NULL;
567
568 if (!iterator) {
569 goto end;
570 }
571
572 notification = bt_get(iterator->current_notification);
573
574 end:
575 return notification;
576 }
577
578 static
579 enum bt_notification_iterator_notif_type
580 bt_notification_iterator_notif_type_from_notif_type(
581 enum bt_notification_type notif_type)
582 {
583 enum bt_notification_iterator_notif_type iter_notif_type;
584
585 switch (notif_type) {
586 case BT_NOTIFICATION_TYPE_EVENT:
587 iter_notif_type = BT_NOTIFICATION_ITERATOR_NOTIF_TYPE_EVENT;
588 break;
589 case BT_NOTIFICATION_TYPE_INACTIVITY:
590 iter_notif_type = BT_NOTIFICATION_ITERATOR_NOTIF_TYPE_INACTIVITY;
591 break;
592 case BT_NOTIFICATION_TYPE_STREAM_BEGIN:
593 iter_notif_type = BT_NOTIFICATION_ITERATOR_NOTIF_TYPE_STREAM_BEGIN;
594 break;
595 case BT_NOTIFICATION_TYPE_STREAM_END:
596 iter_notif_type = BT_NOTIFICATION_ITERATOR_NOTIF_TYPE_STREAM_END;
597 break;
598 case BT_NOTIFICATION_TYPE_PACKET_BEGIN:
599 iter_notif_type = BT_NOTIFICATION_ITERATOR_NOTIF_TYPE_PACKET_BEGIN;
600 break;
601 case BT_NOTIFICATION_TYPE_PACKET_END:
602 iter_notif_type = BT_NOTIFICATION_ITERATOR_NOTIF_TYPE_PACKET_END;
603 break;
604 default:
605 assert(BT_FALSE);
606 }
607
608 return iter_notif_type;
609 }
610
611 static
612 bt_bool validate_notification(struct bt_notification_iterator *iterator,
613 struct bt_notification *notif,
614 struct bt_ctf_stream *notif_stream,
615 struct bt_ctf_packet *notif_packet)
616 {
617 bt_bool is_valid = BT_TRUE;
618 struct stream_state *stream_state;
619 struct bt_port *stream_comp_cur_port;
620
621 assert(notif_stream);
622 stream_comp_cur_port =
623 bt_ctf_stream_port_for_component(notif_stream,
624 iterator->upstream_component);
625 if (!stream_comp_cur_port) {
626 /*
627 * This is the first time this notification iterator
628 * bumps into this stream. Add an action to map the
629 * iterator's upstream component to the iterator's
630 * upstream port in this stream.
631 */
632 struct action action = {
633 .type = ACTION_TYPE_MAP_PORT_TO_COMP_IN_STREAM,
634 .payload.map_port_to_comp_in_stream = {
635 .stream = bt_get(notif_stream),
636 .component = bt_get(iterator->upstream_component),
637 .port = bt_get(iterator->upstream_port),
638 },
639 };
640
641 add_action(iterator, &action);
642 } else {
643 if (stream_comp_cur_port != iterator->upstream_port) {
644 /*
645 * It looks like two different ports of the same
646 * component are emitting notifications which
647 * have references to the same stream. This is
648 * bad: the API guarantees that it can never
649 * happen.
650 */
651 is_valid = BT_FALSE;
652 goto end;
653 }
654
655 }
656
657 stream_state = g_hash_table_lookup(iterator->stream_states,
658 notif_stream);
659 if (stream_state) {
660 if (stream_state->is_ended) {
661 /*
662 * There's a new notification which has a
663 * reference to a stream which, from this
664 * iterator's point of view, is ended ("end of
665 * stream" notification was returned). This is
666 * bad: the API guarantees that it can never
667 * happen.
668 */
669 is_valid = BT_FALSE;
670 goto end;
671 }
672
673 switch (notif->type) {
674 case BT_NOTIFICATION_TYPE_STREAM_BEGIN:
675 /*
676 * We already have a stream state, which means
677 * we already returned a "stream begin"
678 * notification: this is an invalid duplicate.
679 */
680 is_valid = BT_FALSE;
681 goto end;
682 case BT_NOTIFICATION_TYPE_PACKET_BEGIN:
683 if (notif_packet == stream_state->cur_packet) {
684 /* Duplicate "packet begin" notification */
685 is_valid = BT_FALSE;
686 goto end;
687 }
688 break;
689 default:
690 break;
691 }
692 }
693
694 end:
695 return is_valid;
696 }
697
698 static
699 bt_bool is_subscribed_to_notification_type(struct bt_notification_iterator *iterator,
700 enum bt_notification_type notif_type)
701 {
702 uint32_t iter_notif_type =
703 (uint32_t) bt_notification_iterator_notif_type_from_notif_type(
704 notif_type);
705
706 return (iter_notif_type & iterator->subscription_mask) ? BT_TRUE : BT_FALSE;
707 }
708
709 static
710 void add_action_push_notif(struct bt_notification_iterator *iterator,
711 struct bt_notification *notif)
712 {
713 struct action action = {
714 .type = ACTION_TYPE_PUSH_NOTIF,
715 };
716
717 assert(notif);
718
719 if (!is_subscribed_to_notification_type(iterator, notif->type)) {
720 return;
721 }
722
723 action.payload.push_notif.notif = bt_get(notif);
724 add_action(iterator, &action);
725 }
726
727 static
728 int add_action_push_notif_stream_begin(
729 struct bt_notification_iterator *iterator,
730 struct bt_ctf_stream *stream)
731 {
732 int ret = 0;
733 struct bt_notification *stream_begin_notif = NULL;
734
735 if (!is_subscribed_to_notification_type(iterator,
736 BT_NOTIFICATION_TYPE_STREAM_BEGIN)) {
737 goto end;
738 }
739
740 assert(stream);
741 stream_begin_notif = bt_notification_stream_begin_create(stream);
742 if (!stream_begin_notif) {
743 goto error;
744 }
745
746 add_action_push_notif(iterator, stream_begin_notif);
747 goto end;
748
749 error:
750 ret = -1;
751
752 end:
753 bt_put(stream_begin_notif);
754 return ret;
755 }
756
757 static
758 int add_action_push_notif_stream_end(
759 struct bt_notification_iterator *iterator,
760 struct bt_ctf_stream *stream)
761 {
762 int ret = 0;
763 struct bt_notification *stream_end_notif = NULL;
764
765 if (!is_subscribed_to_notification_type(iterator,
766 BT_NOTIFICATION_TYPE_STREAM_END)) {
767 goto end;
768 }
769
770 assert(stream);
771 stream_end_notif = bt_notification_stream_end_create(stream);
772 if (!stream_end_notif) {
773 goto error;
774 }
775
776 add_action_push_notif(iterator, stream_end_notif);
777 goto end;
778
779 error:
780 ret = -1;
781
782 end:
783 bt_put(stream_end_notif);
784 return ret;
785 }
786
787 static
788 int add_action_push_notif_packet_begin(
789 struct bt_notification_iterator *iterator,
790 struct bt_ctf_packet *packet)
791 {
792 int ret = 0;
793 struct bt_notification *packet_begin_notif = NULL;
794
795 if (!is_subscribed_to_notification_type(iterator,
796 BT_NOTIFICATION_TYPE_PACKET_BEGIN)) {
797 goto end;
798 }
799
800 assert(packet);
801 packet_begin_notif = bt_notification_packet_begin_create(packet);
802 if (!packet_begin_notif) {
803 goto error;
804 }
805
806 add_action_push_notif(iterator, packet_begin_notif);
807 goto end;
808
809 error:
810 ret = -1;
811
812 end:
813 bt_put(packet_begin_notif);
814 return ret;
815 }
816
817 static
818 int add_action_push_notif_packet_end(
819 struct bt_notification_iterator *iterator,
820 struct bt_ctf_packet *packet)
821 {
822 int ret = 0;
823 struct bt_notification *packet_end_notif = NULL;
824
825 if (!is_subscribed_to_notification_type(iterator,
826 BT_NOTIFICATION_TYPE_PACKET_END)) {
827 goto end;
828 }
829
830 assert(packet);
831 packet_end_notif = bt_notification_packet_end_create(packet);
832 if (!packet_end_notif) {
833 goto error;
834 }
835
836 add_action_push_notif(iterator, packet_end_notif);
837 goto end;
838
839 error:
840 ret = -1;
841
842 end:
843 bt_put(packet_end_notif);
844 return ret;
845 }
846
847 static
848 void add_action_set_stream_state_is_ended(
849 struct bt_notification_iterator *iterator,
850 struct stream_state *stream_state)
851 {
852 struct action action = {
853 .type = ACTION_TYPE_SET_STREAM_STATE_IS_ENDED,
854 .payload.set_stream_state_is_ended = {
855 .stream_state = stream_state,
856 },
857 };
858
859 assert(stream_state);
860 add_action(iterator, &action);
861 }
862
863 static
864 void add_action_set_stream_state_cur_packet(
865 struct bt_notification_iterator *iterator,
866 struct stream_state *stream_state,
867 struct bt_ctf_packet *packet)
868 {
869 struct action action = {
870 .type = ACTION_TYPE_SET_STREAM_STATE_CUR_PACKET,
871 .payload.set_stream_state_cur_packet = {
872 .stream_state = stream_state,
873 .packet = bt_get(packet),
874 },
875 };
876
877 assert(stream_state);
878 add_action(iterator, &action);
879 }
880
881 static
882 int ensure_stream_state_exists(struct bt_notification_iterator *iterator,
883 struct bt_notification *stream_begin_notif,
884 struct bt_ctf_stream *notif_stream,
885 struct stream_state **stream_state)
886 {
887 int ret = 0;
888
889 if (!notif_stream) {
890 /*
891 * The notification does not reference any stream: no
892 * need to get or create a stream state.
893 */
894 goto end;
895 }
896
897 *stream_state = g_hash_table_lookup(iterator->stream_states,
898 notif_stream);
899 if (!*stream_state) {
900 /*
901 * This iterator did not bump into this stream yet:
902 * create a stream state and a "stream begin"
903 * notification.
904 */
905 struct action action = {
906 .type = ACTION_TYPE_ADD_STREAM_STATE,
907 .payload.add_stream_state = {
908 .stream = bt_get(notif_stream),
909 .stream_state = NULL,
910 },
911 };
912
913 *stream_state = create_stream_state(notif_stream);
914 if (!stream_state) {
915 goto error;
916 }
917
918 action.payload.add_stream_state.stream_state =
919 *stream_state;
920 add_action(iterator, &action);
921
922 if (stream_begin_notif) {
923 add_action_push_notif(iterator, stream_begin_notif);
924 } else {
925 ret = add_action_push_notif_stream_begin(iterator,
926 notif_stream);
927 if (ret) {
928 goto error;
929 }
930 }
931 }
932
933 goto end;
934
935 error:
936 destroy_stream_state(*stream_state);
937 ret = -1;
938
939 end:
940 return ret;
941 }
942
943 static
944 int handle_packet_switch(struct bt_notification_iterator *iterator,
945 struct bt_notification *packet_begin_notif,
946 struct bt_ctf_packet *new_packet,
947 struct stream_state *stream_state)
948 {
949 int ret = 0;
950
951 if (stream_state->cur_packet == new_packet) {
952 goto end;
953 }
954
955 if (stream_state->cur_packet) {
956 /* End of the current packet */
957 ret = add_action_push_notif_packet_end(iterator,
958 stream_state->cur_packet);
959 if (ret) {
960 goto error;
961 }
962 }
963
964 /* Beginning of the new packet */
965 if (packet_begin_notif) {
966 add_action_push_notif(iterator, packet_begin_notif);
967 } else if (new_packet) {
968 ret = add_action_push_notif_packet_begin(iterator,
969 new_packet);
970 if (ret) {
971 goto error;
972 }
973 }
974
975 add_action_set_stream_state_cur_packet(iterator, stream_state,
976 new_packet);
977 goto end;
978
979 error:
980 ret = -1;
981
982 end:
983 return ret;
984 }
985
986 static
987 int handle_notif_stream_begin(
988 struct bt_notification_iterator *iterator,
989 struct bt_notification *notif,
990 struct bt_ctf_stream *notif_stream)
991 {
992 int ret = 0;
993 struct stream_state *stream_state;
994
995 assert(notif->type == BT_NOTIFICATION_TYPE_STREAM_BEGIN);
996 assert(notif_stream);
997 ret = ensure_stream_state_exists(iterator, notif, notif_stream,
998 &stream_state);
999 if (ret) {
1000 goto error;
1001 }
1002
1003 goto end;
1004
1005 error:
1006 ret = -1;
1007
1008 end:
1009 return ret;
1010 }
1011
1012 static
1013 int handle_notif_stream_end(
1014 struct bt_notification_iterator *iterator,
1015 struct bt_notification *notif,
1016 struct bt_ctf_stream *notif_stream)
1017 {
1018 int ret = 0;
1019 struct stream_state *stream_state;
1020
1021 assert(notif->type == BT_NOTIFICATION_TYPE_STREAM_END);
1022 assert(notif_stream);
1023 ret = ensure_stream_state_exists(iterator, NULL, notif_stream,
1024 &stream_state);
1025 if (ret) {
1026 goto error;
1027 }
1028
1029 ret = handle_packet_switch(iterator, NULL, NULL, stream_state);
1030 if (ret) {
1031 goto error;
1032 }
1033
1034 add_action_push_notif(iterator, notif);
1035 add_action_set_stream_state_is_ended(iterator, stream_state);
1036 goto end;
1037
1038 error:
1039 ret = -1;
1040
1041 end:
1042 return ret;
1043 }
1044
1045 static
1046 int handle_notif_packet_begin(
1047 struct bt_notification_iterator *iterator,
1048 struct bt_notification *notif,
1049 struct bt_ctf_stream *notif_stream,
1050 struct bt_ctf_packet *notif_packet)
1051 {
1052 int ret = 0;
1053 struct stream_state *stream_state;
1054
1055 assert(notif->type == BT_NOTIFICATION_TYPE_PACKET_BEGIN);
1056 assert(notif_packet);
1057 ret = ensure_stream_state_exists(iterator, NULL, notif_stream,
1058 &stream_state);
1059 if (ret) {
1060 goto error;
1061 }
1062
1063 ret = handle_packet_switch(iterator, notif, notif_packet, stream_state);
1064 if (ret) {
1065 goto error;
1066 }
1067
1068 goto end;
1069
1070 error:
1071 ret = -1;
1072
1073 end:
1074 return ret;
1075 }
1076
1077 static
1078 int handle_notif_packet_end(
1079 struct bt_notification_iterator *iterator,
1080 struct bt_notification *notif,
1081 struct bt_ctf_stream *notif_stream,
1082 struct bt_ctf_packet *notif_packet)
1083 {
1084 int ret = 0;
1085 struct stream_state *stream_state;
1086
1087 assert(notif->type == BT_NOTIFICATION_TYPE_PACKET_END);
1088 assert(notif_packet);
1089 ret = ensure_stream_state_exists(iterator, NULL, notif_stream,
1090 &stream_state);
1091 if (ret) {
1092 goto error;
1093 }
1094
1095 ret = handle_packet_switch(iterator, NULL, notif_packet, stream_state);
1096 if (ret) {
1097 goto error;
1098 }
1099
1100 /* End of the current packet */
1101 add_action_push_notif(iterator, notif);
1102 add_action_set_stream_state_cur_packet(iterator, stream_state, NULL);
1103 goto end;
1104
1105 error:
1106 ret = -1;
1107
1108 end:
1109 return ret;
1110 }
1111
1112 static
1113 int handle_notif_event(
1114 struct bt_notification_iterator *iterator,
1115 struct bt_notification *notif,
1116 struct bt_ctf_stream *notif_stream,
1117 struct bt_ctf_packet *notif_packet)
1118 {
1119 int ret = 0;
1120 struct stream_state *stream_state;
1121
1122 assert(notif->type == BT_NOTIFICATION_TYPE_EVENT);
1123 assert(notif_packet);
1124 ret = ensure_stream_state_exists(iterator, NULL, notif_stream,
1125 &stream_state);
1126 if (ret) {
1127 goto error;
1128 }
1129
1130 ret = handle_packet_switch(iterator, NULL, notif_packet, stream_state);
1131 if (ret) {
1132 goto error;
1133 }
1134
1135 add_action_push_notif(iterator, notif);
1136 goto end;
1137
1138 error:
1139 ret = -1;
1140
1141 end:
1142 return ret;
1143 }
1144
1145 static
1146 int enqueue_notification_and_automatic(
1147 struct bt_notification_iterator *iterator,
1148 struct bt_notification *notif)
1149 {
1150 int ret = 0;
1151 struct bt_ctf_event *notif_event = NULL;
1152 struct bt_ctf_stream *notif_stream = NULL;
1153 struct bt_ctf_packet *notif_packet = NULL;
1154
1155 assert(notif);
1156
1157 // TODO: Skip most of this if the iterator is only subscribed
1158 // to event/inactivity notifications.
1159
1160 /* Get the stream and packet referred by the notification */
1161 switch (notif->type) {
1162 case BT_NOTIFICATION_TYPE_EVENT:
1163 notif_event = bt_notification_event_borrow_event(notif);
1164 assert(notif_event);
1165 notif_packet = bt_ctf_event_borrow_packet(notif_event);
1166 assert(notif_packet);
1167 break;
1168 case BT_NOTIFICATION_TYPE_STREAM_BEGIN:
1169 notif_stream =
1170 bt_notification_stream_begin_borrow_stream(notif);
1171 assert(notif_stream);
1172 break;
1173 case BT_NOTIFICATION_TYPE_STREAM_END:
1174 notif_stream = bt_notification_stream_end_borrow_stream(notif);
1175 assert(notif_stream);
1176 break;
1177 case BT_NOTIFICATION_TYPE_PACKET_BEGIN:
1178 notif_packet =
1179 bt_notification_packet_begin_borrow_packet(notif);
1180 assert(notif_packet);
1181 break;
1182 case BT_NOTIFICATION_TYPE_PACKET_END:
1183 notif_packet = bt_notification_packet_end_borrow_packet(notif);
1184 assert(notif_packet);
1185 break;
1186 case BT_NOTIFICATION_TYPE_INACTIVITY:
1187 /* Always valid */
1188 goto handle_notif;
1189 default:
1190 /*
1191 * Invalid type of notification. Only the notification
1192 * types above are allowed to be returned by a user
1193 * component.
1194 */
1195 goto error;
1196 }
1197
1198 if (notif_packet) {
1199 notif_stream = bt_ctf_packet_borrow_stream(notif_packet);
1200 assert(notif_stream);
1201 }
1202
1203 if (!notif_stream) {
1204 /*
1205 * The notification has no reference to a stream: it
1206 * cannot cause the creation of automatic notifications.
1207 */
1208 goto end;
1209 }
1210
1211 if (!validate_notification(iterator, notif, notif_stream,
1212 notif_packet)) {
1213 goto error;
1214 }
1215
1216 handle_notif:
1217 switch (notif->type) {
1218 case BT_NOTIFICATION_TYPE_EVENT:
1219 ret = handle_notif_event(iterator, notif, notif_stream,
1220 notif_packet);
1221 break;
1222 case BT_NOTIFICATION_TYPE_STREAM_BEGIN:
1223 ret = handle_notif_stream_begin(iterator, notif, notif_stream);
1224 break;
1225 case BT_NOTIFICATION_TYPE_STREAM_END:
1226 ret = handle_notif_stream_end(iterator, notif, notif_stream);
1227 break;
1228 case BT_NOTIFICATION_TYPE_PACKET_BEGIN:
1229 ret = handle_notif_packet_begin(iterator, notif, notif_stream,
1230 notif_packet);
1231 break;
1232 case BT_NOTIFICATION_TYPE_PACKET_END:
1233 ret = handle_notif_packet_end(iterator, notif, notif_stream,
1234 notif_packet);
1235 break;
1236 case BT_NOTIFICATION_TYPE_INACTIVITY:
1237 add_action_push_notif(iterator, notif);
1238 break;
1239 default:
1240 break;
1241 }
1242
1243 if (ret) {
1244 goto error;
1245 }
1246
1247 apply_actions(iterator);
1248 goto end;
1249
1250 error:
1251 ret = -1;
1252
1253 end:
1254 return ret;
1255 }
1256
1257 static
1258 int handle_end(struct bt_notification_iterator *iterator)
1259 {
1260 GHashTableIter stream_state_iter;
1261 gpointer stream_gptr, stream_state_gptr;
1262 int ret = 0;
1263
1264 /*
1265 * Emit a "stream end" notification for each non-ended stream
1266 * known by this iterator and mark them as ended.
1267 */
1268 g_hash_table_iter_init(&stream_state_iter, iterator->stream_states);
1269
1270 while (g_hash_table_iter_next(&stream_state_iter, &stream_gptr,
1271 &stream_state_gptr)) {
1272 struct stream_state *stream_state = stream_state_gptr;
1273
1274 assert(stream_state_gptr);
1275
1276 if (stream_state->is_ended) {
1277 continue;
1278 }
1279
1280 ret = handle_packet_switch(iterator, NULL, NULL, stream_state);
1281 if (ret) {
1282 goto error;
1283 }
1284
1285 ret = add_action_push_notif_stream_end(iterator, stream_gptr);
1286 if (ret) {
1287 goto error;
1288 }
1289
1290 add_action_set_stream_state_is_ended(iterator, stream_state);
1291 }
1292
1293 apply_actions(iterator);
1294 goto end;
1295
1296 error:
1297 ret = -1;
1298
1299 end:
1300 return ret;
1301 }
1302
1303 static
1304 enum bt_notification_iterator_status ensure_queue_has_notifications(
1305 struct bt_notification_iterator *iterator)
1306 {
1307 struct bt_private_notification_iterator *priv_iterator =
1308 bt_private_notification_iterator_from_notification_iterator(iterator);
1309 bt_component_class_notification_iterator_next_method next_method = NULL;
1310 struct bt_notification_iterator_next_return next_return = {
1311 .status = BT_NOTIFICATION_ITERATOR_STATUS_OK,
1312 .notification = NULL,
1313 };
1314 enum bt_notification_iterator_status status =
1315 BT_NOTIFICATION_ITERATOR_STATUS_OK;
1316 int ret;
1317
1318 assert(iterator);
1319
1320 if (iterator->queue->length > 0) {
1321 /* We already have enough */
1322 goto end;
1323 }
1324
1325 switch (iterator->state) {
1326 case BT_NOTIFICATION_ITERATOR_STATE_FINALIZED_AND_ENDED:
1327 status = BT_NOTIFICATION_ITERATOR_STATUS_CANCELED;
1328 goto end;
1329 case BT_NOTIFICATION_ITERATOR_STATE_ENDED:
1330 status = BT_NOTIFICATION_ITERATOR_STATUS_END;
1331 goto end;
1332 default:
1333 break;
1334 }
1335
1336 assert(iterator->upstream_component);
1337 assert(iterator->upstream_component->class);
1338
1339 /* Pick the appropriate "next" method */
1340 switch (iterator->upstream_component->class->type) {
1341 case BT_COMPONENT_CLASS_TYPE_SOURCE:
1342 {
1343 struct bt_component_class_source *source_class =
1344 container_of(iterator->upstream_component->class,
1345 struct bt_component_class_source, parent);
1346
1347 assert(source_class->methods.iterator.next);
1348 next_method = source_class->methods.iterator.next;
1349 break;
1350 }
1351 case BT_COMPONENT_CLASS_TYPE_FILTER:
1352 {
1353 struct bt_component_class_filter *filter_class =
1354 container_of(iterator->upstream_component->class,
1355 struct bt_component_class_filter, parent);
1356
1357 assert(filter_class->methods.iterator.next);
1358 next_method = filter_class->methods.iterator.next;
1359 break;
1360 }
1361 default:
1362 assert(BT_FALSE);
1363 break;
1364 }
1365
1366 /*
1367 * Call the user's "next" method to get the next notification
1368 * and status.
1369 */
1370 assert(next_method);
1371
1372 while (iterator->queue->length == 0) {
1373 next_return = next_method(priv_iterator);
1374 if (next_return.status < 0) {
1375 status = next_return.status;
1376 goto end;
1377 }
1378
1379 switch (next_return.status) {
1380 case BT_NOTIFICATION_ITERATOR_STATUS_END:
1381 ret = handle_end(iterator);
1382 if (ret) {
1383 status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
1384 goto end;
1385 }
1386
1387 if (iterator->state == BT_NOTIFICATION_ITERATOR_STATE_FINALIZED) {
1388 iterator->state =
1389 BT_NOTIFICATION_ITERATOR_STATE_FINALIZED_AND_ENDED;
1390
1391 if (iterator->queue->length == 0) {
1392 status = BT_NOTIFICATION_ITERATOR_STATUS_CANCELED;
1393 }
1394 } else {
1395 iterator->state =
1396 BT_NOTIFICATION_ITERATOR_STATE_ENDED;
1397
1398 if (iterator->queue->length == 0) {
1399 status = BT_NOTIFICATION_ITERATOR_STATUS_END;
1400 }
1401 }
1402 goto end;
1403 case BT_NOTIFICATION_ITERATOR_STATUS_AGAIN:
1404 status = BT_NOTIFICATION_ITERATOR_STATUS_AGAIN;
1405 goto end;
1406 case BT_NOTIFICATION_ITERATOR_STATUS_OK:
1407 if (!next_return.notification) {
1408 status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
1409 goto end;
1410 }
1411
1412 /*
1413 * We know the notification is valid. Before we
1414 * push it to the head of the queue, push the
1415 * appropriate automatic notifications if any.
1416 */
1417 ret = enqueue_notification_and_automatic(iterator,
1418 next_return.notification);
1419 BT_PUT(next_return.notification);
1420 if (ret) {
1421 status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
1422 goto end;
1423 }
1424 break;
1425 default:
1426 /* Unknown non-error status */
1427 assert(BT_FALSE);
1428 }
1429 }
1430
1431 end:
1432 return status;
1433 }
1434
1435 enum bt_notification_iterator_status
1436 bt_notification_iterator_next(struct bt_notification_iterator *iterator)
1437 {
1438 enum bt_notification_iterator_status status;
1439
1440 if (!iterator) {
1441 status = BT_NOTIFICATION_ITERATOR_STATUS_INVALID;
1442 goto end;
1443 }
1444
1445 /*
1446 * Make sure that the iterator's queue contains at least one
1447 * notification.
1448 */
1449 status = ensure_queue_has_notifications(iterator);
1450 if (status != BT_NOTIFICATION_ITERATOR_STATUS_OK) {
1451 goto end;
1452 }
1453
1454 /*
1455 * Move the notification at the tail of the queue to the
1456 * iterator's current notification.
1457 */
1458 assert(iterator->queue->length > 0);
1459 bt_put(iterator->current_notification);
1460 iterator->current_notification = g_queue_pop_tail(iterator->queue);
1461 assert(iterator->current_notification);
1462
1463 end:
1464 return status;
1465 }
1466
1467 struct bt_component *bt_notification_iterator_get_component(
1468 struct bt_notification_iterator *iterator)
1469 {
1470 return bt_get(iterator->upstream_component);
1471 }
1472
/*
 * Returns the private view of the upstream component of
 * `private_iterator`.
 *
 * The private iterator is first converted to its public counterpart,
 * whose component is obtained with
 * bt_notification_iterator_get_component(); this component is then
 * converted with bt_private_component_from_component().
 */
struct bt_private_component *
bt_private_notification_iterator_get_private_component(
		struct bt_private_notification_iterator *private_iterator)
{
	struct bt_notification_iterator *iterator =
		bt_notification_iterator_from_private(private_iterator);
	struct bt_component *component =
		bt_notification_iterator_get_component(iterator);

	return bt_private_component_from_component(component);
}
1481
1482 enum bt_notification_iterator_status bt_notification_iterator_seek_time(
1483 struct bt_notification_iterator *iterator,
1484 enum bt_notification_iterator_seek_origin seek_origin,
1485 int64_t time)
1486 {
1487 enum bt_notification_iterator_status ret =
1488 BT_NOTIFICATION_ITERATOR_STATUS_UNSUPPORTED;
1489 return ret;
1490 }
This page took 0.092133 seconds and 4 git commands to generate.