Notification iterator: generate automatic notifications when missing
[babeltrace.git] / lib / graph / iterator.c
1 /*
2 * iterator.c
3 *
4 * Babeltrace Notification Iterator
5 *
6 * Copyright 2015 Jérémie Galarneau <jeremie.galarneau@efficios.com>
7 * Copyright 2017 Philippe Proulx <pproulx@efficios.com>
8 *
9 * Permission is hereby granted, free of charge, to any person obtaining a copy
10 * of this software and associated documentation files (the "Software"), to deal
11 * in the Software without restriction, including without limitation the rights
12 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
13 * copies of the Software, and to permit persons to whom the Software is
14 * furnished to do so, subject to the following conditions:
15 *
16 * The above copyright notice and this permission notice shall be included in
17 * all copies or substantial portions of the Software.
18 *
19 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
20 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
21 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
22 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
23 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
24 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
25 * SOFTWARE.
26 */
27
28 #include <babeltrace/compiler-internal.h>
29 #include <babeltrace/ref.h>
30 #include <babeltrace/ctf-ir/event-internal.h>
31 #include <babeltrace/ctf-ir/packet-internal.h>
32 #include <babeltrace/ctf-ir/stream-internal.h>
33 #include <babeltrace/graph/component.h>
34 #include <babeltrace/graph/component-source-internal.h>
35 #include <babeltrace/graph/component-class-internal.h>
36 #include <babeltrace/graph/notification-iterator.h>
37 #include <babeltrace/graph/notification-iterator-internal.h>
38 #include <babeltrace/graph/notification-internal.h>
39 #include <babeltrace/graph/notification-event.h>
40 #include <babeltrace/graph/notification-event-internal.h>
41 #include <babeltrace/graph/notification-packet.h>
42 #include <babeltrace/graph/notification-packet-internal.h>
43 #include <babeltrace/graph/notification-stream.h>
44 #include <babeltrace/graph/notification-stream-internal.h>
45 #include <babeltrace/graph/port.h>
46
47 struct stream_state {
48 struct bt_ctf_stream *stream; /* owned by this */
49 struct bt_ctf_packet *cur_packet; /* owned by this */
50 bool is_ended;
51 };
52
53 enum action_type {
54 ACTION_TYPE_PUSH_NOTIF,
55 ACTION_TYPE_MAP_PORT_TO_COMP_IN_STREAM,
56 ACTION_TYPE_ADD_STREAM_STATE,
57 ACTION_TYPE_SET_STREAM_STATE_IS_ENDED,
58 ACTION_TYPE_SET_STREAM_STATE_CUR_PACKET,
59 };
60
61 struct action {
62 enum action_type type;
63 union {
64 /* ACTION_TYPE_PUSH_NOTIF */
65 struct {
66 struct bt_notification *notif; /* owned by this */
67 } push_notif;
68
69 /* ACTION_TYPE_MAP_PORT_TO_COMP_IN_STREAM */
70 struct {
71 struct bt_ctf_stream *stream; /* owned by this */
72 struct bt_component *component; /* owned by this */
73 struct bt_port *port; /* owned by this */
74 } map_port_to_comp_in_stream;
75
76 /* ACTION_TYPE_ADD_STREAM_STATE */
77 struct {
78 struct bt_ctf_stream *stream; /* owned by this */
79 struct stream_state *stream_state; /* owned by this */
80 } add_stream_state;
81
82 /* ACTION_TYPE_SET_STREAM_STATE_IS_ENDED */
83 struct {
84 struct stream_state *stream_state; /* weak */
85 } set_stream_state_is_ended;
86
87 /* ACTION_TYPE_SET_STREAM_STATE_CUR_PACKET */
88 struct {
89 struct stream_state *stream_state; /* weak */
90 struct bt_ctf_packet *packet; /* owned by this */
91 } set_stream_state_cur_packet;
92 } payload;
93 };
94
95 static
96 void stream_destroy_listener(struct bt_ctf_stream *stream, void *data)
97 {
98 struct bt_notification_iterator *iterator = data;
99
100 /* Remove associated stream state */
101 g_hash_table_remove(iterator->stream_states, stream);
102 }
103
104 static
105 void destroy_stream_state(struct stream_state *stream_state)
106 {
107 if (!stream_state) {
108 return;
109 }
110
111 bt_put(stream_state->cur_packet);
112 bt_put(stream_state->stream);
113 g_free(stream_state);
114 }
115
116 static
117 void destroy_action(struct action *action)
118 {
119 assert(action);
120
121 switch (action->type) {
122 case ACTION_TYPE_PUSH_NOTIF:
123 BT_PUT(action->payload.push_notif.notif);
124 break;
125 case ACTION_TYPE_MAP_PORT_TO_COMP_IN_STREAM:
126 BT_PUT(action->payload.map_port_to_comp_in_stream.stream);
127 BT_PUT(action->payload.map_port_to_comp_in_stream.component);
128 BT_PUT(action->payload.map_port_to_comp_in_stream.port);
129 break;
130 case ACTION_TYPE_ADD_STREAM_STATE:
131 BT_PUT(action->payload.add_stream_state.stream);
132 destroy_stream_state(
133 action->payload.add_stream_state.stream_state);
134 action->payload.add_stream_state.stream_state = NULL;
135 break;
136 case ACTION_TYPE_SET_STREAM_STATE_CUR_PACKET:
137 BT_PUT(action->payload.set_stream_state_cur_packet.packet);
138 break;
139 case ACTION_TYPE_SET_STREAM_STATE_IS_ENDED:
140 break;
141 default:
142 assert(false);
143 }
144 }
145
146 static
147 void add_action(struct bt_notification_iterator *iterator,
148 struct action *action)
149 {
150 g_array_append_val(iterator->actions, *action);
151 }
152
153 static
154 void clear_actions(struct bt_notification_iterator *iterator)
155 {
156 size_t i;
157
158 for (i = 0; i < iterator->actions->len; i++) {
159 struct action *action = &g_array_index(iterator->actions,
160 struct action, i);
161
162 destroy_action(action);
163 }
164
165 g_array_set_size(iterator->actions, 0);
166 }
167
168 static
169 void apply_actions(struct bt_notification_iterator *iterator)
170 {
171 size_t i;
172
173 for (i = 0; i < iterator->actions->len; i++) {
174 struct action *action = &g_array_index(iterator->actions,
175 struct action, i);
176
177 switch (action->type) {
178 case ACTION_TYPE_PUSH_NOTIF:
179 /* Move notification to queue */
180 g_queue_push_head(iterator->queue,
181 action->payload.push_notif.notif);
182 bt_notification_freeze(
183 action->payload.push_notif.notif);
184 action->payload.push_notif.notif = NULL;
185 break;
186 case ACTION_TYPE_MAP_PORT_TO_COMP_IN_STREAM:
187 bt_ctf_stream_map_component_to_port(
188 action->payload.map_port_to_comp_in_stream.stream,
189 action->payload.map_port_to_comp_in_stream.component,
190 action->payload.map_port_to_comp_in_stream.port);
191 break;
192 case ACTION_TYPE_ADD_STREAM_STATE:
193 /* Move stream state to hash table */
194 g_hash_table_insert(iterator->stream_states,
195 action->payload.add_stream_state.stream,
196 action->payload.add_stream_state.stream_state);
197
198 action->payload.add_stream_state.stream_state = NULL;
199 break;
200 case ACTION_TYPE_SET_STREAM_STATE_IS_ENDED:
201 /*
202 * We know that this stream is ended. We need to
203 * remember this as long as the stream exists to
204 * enforce that the same stream does not end
205 * twice.
206 *
207 * Here we add a destroy listener to the stream
208 * which we put after (becomes weak as the hash
209 * table key). If we were the last object to own
210 * this stream, the destroy listener is called
211 * when we call bt_put() which removes this
212 * stream state completely. This is important
213 * because the memory used by this stream object
214 * could be reused for another stream, and they
215 * must have different states.
216 */
217 bt_ctf_stream_add_destroy_listener(
218 action->payload.set_stream_state_is_ended.stream_state->stream,
219 stream_destroy_listener, iterator);
220 action->payload.set_stream_state_is_ended.stream_state->is_ended = true;
221 BT_PUT(action->payload.set_stream_state_is_ended.stream_state->stream);
222 break;
223 case ACTION_TYPE_SET_STREAM_STATE_CUR_PACKET:
224 /* Move packet to stream state's current packet */
225 BT_MOVE(action->payload.set_stream_state_cur_packet.stream_state->cur_packet,
226 action->payload.set_stream_state_cur_packet.packet);
227 break;
228 default:
229 assert(false);
230 }
231 }
232
233 clear_actions(iterator);
234 }
235
236 static
237 struct stream_state *create_stream_state(struct bt_ctf_stream *stream)
238 {
239 struct stream_state *stream_state = g_new0(struct stream_state, 1);
240
241 if (!stream_state) {
242 goto end;
243 }
244
245 /*
246 * We keep a reference to the stream until we know it's ended
247 * because we need to be able to create an automatic "stream
248 * end" notification when the user's "next" method returns
249 * BT_NOTIFICATION_ITERATOR_STATUS_END.
250 *
251 * We put this reference when the stream is marked as ended.
252 */
253 stream_state->stream = bt_get(stream);
254
255 end:
256 return stream_state;
257 }
258
259 static
260 void bt_notification_iterator_destroy(struct bt_object *obj)
261 {
262 struct bt_notification_iterator *iterator;
263 struct bt_component_class *comp_class;
264
265 assert(obj);
266 iterator = container_of(obj, struct bt_notification_iterator,
267 base);
268 assert(iterator->upstream_component);
269 comp_class = iterator->upstream_component->class;
270
271 /* Call user-defined destroy method */
272 switch (comp_class->type) {
273 case BT_COMPONENT_CLASS_TYPE_SOURCE:
274 {
275 struct bt_component_class_source *source_class;
276
277 source_class = container_of(comp_class, struct bt_component_class_source, parent);
278
279 if (source_class->methods.iterator.finalize) {
280 source_class->methods.iterator.finalize(
281 bt_private_notification_iterator_from_notification_iterator(iterator));
282 }
283 break;
284 }
285 case BT_COMPONENT_CLASS_TYPE_FILTER:
286 {
287 struct bt_component_class_filter *filter_class;
288
289 filter_class = container_of(comp_class, struct bt_component_class_filter, parent);
290
291 if (filter_class->methods.iterator.finalize) {
292 filter_class->methods.iterator.finalize(
293 bt_private_notification_iterator_from_notification_iterator(iterator));
294 }
295 break;
296 }
297 default:
298 /* Unreachable */
299 assert(0);
300 }
301
302 if (iterator->queue) {
303 struct bt_notification *notif;
304
305 while ((notif = g_queue_pop_tail(iterator->queue))) {
306 bt_put(notif);
307 }
308
309 g_queue_free(iterator->queue);
310 }
311
312 if (iterator->stream_states) {
313 /*
314 * Remove our destroy listener from each stream which
315 * has a state in this iterator. Otherwise the destroy
316 * listener would be called with an invalid/other
317 * notification iterator object.
318 */
319 GHashTableIter ht_iter;
320 gpointer stream_gptr, stream_state_gptr;
321
322 g_hash_table_iter_init(&ht_iter, iterator->stream_states);
323
324 while (g_hash_table_iter_next(&ht_iter, &stream_gptr, &stream_state_gptr)) {
325 assert(stream_gptr);
326 bt_ctf_stream_remove_destroy_listener(
327 (void *) stream_gptr, stream_destroy_listener,
328 iterator);
329 }
330
331 g_hash_table_destroy(iterator->stream_states);
332 }
333
334 if (iterator->actions) {
335 g_array_free(iterator->actions, TRUE);
336 }
337
338 bt_put(iterator->current_notification);
339 bt_put(iterator->upstream_component);
340 bt_put(iterator->upstream_port);
341 g_free(iterator);
342 }
343
344 BT_HIDDEN
345 struct bt_notification_iterator *bt_notification_iterator_create(
346 struct bt_component *upstream_comp,
347 struct bt_port *upstream_port)
348 {
349 enum bt_component_class_type type;
350 struct bt_notification_iterator *iterator = NULL;
351
352 assert(upstream_comp);
353 assert(upstream_port);
354 assert(bt_port_is_connected(upstream_port));
355
356 type = bt_component_get_class_type(upstream_comp);
357 switch (type) {
358 case BT_COMPONENT_CLASS_TYPE_SOURCE:
359 case BT_COMPONENT_CLASS_TYPE_FILTER:
360 break;
361 default:
362 goto error;
363 }
364
365 iterator = g_new0(struct bt_notification_iterator, 1);
366 if (!iterator) {
367 goto error;
368 }
369
370 bt_object_init(iterator, bt_notification_iterator_destroy);
371
372 iterator->stream_states = g_hash_table_new_full(g_direct_hash,
373 g_direct_equal, NULL, (GDestroyNotify) destroy_stream_state);
374 if (!iterator->stream_states) {
375 goto error;
376 }
377
378 iterator->queue = g_queue_new();
379 if (!iterator->queue) {
380 goto error;
381 }
382
383 iterator->actions = g_array_new(FALSE, FALSE, sizeof(struct action));
384 if (!iterator->actions) {
385 goto error;
386 }
387
388 iterator->upstream_component = bt_get(upstream_comp);
389 iterator->upstream_port = bt_get(upstream_port);
390 goto end;
391
392 error:
393 BT_PUT(iterator);
394
395 end:
396 return iterator;
397 }
398
399 BT_HIDDEN
400 enum bt_notification_iterator_status bt_notification_iterator_validate(
401 struct bt_notification_iterator *iterator)
402 {
403 enum bt_notification_iterator_status ret =
404 BT_NOTIFICATION_ITERATOR_STATUS_OK;
405
406 if (!iterator) {
407 ret = BT_NOTIFICATION_ITERATOR_STATUS_INVALID;
408 goto end;
409 }
410 end:
411 return ret;
412 }
413
414 void *bt_private_notification_iterator_get_user_data(
415 struct bt_private_notification_iterator *private_iterator)
416 {
417 struct bt_notification_iterator *iterator =
418 bt_notification_iterator_from_private(private_iterator);
419
420 return iterator ? iterator->user_data : NULL;
421 }
422
423 enum bt_notification_iterator_status
424 bt_private_notification_iterator_set_user_data(
425 struct bt_private_notification_iterator *private_iterator,
426 void *data)
427 {
428 enum bt_notification_iterator_status ret =
429 BT_NOTIFICATION_ITERATOR_STATUS_OK;
430 struct bt_notification_iterator *iterator =
431 bt_notification_iterator_from_private(private_iterator);
432
433 if (!iterator) {
434 ret = BT_NOTIFICATION_ITERATOR_STATUS_INVALID;
435 goto end;
436 }
437
438 iterator->user_data = data;
439 end:
440 return ret;
441 }
442
443 struct bt_notification *bt_notification_iterator_get_notification(
444 struct bt_notification_iterator *iterator)
445 {
446 struct bt_notification *notification = NULL;
447
448 if (!iterator) {
449 goto end;
450 }
451
452 notification = bt_get(iterator->current_notification);
453
454 end:
455 return notification;
456 }
457
458 static
459 bool validate_notification(struct bt_notification_iterator *iterator,
460 struct bt_notification *notif,
461 struct bt_ctf_stream *notif_stream,
462 struct bt_ctf_packet *notif_packet)
463 {
464 bool is_valid = true;
465 struct stream_state *stream_state;
466 struct bt_port *stream_comp_cur_port;
467
468 assert(notif_stream);
469 stream_comp_cur_port =
470 bt_ctf_stream_port_for_component(notif_stream,
471 iterator->upstream_component);
472 if (!stream_comp_cur_port) {
473 /*
474 * This is the first time this notification iterator
475 * bumps into this stream. Add an action to map the
476 * iterator's upstream component to the iterator's
477 * upstream port in this stream.
478 */
479 struct action action = {
480 .type = ACTION_TYPE_MAP_PORT_TO_COMP_IN_STREAM,
481 .payload.map_port_to_comp_in_stream = {
482 .stream = bt_get(notif_stream),
483 .component = bt_get(iterator->upstream_component),
484 .port = bt_get(iterator->upstream_port),
485 },
486 };
487
488 add_action(iterator, &action);
489 } else {
490 if (stream_comp_cur_port != iterator->upstream_port) {
491 /*
492 * It looks like two different ports of the same
493 * component are emitting notifications which
494 * have references to the same stream. This is
495 * bad: the API guarantees that it can never
496 * happen.
497 */
498 is_valid = false;
499 goto end;
500 }
501
502 }
503
504 stream_state = g_hash_table_lookup(iterator->stream_states,
505 notif_stream);
506 if (stream_state) {
507 if (stream_state->is_ended) {
508 /*
509 * There's a new notification which has a
510 * reference to a stream which, from this
511 * iterator's point of view, is ended ("end of
512 * stream" notification was returned). This is
513 * bad: the API guarantees that it can never
514 * happen.
515 */
516 is_valid = false;
517 goto end;
518 }
519
520 switch (notif->type) {
521 case BT_NOTIFICATION_TYPE_STREAM_BEGIN:
522 /*
523 * We already have a stream state, which means
524 * we already returned a "stream begin"
525 * notification: this is an invalid duplicate.
526 */
527 is_valid = false;
528 goto end;
529 case BT_NOTIFICATION_TYPE_PACKET_BEGIN:
530 if (notif_packet == stream_state->cur_packet) {
531 /* Duplicate "packet begin" notification */
532 is_valid = false;
533 goto end;
534 }
535 break;
536 default:
537 break;
538 }
539 }
540
541 end:
542 return is_valid;
543 }
544
545 static
546 void add_action_push_notif(struct bt_notification_iterator *iterator,
547 struct bt_notification *notif)
548 {
549 struct action action = {
550 .type = ACTION_TYPE_PUSH_NOTIF,
551 .payload.push_notif = {
552 .notif = bt_get(notif),
553 },
554 };
555
556 assert(notif);
557 add_action(iterator, &action);
558 }
559
560 static
561 int add_action_push_notif_stream_begin(
562 struct bt_notification_iterator *iterator,
563 struct bt_ctf_stream *stream)
564 {
565 int ret = 0;
566 struct bt_notification *stream_begin_notif = NULL;
567
568 assert(stream);
569 stream_begin_notif = bt_notification_stream_begin_create(stream);
570 if (!stream_begin_notif) {
571 goto error;
572 }
573
574 add_action_push_notif(iterator, stream_begin_notif);
575 goto end;
576
577 error:
578 ret = -1;
579
580 end:
581 bt_put(stream_begin_notif);
582 return ret;
583 }
584
585 static
586 int add_action_push_notif_stream_end(
587 struct bt_notification_iterator *iterator,
588 struct bt_ctf_stream *stream)
589 {
590 int ret = 0;
591 struct bt_notification *stream_end_notif = NULL;
592
593 assert(stream);
594 stream_end_notif = bt_notification_stream_end_create(stream);
595 if (!stream_end_notif) {
596 goto error;
597 }
598
599 add_action_push_notif(iterator, stream_end_notif);
600 goto end;
601
602 error:
603 ret = -1;
604
605 end:
606 bt_put(stream_end_notif);
607 return ret;
608 }
609
610 static
611 int add_action_push_notif_packet_begin(
612 struct bt_notification_iterator *iterator,
613 struct bt_ctf_packet *packet)
614 {
615 int ret = 0;
616 struct bt_notification *packet_begin_notif = NULL;
617
618 assert(packet);
619 packet_begin_notif = bt_notification_packet_begin_create(packet);
620 if (!packet_begin_notif) {
621 goto error;
622 }
623
624 add_action_push_notif(iterator, packet_begin_notif);
625 goto end;
626
627 error:
628 ret = -1;
629
630 end:
631 bt_put(packet_begin_notif);
632 return ret;
633 }
634
635 static
636 int add_action_push_notif_packet_end(
637 struct bt_notification_iterator *iterator,
638 struct bt_ctf_packet *packet)
639 {
640 int ret = 0;
641 struct bt_notification *packet_end_notif = NULL;
642
643 assert(packet);
644 packet_end_notif = bt_notification_packet_end_create(packet);
645 if (!packet_end_notif) {
646 goto error;
647 }
648
649 add_action_push_notif(iterator, packet_end_notif);
650 goto end;
651
652 error:
653 ret = -1;
654
655 end:
656 bt_put(packet_end_notif);
657 return ret;
658 }
659
660 static
661 void add_action_set_stream_state_is_ended(
662 struct bt_notification_iterator *iterator,
663 struct stream_state *stream_state)
664 {
665 struct action action = {
666 .type = ACTION_TYPE_SET_STREAM_STATE_IS_ENDED,
667 .payload.set_stream_state_is_ended = {
668 .stream_state = stream_state,
669 },
670 };
671
672 assert(stream_state);
673 add_action(iterator, &action);
674 }
675
676 static
677 void add_action_set_stream_state_cur_packet(
678 struct bt_notification_iterator *iterator,
679 struct stream_state *stream_state,
680 struct bt_ctf_packet *packet)
681 {
682 struct action action = {
683 .type = ACTION_TYPE_SET_STREAM_STATE_CUR_PACKET,
684 .payload.set_stream_state_cur_packet = {
685 .stream_state = stream_state,
686 .packet = bt_get(packet),
687 },
688 };
689
690 assert(stream_state);
691 add_action(iterator, &action);
692 }
693
694 static
695 int ensure_stream_state_exists(struct bt_notification_iterator *iterator,
696 struct bt_notification *stream_begin_notif,
697 struct bt_ctf_stream *notif_stream,
698 struct stream_state **stream_state)
699 {
700 int ret = 0;
701
702 if (!notif_stream) {
703 /*
704 * The notification does not reference any stream: no
705 * need to get or create a stream state.
706 */
707 goto end;
708 }
709
710 *stream_state = g_hash_table_lookup(iterator->stream_states,
711 notif_stream);
712 if (!*stream_state) {
713 /*
714 * This iterator did not bump into this stream yet:
715 * create a stream state and a "stream begin"
716 * notification.
717 */
718 struct action action = {
719 .type = ACTION_TYPE_ADD_STREAM_STATE,
720 .payload.add_stream_state = {
721 .stream = bt_get(notif_stream),
722 .stream_state = NULL,
723 },
724 };
725
726 *stream_state = create_stream_state(notif_stream);
727 if (!stream_state) {
728 goto error;
729 }
730
731 action.payload.add_stream_state.stream_state =
732 *stream_state;
733 add_action(iterator, &action);
734
735 if (stream_begin_notif) {
736 add_action_push_notif(iterator, stream_begin_notif);
737 } else {
738 ret = add_action_push_notif_stream_begin(iterator,
739 notif_stream);
740 if (ret) {
741 goto error;
742 }
743 }
744 }
745
746 goto end;
747
748 error:
749 destroy_stream_state(*stream_state);
750 ret = -1;
751
752 end:
753 return ret;
754 }
755
756 static
757 int handle_packet_switch(struct bt_notification_iterator *iterator,
758 struct bt_notification *packet_begin_notif,
759 struct bt_ctf_packet *new_packet,
760 struct stream_state *stream_state)
761 {
762 int ret = 0;
763
764 if (stream_state->cur_packet == new_packet) {
765 goto end;
766 }
767
768 if (stream_state->cur_packet) {
769 /* End of the current packet */
770 ret = add_action_push_notif_packet_end(iterator,
771 stream_state->cur_packet);
772 if (ret) {
773 goto error;
774 }
775 }
776
777 /* Beginning of the new packet */
778 if (packet_begin_notif) {
779 add_action_push_notif(iterator, packet_begin_notif);
780 } else if (new_packet) {
781 ret = add_action_push_notif_packet_begin(iterator,
782 new_packet);
783 if (ret) {
784 goto error;
785 }
786 }
787
788 add_action_set_stream_state_cur_packet(iterator, stream_state,
789 new_packet);
790 goto end;
791
792 error:
793 ret = -1;
794
795 end:
796 return ret;
797 }
798
799 static
800 int handle_notif_stream_begin(
801 struct bt_notification_iterator *iterator,
802 struct bt_notification *notif,
803 struct bt_ctf_stream *notif_stream)
804 {
805 int ret = 0;
806 struct stream_state *stream_state;
807
808 assert(notif->type == BT_NOTIFICATION_TYPE_STREAM_BEGIN);
809 assert(notif_stream);
810 ret = ensure_stream_state_exists(iterator, notif, notif_stream,
811 &stream_state);
812 if (ret) {
813 goto error;
814 }
815
816 goto end;
817
818 error:
819 ret = -1;
820
821 end:
822 return ret;
823 }
824
825 static
826 int handle_notif_stream_end(
827 struct bt_notification_iterator *iterator,
828 struct bt_notification *notif,
829 struct bt_ctf_stream *notif_stream)
830 {
831 int ret = 0;
832 struct stream_state *stream_state;
833
834 assert(notif->type == BT_NOTIFICATION_TYPE_STREAM_END);
835 assert(notif_stream);
836 ret = ensure_stream_state_exists(iterator, NULL, notif_stream,
837 &stream_state);
838 if (ret) {
839 goto error;
840 }
841
842 ret = handle_packet_switch(iterator, NULL, NULL, stream_state);
843 if (ret) {
844 goto error;
845 }
846
847 add_action_push_notif(iterator, notif);
848 add_action_set_stream_state_is_ended(iterator, stream_state);
849 goto end;
850
851 error:
852 ret = -1;
853
854 end:
855 return ret;
856 }
857
858 static
859 int handle_notif_packet_begin(
860 struct bt_notification_iterator *iterator,
861 struct bt_notification *notif,
862 struct bt_ctf_stream *notif_stream,
863 struct bt_ctf_packet *notif_packet)
864 {
865 int ret = 0;
866 struct stream_state *stream_state;
867
868 assert(notif->type == BT_NOTIFICATION_TYPE_PACKET_BEGIN);
869 assert(notif_packet);
870 ret = ensure_stream_state_exists(iterator, NULL, notif_stream,
871 &stream_state);
872 if (ret) {
873 goto error;
874 }
875
876 ret = handle_packet_switch(iterator, notif, notif_packet, stream_state);
877 if (ret) {
878 goto error;
879 }
880
881 goto end;
882
883 error:
884 ret = -1;
885
886 end:
887 return ret;
888 }
889
890 static
891 int handle_notif_packet_end(
892 struct bt_notification_iterator *iterator,
893 struct bt_notification *notif,
894 struct bt_ctf_stream *notif_stream,
895 struct bt_ctf_packet *notif_packet)
896 {
897 int ret = 0;
898 struct stream_state *stream_state;
899
900 assert(notif->type == BT_NOTIFICATION_TYPE_PACKET_END);
901 assert(notif_packet);
902 ret = ensure_stream_state_exists(iterator, NULL, notif_stream,
903 &stream_state);
904 if (ret) {
905 goto error;
906 }
907
908 ret = handle_packet_switch(iterator, NULL, notif_packet, stream_state);
909 if (ret) {
910 goto error;
911 }
912
913 /* End of the current packet */
914 add_action_push_notif(iterator, notif);
915 add_action_set_stream_state_cur_packet(iterator, stream_state, NULL);
916 goto end;
917
918 error:
919 ret = -1;
920
921 end:
922 return ret;
923 }
924
925 static
926 int handle_notif_event(
927 struct bt_notification_iterator *iterator,
928 struct bt_notification *notif,
929 struct bt_ctf_stream *notif_stream,
930 struct bt_ctf_packet *notif_packet)
931 {
932 int ret = 0;
933 struct stream_state *stream_state;
934
935 assert(notif->type == BT_NOTIFICATION_TYPE_EVENT);
936 assert(notif_packet);
937 ret = ensure_stream_state_exists(iterator, NULL, notif_stream,
938 &stream_state);
939 if (ret) {
940 goto error;
941 }
942
943 ret = handle_packet_switch(iterator, NULL, notif_packet, stream_state);
944 if (ret) {
945 goto error;
946 }
947
948 add_action_push_notif(iterator, notif);
949 goto end;
950
951 error:
952 ret = -1;
953
954 end:
955 return ret;
956 }
957
958 static
959 int enqueue_notification_and_automatic(
960 struct bt_notification_iterator *iterator,
961 struct bt_notification *notif)
962 {
963 int ret = 0;
964 struct bt_ctf_event *notif_event = NULL;
965 struct bt_ctf_stream *notif_stream = NULL;
966 struct bt_ctf_packet *notif_packet = NULL;
967
968 assert(notif);
969
970 /* Get the stream and packet referred by the notification */
971 switch (notif->type) {
972 case BT_NOTIFICATION_TYPE_EVENT:
973 notif_event = bt_notification_event_borrow_event(notif);
974 assert(notif_event);
975 notif_packet = bt_ctf_event_borrow_packet(notif_event);
976 assert(notif_packet);
977 break;
978 case BT_NOTIFICATION_TYPE_STREAM_BEGIN:
979 notif_stream =
980 bt_notification_stream_begin_borrow_stream(notif);
981 assert(notif_stream);
982 break;
983 case BT_NOTIFICATION_TYPE_STREAM_END:
984 notif_stream = bt_notification_stream_end_borrow_stream(notif);
985 assert(notif_stream);
986 break;
987 case BT_NOTIFICATION_TYPE_PACKET_BEGIN:
988 notif_packet =
989 bt_notification_packet_begin_borrow_packet(notif);
990 assert(notif_packet);
991 break;
992 case BT_NOTIFICATION_TYPE_PACKET_END:
993 notif_packet = bt_notification_packet_end_borrow_packet(notif);
994 assert(notif_packet);
995 break;
996 case BT_NOTIFICATION_TYPE_INACTIVITY:
997 /* Always valid */
998 break;
999 default:
1000 /*
1001 * Invalid type of notification. Only the notification
1002 * types above are allowed to be returned by a user
1003 * component.
1004 */
1005 goto error;
1006 }
1007
1008 if (notif_packet) {
1009 notif_stream = bt_ctf_packet_borrow_stream(notif_packet);
1010 assert(notif_stream);
1011 }
1012
1013 if (!notif_stream) {
1014 /*
1015 * The notification has no reference to a stream: it
1016 * cannot cause the creation of automatic notifications.
1017 */
1018 goto end;
1019 }
1020
1021 if (!validate_notification(iterator, notif, notif_stream,
1022 notif_packet)) {
1023 goto error;
1024 }
1025
1026 switch (notif->type) {
1027 case BT_NOTIFICATION_TYPE_EVENT:
1028 ret = handle_notif_event(iterator, notif, notif_stream,
1029 notif_packet);
1030 break;
1031 case BT_NOTIFICATION_TYPE_STREAM_BEGIN:
1032 ret = handle_notif_stream_begin(iterator, notif, notif_stream);
1033 break;
1034 case BT_NOTIFICATION_TYPE_STREAM_END:
1035 ret = handle_notif_stream_end(iterator, notif, notif_stream);
1036 break;
1037 case BT_NOTIFICATION_TYPE_PACKET_BEGIN:
1038 ret = handle_notif_packet_begin(iterator, notif, notif_stream,
1039 notif_packet);
1040 break;
1041 case BT_NOTIFICATION_TYPE_PACKET_END:
1042 ret = handle_notif_packet_end(iterator, notif, notif_stream,
1043 notif_packet);
1044 break;
1045 default:
1046 break;
1047 }
1048
1049 if (ret) {
1050 goto error;
1051 }
1052
1053 apply_actions(iterator);
1054 goto end;
1055
1056 error:
1057 ret = -1;
1058
1059 end:
1060 return ret;
1061 }
1062
1063 static
1064 int handle_end(struct bt_notification_iterator *iterator)
1065 {
1066 GHashTableIter stream_state_iter;
1067 gpointer stream_gptr, stream_state_gptr;
1068 int ret = 0;
1069
1070 /*
1071 * Emit a "stream end" notification for each non-ended stream
1072 * known by this iterator and mark them as ended.
1073 */
1074 g_hash_table_iter_init(&stream_state_iter, iterator->stream_states);
1075
1076 while (g_hash_table_iter_next(&stream_state_iter, &stream_gptr,
1077 &stream_state_gptr)) {
1078 struct stream_state *stream_state = stream_state_gptr;
1079
1080 assert(stream_state_gptr);
1081
1082 if (stream_state->is_ended) {
1083 continue;
1084 }
1085
1086 ret = handle_packet_switch(iterator, NULL, NULL, stream_state);
1087 if (ret) {
1088 goto error;
1089 }
1090
1091 ret = add_action_push_notif_stream_end(iterator, stream_gptr);
1092 if (ret) {
1093 goto error;
1094 }
1095
1096 add_action_set_stream_state_is_ended(iterator, stream_state);
1097 }
1098
1099 apply_actions(iterator);
1100 goto end;
1101
1102 error:
1103 ret = -1;
1104
1105 end:
1106 return ret;
1107 }
1108
1109 static
1110 enum bt_notification_iterator_status ensure_queue_has_notifications(
1111 struct bt_notification_iterator *iterator)
1112 {
1113 struct bt_private_notification_iterator *priv_iterator =
1114 bt_private_notification_iterator_from_notification_iterator(iterator);
1115 bt_component_class_notification_iterator_next_method next_method = NULL;
1116 struct bt_notification_iterator_next_return next_return = {
1117 .status = BT_NOTIFICATION_ITERATOR_STATUS_OK,
1118 .notification = NULL,
1119 };
1120 enum bt_notification_iterator_status status =
1121 BT_NOTIFICATION_ITERATOR_STATUS_OK;
1122 int ret;
1123
1124 assert(iterator);
1125
1126 if (iterator->queue->length > 0) {
1127 /* We already have enough */
1128 goto end;
1129 }
1130
1131 if (iterator->is_ended) {
1132 status = BT_NOTIFICATION_ITERATOR_STATUS_END;
1133 goto end;
1134 }
1135
1136 assert(iterator->upstream_component);
1137 assert(iterator->upstream_component->class);
1138
1139 /* Pick the appropriate "next" method */
1140 switch (iterator->upstream_component->class->type) {
1141 case BT_COMPONENT_CLASS_TYPE_SOURCE:
1142 {
1143 struct bt_component_class_source *source_class =
1144 container_of(iterator->upstream_component->class,
1145 struct bt_component_class_source, parent);
1146
1147 assert(source_class->methods.iterator.next);
1148 next_method = source_class->methods.iterator.next;
1149 break;
1150 }
1151 case BT_COMPONENT_CLASS_TYPE_FILTER:
1152 {
1153 struct bt_component_class_filter *filter_class =
1154 container_of(iterator->upstream_component->class,
1155 struct bt_component_class_filter, parent);
1156
1157 assert(filter_class->methods.iterator.next);
1158 next_method = filter_class->methods.iterator.next;
1159 break;
1160 }
1161 default:
1162 assert(false);
1163 break;
1164 }
1165
1166 /*
1167 * Call the user's "next" method to get the next notification
1168 * and status, skipping the forwarded automatic notifications
1169 * if any.
1170 */
1171 assert(next_method);
1172 next_return = next_method(priv_iterator);
1173 if (next_return.status < 0) {
1174 status = next_return.status;
1175 goto end;
1176 }
1177
1178 switch (next_return.status) {
1179 case BT_NOTIFICATION_ITERATOR_STATUS_END:
1180 ret = handle_end(iterator);
1181 if (ret) {
1182 status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
1183 goto end;
1184 }
1185
1186 if (iterator->queue->length == 0) {
1187 status = BT_NOTIFICATION_ITERATOR_STATUS_END;
1188 }
1189
1190 iterator->is_ended = true;
1191 break;
1192 case BT_NOTIFICATION_ITERATOR_STATUS_AGAIN:
1193 status = BT_NOTIFICATION_ITERATOR_STATUS_AGAIN;
1194 break;
1195 case BT_NOTIFICATION_ITERATOR_STATUS_OK:
1196 if (!next_return.notification) {
1197 status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
1198 goto end;
1199 }
1200
1201 /*
1202 * We know the notification is valid. Before we push it
1203 * to the head of the queue, push the appropriate
1204 * automatic notifications if any.
1205 */
1206 ret = enqueue_notification_and_automatic(iterator,
1207 next_return.notification);
1208 BT_PUT(next_return.notification);
1209 if (ret) {
1210 status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
1211 goto end;
1212 }
1213 break;
1214 default:
1215 /* Unknown non-error status */
1216 assert(false);
1217 }
1218
1219 end:
1220 return status;
1221 }
1222
1223 enum bt_notification_iterator_status
1224 bt_notification_iterator_next(struct bt_notification_iterator *iterator)
1225 {
1226 enum bt_notification_iterator_status status;
1227
1228 if (!iterator) {
1229 status = BT_NOTIFICATION_ITERATOR_STATUS_INVALID;
1230 goto end;
1231 }
1232
1233 /*
1234 * Make sure that the iterator's queue contains at least one
1235 * notification.
1236 */
1237 status = ensure_queue_has_notifications(iterator);
1238 if (status != BT_NOTIFICATION_ITERATOR_STATUS_OK) {
1239 goto end;
1240 }
1241
1242 /*
1243 * Move the notification at the tail of the queue to the
1244 * iterator's current notification.
1245 */
1246 assert(iterator->queue->length > 0);
1247 bt_put(iterator->current_notification);
1248 iterator->current_notification = g_queue_pop_tail(iterator->queue);
1249 assert(iterator->current_notification);
1250
1251 end:
1252 return status;
1253 }
1254
1255 struct bt_component *bt_notification_iterator_get_component(
1256 struct bt_notification_iterator *iterator)
1257 {
1258 return bt_get(iterator->upstream_component);
1259 }
1260
1261 struct bt_private_component *
1262 bt_private_notification_iterator_get_private_component(
1263 struct bt_private_notification_iterator *private_iterator)
1264 {
1265 return bt_private_component_from_component(
1266 bt_notification_iterator_get_component(
1267 bt_notification_iterator_from_private(private_iterator)));
1268 }
1269
1270 enum bt_notification_iterator_status bt_notification_iterator_seek_time(
1271 struct bt_notification_iterator *iterator,
1272 enum bt_notification_iterator_seek_origin seek_origin,
1273 int64_t time)
1274 {
1275 enum bt_notification_iterator_status ret =
1276 BT_NOTIFICATION_ITERATOR_STATUS_UNSUPPORTED;
1277 return ret;
1278 }
This page took 0.057058 seconds and 4 git commands to generate.