bt_notification_iterator_create(): assert the type of comp. class
[babeltrace.git] / lib / graph / iterator.c
1 /*
2 * iterator.c
3 *
4 * Babeltrace Notification Iterator
5 *
6 * Copyright 2015 Jérémie Galarneau <jeremie.galarneau@efficios.com>
7 * Copyright 2017 Philippe Proulx <pproulx@efficios.com>
8 *
9 * Permission is hereby granted, free of charge, to any person obtaining a copy
10 * of this software and associated documentation files (the "Software"), to deal
11 * in the Software without restriction, including without limitation the rights
12 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
13 * copies of the Software, and to permit persons to whom the Software is
14 * furnished to do so, subject to the following conditions:
15 *
16 * The above copyright notice and this permission notice shall be included in
17 * all copies or substantial portions of the Software.
18 *
19 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
20 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
21 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
22 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
23 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
24 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
25 * SOFTWARE.
26 */
27
28 #include <babeltrace/compiler-internal.h>
29 #include <babeltrace/ref.h>
30 #include <babeltrace/ctf-ir/event-internal.h>
31 #include <babeltrace/ctf-ir/packet-internal.h>
32 #include <babeltrace/ctf-ir/stream-internal.h>
33 #include <babeltrace/graph/connection-internal.h>
34 #include <babeltrace/graph/component.h>
35 #include <babeltrace/graph/component-source-internal.h>
36 #include <babeltrace/graph/component-class-internal.h>
37 #include <babeltrace/graph/notification.h>
38 #include <babeltrace/graph/notification-iterator.h>
39 #include <babeltrace/graph/notification-iterator-internal.h>
40 #include <babeltrace/graph/notification-internal.h>
41 #include <babeltrace/graph/notification-event.h>
42 #include <babeltrace/graph/notification-event-internal.h>
43 #include <babeltrace/graph/notification-packet.h>
44 #include <babeltrace/graph/notification-packet-internal.h>
45 #include <babeltrace/graph/notification-stream.h>
46 #include <babeltrace/graph/notification-stream-internal.h>
47 #include <babeltrace/graph/port.h>
48 #include <babeltrace/types.h>
49 #include <stdint.h>
50
51 struct stream_state {
52 struct bt_ctf_stream *stream; /* owned by this */
53 struct bt_ctf_packet *cur_packet; /* owned by this */
54 bt_bool is_ended;
55 };
56
57 enum action_type {
58 ACTION_TYPE_PUSH_NOTIF,
59 ACTION_TYPE_MAP_PORT_TO_COMP_IN_STREAM,
60 ACTION_TYPE_ADD_STREAM_STATE,
61 ACTION_TYPE_SET_STREAM_STATE_IS_ENDED,
62 ACTION_TYPE_SET_STREAM_STATE_CUR_PACKET,
63 };
64
65 struct action {
66 enum action_type type;
67 union {
68 /* ACTION_TYPE_PUSH_NOTIF */
69 struct {
70 struct bt_notification *notif; /* owned by this */
71 } push_notif;
72
73 /* ACTION_TYPE_MAP_PORT_TO_COMP_IN_STREAM */
74 struct {
75 struct bt_ctf_stream *stream; /* owned by this */
76 struct bt_component *component; /* owned by this */
77 struct bt_port *port; /* owned by this */
78 } map_port_to_comp_in_stream;
79
80 /* ACTION_TYPE_ADD_STREAM_STATE */
81 struct {
82 struct bt_ctf_stream *stream; /* owned by this */
83 struct stream_state *stream_state; /* owned by this */
84 } add_stream_state;
85
86 /* ACTION_TYPE_SET_STREAM_STATE_IS_ENDED */
87 struct {
88 struct stream_state *stream_state; /* weak */
89 } set_stream_state_is_ended;
90
91 /* ACTION_TYPE_SET_STREAM_STATE_CUR_PACKET */
92 struct {
93 struct stream_state *stream_state; /* weak */
94 struct bt_ctf_packet *packet; /* owned by this */
95 } set_stream_state_cur_packet;
96 } payload;
97 };
98
99 static
100 void stream_destroy_listener(struct bt_ctf_stream *stream, void *data)
101 {
102 struct bt_notification_iterator *iterator = data;
103
104 /* Remove associated stream state */
105 g_hash_table_remove(iterator->stream_states, stream);
106 }
107
108 static
109 void destroy_stream_state(struct stream_state *stream_state)
110 {
111 if (!stream_state) {
112 return;
113 }
114
115 bt_put(stream_state->cur_packet);
116 bt_put(stream_state->stream);
117 g_free(stream_state);
118 }
119
120 static
121 void destroy_action(struct action *action)
122 {
123 assert(action);
124
125 switch (action->type) {
126 case ACTION_TYPE_PUSH_NOTIF:
127 BT_PUT(action->payload.push_notif.notif);
128 break;
129 case ACTION_TYPE_MAP_PORT_TO_COMP_IN_STREAM:
130 BT_PUT(action->payload.map_port_to_comp_in_stream.stream);
131 BT_PUT(action->payload.map_port_to_comp_in_stream.component);
132 BT_PUT(action->payload.map_port_to_comp_in_stream.port);
133 break;
134 case ACTION_TYPE_ADD_STREAM_STATE:
135 BT_PUT(action->payload.add_stream_state.stream);
136 destroy_stream_state(
137 action->payload.add_stream_state.stream_state);
138 action->payload.add_stream_state.stream_state = NULL;
139 break;
140 case ACTION_TYPE_SET_STREAM_STATE_CUR_PACKET:
141 BT_PUT(action->payload.set_stream_state_cur_packet.packet);
142 break;
143 case ACTION_TYPE_SET_STREAM_STATE_IS_ENDED:
144 break;
145 default:
146 assert(BT_FALSE);
147 }
148 }
149
150 static
151 void add_action(struct bt_notification_iterator *iterator,
152 struct action *action)
153 {
154 g_array_append_val(iterator->actions, *action);
155 }
156
157 static
158 void clear_actions(struct bt_notification_iterator *iterator)
159 {
160 size_t i;
161
162 for (i = 0; i < iterator->actions->len; i++) {
163 struct action *action = &g_array_index(iterator->actions,
164 struct action, i);
165
166 destroy_action(action);
167 }
168
169 g_array_set_size(iterator->actions, 0);
170 }
171
172 static
173 void apply_actions(struct bt_notification_iterator *iterator)
174 {
175 size_t i;
176
177 for (i = 0; i < iterator->actions->len; i++) {
178 struct action *action = &g_array_index(iterator->actions,
179 struct action, i);
180
181 switch (action->type) {
182 case ACTION_TYPE_PUSH_NOTIF:
183 /* Move notification to queue */
184 g_queue_push_head(iterator->queue,
185 action->payload.push_notif.notif);
186 bt_notification_freeze(
187 action->payload.push_notif.notif);
188 action->payload.push_notif.notif = NULL;
189 break;
190 case ACTION_TYPE_MAP_PORT_TO_COMP_IN_STREAM:
191 bt_ctf_stream_map_component_to_port(
192 action->payload.map_port_to_comp_in_stream.stream,
193 action->payload.map_port_to_comp_in_stream.component,
194 action->payload.map_port_to_comp_in_stream.port);
195 break;
196 case ACTION_TYPE_ADD_STREAM_STATE:
197 /* Move stream state to hash table */
198 g_hash_table_insert(iterator->stream_states,
199 action->payload.add_stream_state.stream,
200 action->payload.add_stream_state.stream_state);
201
202 action->payload.add_stream_state.stream_state = NULL;
203 break;
204 case ACTION_TYPE_SET_STREAM_STATE_IS_ENDED:
205 /*
206 * We know that this stream is ended. We need to
207 * remember this as long as the stream exists to
208 * enforce that the same stream does not end
209 * twice.
210 *
211 * Here we add a destroy listener to the stream
212 * which we put after (becomes weak as the hash
213 * table key). If we were the last object to own
214 * this stream, the destroy listener is called
215 * when we call bt_put() which removes this
216 * stream state completely. This is important
217 * because the memory used by this stream object
218 * could be reused for another stream, and they
219 * must have different states.
220 */
221 bt_ctf_stream_add_destroy_listener(
222 action->payload.set_stream_state_is_ended.stream_state->stream,
223 stream_destroy_listener, iterator);
224 action->payload.set_stream_state_is_ended.stream_state->is_ended = BT_TRUE;
225 BT_PUT(action->payload.set_stream_state_is_ended.stream_state->stream);
226 break;
227 case ACTION_TYPE_SET_STREAM_STATE_CUR_PACKET:
228 /* Move packet to stream state's current packet */
229 BT_MOVE(action->payload.set_stream_state_cur_packet.stream_state->cur_packet,
230 action->payload.set_stream_state_cur_packet.packet);
231 break;
232 default:
233 assert(BT_FALSE);
234 }
235 }
236
237 clear_actions(iterator);
238 }
239
240 static
241 struct stream_state *create_stream_state(struct bt_ctf_stream *stream)
242 {
243 struct stream_state *stream_state = g_new0(struct stream_state, 1);
244
245 if (!stream_state) {
246 goto end;
247 }
248
249 /*
250 * We keep a reference to the stream until we know it's ended
251 * because we need to be able to create an automatic "stream
252 * end" notification when the user's "next" method returns
253 * BT_NOTIFICATION_ITERATOR_STATUS_END.
254 *
255 * We put this reference when the stream is marked as ended.
256 */
257 stream_state->stream = bt_get(stream);
258
259 end:
260 return stream_state;
261 }
262
263 static
264 void bt_notification_iterator_destroy(struct bt_object *obj)
265 {
266 struct bt_notification_iterator *iterator;
267
268 assert(obj);
269
270 /*
271 * The notification iterator's reference count is 0 if we're
272 * here. Increment it to avoid a double-destroy (possibly
273 * infinitely recursive). This could happen for example if the
274 * notification iterator's finalization function does bt_get()
275 * (or anything that causes bt_get() to be called) on itself
276 * (ref. count goes from 0 to 1), and then bt_put(): the
277 * reference count would go from 1 to 0 again and this function
278 * would be called again.
279 */
280 obj->ref_count.count++;
281 iterator = container_of(obj, struct bt_notification_iterator, base);
282 bt_notification_iterator_finalize(iterator);
283
284 if (iterator->queue) {
285 struct bt_notification *notif;
286
287 while ((notif = g_queue_pop_tail(iterator->queue))) {
288 bt_put(notif);
289 }
290
291 g_queue_free(iterator->queue);
292 }
293
294 if (iterator->stream_states) {
295 /*
296 * Remove our destroy listener from each stream which
297 * has a state in this iterator. Otherwise the destroy
298 * listener would be called with an invalid/other
299 * notification iterator object.
300 */
301 GHashTableIter ht_iter;
302 gpointer stream_gptr, stream_state_gptr;
303
304 g_hash_table_iter_init(&ht_iter, iterator->stream_states);
305
306 while (g_hash_table_iter_next(&ht_iter, &stream_gptr, &stream_state_gptr)) {
307 assert(stream_gptr);
308 bt_ctf_stream_remove_destroy_listener(
309 (void *) stream_gptr, stream_destroy_listener,
310 iterator);
311 }
312
313 g_hash_table_destroy(iterator->stream_states);
314 }
315
316 if (iterator->actions) {
317 g_array_free(iterator->actions, TRUE);
318 }
319
320 if (iterator->connection) {
321 /*
322 * Remove ourself from the originating connection so
323 * that it does not try to finalize a dangling pointer
324 * later.
325 */
326 bt_connection_remove_iterator(iterator->connection, iterator);
327 }
328
329 bt_put(iterator->current_notification);
330 g_free(iterator);
331 }
332
333 BT_HIDDEN
334 void bt_notification_iterator_finalize(
335 struct bt_notification_iterator *iterator)
336 {
337 struct bt_component_class *comp_class = NULL;
338 bt_component_class_notification_iterator_finalize_method
339 finalize_method = NULL;
340
341 assert(iterator);
342
343 switch (iterator->state) {
344 case BT_NOTIFICATION_ITERATOR_STATE_FINALIZED:
345 case BT_NOTIFICATION_ITERATOR_STATE_FINALIZED_AND_ENDED:
346 /* Already finalized */
347 return;
348 default:
349 break;
350 }
351
352 if (iterator->state == BT_NOTIFICATION_ITERATOR_STATE_ENDED) {
353 iterator->state = BT_NOTIFICATION_ITERATOR_STATE_FINALIZED_AND_ENDED;
354 } else {
355 iterator->state = BT_NOTIFICATION_ITERATOR_STATE_FINALIZED;
356 }
357
358 assert(iterator->upstream_component);
359 comp_class = iterator->upstream_component->class;
360
361 /* Call user-defined destroy method */
362 switch (comp_class->type) {
363 case BT_COMPONENT_CLASS_TYPE_SOURCE:
364 {
365 struct bt_component_class_source *source_class;
366
367 source_class = container_of(comp_class, struct bt_component_class_source, parent);
368 finalize_method = source_class->methods.iterator.finalize;
369 break;
370 }
371 case BT_COMPONENT_CLASS_TYPE_FILTER:
372 {
373 struct bt_component_class_filter *filter_class;
374
375 filter_class = container_of(comp_class, struct bt_component_class_filter, parent);
376 finalize_method = filter_class->methods.iterator.finalize;
377 break;
378 }
379 default:
380 /* Unreachable */
381 assert(0);
382 }
383
384 if (finalize_method) {
385 finalize_method(
386 bt_private_notification_iterator_from_notification_iterator(iterator));
387 }
388
389 iterator->upstream_component = NULL;
390 iterator->upstream_port = NULL;
391 }
392
393 BT_HIDDEN
394 void bt_notification_iterator_set_connection(
395 struct bt_notification_iterator *iterator,
396 struct bt_connection *connection)
397 {
398 assert(iterator);
399 iterator->connection = connection;
400 }
401
402 static
403 int create_subscription_mask_from_notification_types(
404 struct bt_notification_iterator *iterator,
405 const enum bt_notification_type *notif_types)
406 {
407 const enum bt_notification_type *notif_type;
408 int ret = 0;
409
410 assert(notif_types);
411 iterator->subscription_mask = 0;
412
413 for (notif_type = notif_types;
414 *notif_type != BT_NOTIFICATION_TYPE_SENTINEL;
415 notif_type++) {
416 switch (*notif_type) {
417 case BT_NOTIFICATION_TYPE_ALL:
418 iterator->subscription_mask |=
419 BT_NOTIFICATION_ITERATOR_NOTIF_TYPE_EVENT |
420 BT_NOTIFICATION_ITERATOR_NOTIF_TYPE_INACTIVITY |
421 BT_NOTIFICATION_ITERATOR_NOTIF_TYPE_STREAM_BEGIN |
422 BT_NOTIFICATION_ITERATOR_NOTIF_TYPE_STREAM_END |
423 BT_NOTIFICATION_ITERATOR_NOTIF_TYPE_PACKET_BEGIN |
424 BT_NOTIFICATION_ITERATOR_NOTIF_TYPE_PACKET_END;
425 break;
426 case BT_NOTIFICATION_TYPE_EVENT:
427 iterator->subscription_mask |= BT_NOTIFICATION_ITERATOR_NOTIF_TYPE_EVENT;
428 break;
429 case BT_NOTIFICATION_TYPE_INACTIVITY:
430 iterator->subscription_mask |= BT_NOTIFICATION_ITERATOR_NOTIF_TYPE_INACTIVITY;
431 break;
432 case BT_NOTIFICATION_TYPE_STREAM_BEGIN:
433 iterator->subscription_mask |= BT_NOTIFICATION_ITERATOR_NOTIF_TYPE_STREAM_BEGIN;
434 break;
435 case BT_NOTIFICATION_TYPE_STREAM_END:
436 iterator->subscription_mask |= BT_NOTIFICATION_ITERATOR_NOTIF_TYPE_STREAM_END;
437 break;
438 case BT_NOTIFICATION_TYPE_PACKET_BEGIN:
439 iterator->subscription_mask |= BT_NOTIFICATION_ITERATOR_NOTIF_TYPE_PACKET_BEGIN;
440 break;
441 case BT_NOTIFICATION_TYPE_PACKET_END:
442 iterator->subscription_mask |= BT_NOTIFICATION_ITERATOR_NOTIF_TYPE_PACKET_END;
443 break;
444 default:
445 ret = -1;
446 goto end;
447 }
448 }
449
450 end:
451 return ret;
452 }
453
454 BT_HIDDEN
455 struct bt_notification_iterator *bt_notification_iterator_create(
456 struct bt_component *upstream_comp,
457 struct bt_port *upstream_port,
458 const enum bt_notification_type *notification_types,
459 struct bt_connection *connection)
460 {
461 enum bt_component_class_type type;
462 struct bt_notification_iterator *iterator = NULL;
463
464 assert(upstream_comp);
465 assert(upstream_port);
466 assert(notification_types);
467 assert(bt_port_is_connected(upstream_port));
468
469 type = bt_component_get_class_type(upstream_comp);
470 assert(type == BT_COMPONENT_CLASS_TYPE_SOURCE ||
471 type == BT_COMPONENT_CLASS_TYPE_FILTER);
472 iterator = g_new0(struct bt_notification_iterator, 1);
473 if (!iterator) {
474 goto error;
475 }
476
477 bt_object_init(iterator, bt_notification_iterator_destroy);
478
479 if (create_subscription_mask_from_notification_types(iterator,
480 notification_types)) {
481 goto error;
482 }
483
484 iterator->stream_states = g_hash_table_new_full(g_direct_hash,
485 g_direct_equal, NULL, (GDestroyNotify) destroy_stream_state);
486 if (!iterator->stream_states) {
487 goto error;
488 }
489
490 iterator->queue = g_queue_new();
491 if (!iterator->queue) {
492 goto error;
493 }
494
495 iterator->actions = g_array_new(FALSE, FALSE, sizeof(struct action));
496 if (!iterator->actions) {
497 goto error;
498 }
499
500 iterator->upstream_component = upstream_comp;
501 iterator->upstream_port = upstream_port;
502 iterator->connection = connection;
503 iterator->state = BT_NOTIFICATION_ITERATOR_STATE_ACTIVE;
504 goto end;
505
506 error:
507 BT_PUT(iterator);
508
509 end:
510 return iterator;
511 }
512
513 BT_HIDDEN
514 enum bt_notification_iterator_status bt_notification_iterator_validate(
515 struct bt_notification_iterator *iterator)
516 {
517 enum bt_notification_iterator_status ret =
518 BT_NOTIFICATION_ITERATOR_STATUS_OK;
519
520 if (!iterator) {
521 ret = BT_NOTIFICATION_ITERATOR_STATUS_INVALID;
522 goto end;
523 }
524 end:
525 return ret;
526 }
527
528 void *bt_private_notification_iterator_get_user_data(
529 struct bt_private_notification_iterator *private_iterator)
530 {
531 struct bt_notification_iterator *iterator =
532 bt_notification_iterator_from_private(private_iterator);
533
534 return iterator ? iterator->user_data : NULL;
535 }
536
537 enum bt_notification_iterator_status
538 bt_private_notification_iterator_set_user_data(
539 struct bt_private_notification_iterator *private_iterator,
540 void *data)
541 {
542 enum bt_notification_iterator_status ret =
543 BT_NOTIFICATION_ITERATOR_STATUS_OK;
544 struct bt_notification_iterator *iterator =
545 bt_notification_iterator_from_private(private_iterator);
546
547 if (!iterator) {
548 ret = BT_NOTIFICATION_ITERATOR_STATUS_INVALID;
549 goto end;
550 }
551
552 iterator->user_data = data;
553 end:
554 return ret;
555 }
556
557 struct bt_notification *bt_notification_iterator_get_notification(
558 struct bt_notification_iterator *iterator)
559 {
560 struct bt_notification *notification = NULL;
561
562 if (!iterator) {
563 goto end;
564 }
565
566 notification = bt_get(iterator->current_notification);
567
568 end:
569 return notification;
570 }
571
572 static
573 enum bt_notification_iterator_notif_type
574 bt_notification_iterator_notif_type_from_notif_type(
575 enum bt_notification_type notif_type)
576 {
577 enum bt_notification_iterator_notif_type iter_notif_type;
578
579 switch (notif_type) {
580 case BT_NOTIFICATION_TYPE_EVENT:
581 iter_notif_type = BT_NOTIFICATION_ITERATOR_NOTIF_TYPE_EVENT;
582 break;
583 case BT_NOTIFICATION_TYPE_INACTIVITY:
584 iter_notif_type = BT_NOTIFICATION_ITERATOR_NOTIF_TYPE_INACTIVITY;
585 break;
586 case BT_NOTIFICATION_TYPE_STREAM_BEGIN:
587 iter_notif_type = BT_NOTIFICATION_ITERATOR_NOTIF_TYPE_STREAM_BEGIN;
588 break;
589 case BT_NOTIFICATION_TYPE_STREAM_END:
590 iter_notif_type = BT_NOTIFICATION_ITERATOR_NOTIF_TYPE_STREAM_END;
591 break;
592 case BT_NOTIFICATION_TYPE_PACKET_BEGIN:
593 iter_notif_type = BT_NOTIFICATION_ITERATOR_NOTIF_TYPE_PACKET_BEGIN;
594 break;
595 case BT_NOTIFICATION_TYPE_PACKET_END:
596 iter_notif_type = BT_NOTIFICATION_ITERATOR_NOTIF_TYPE_PACKET_END;
597 break;
598 default:
599 assert(BT_FALSE);
600 }
601
602 return iter_notif_type;
603 }
604
605 static
606 bt_bool validate_notification(struct bt_notification_iterator *iterator,
607 struct bt_notification *notif,
608 struct bt_ctf_stream *notif_stream,
609 struct bt_ctf_packet *notif_packet)
610 {
611 bt_bool is_valid = BT_TRUE;
612 struct stream_state *stream_state;
613 struct bt_port *stream_comp_cur_port;
614
615 assert(notif_stream);
616 stream_comp_cur_port =
617 bt_ctf_stream_port_for_component(notif_stream,
618 iterator->upstream_component);
619 if (!stream_comp_cur_port) {
620 /*
621 * This is the first time this notification iterator
622 * bumps into this stream. Add an action to map the
623 * iterator's upstream component to the iterator's
624 * upstream port in this stream.
625 */
626 struct action action = {
627 .type = ACTION_TYPE_MAP_PORT_TO_COMP_IN_STREAM,
628 .payload.map_port_to_comp_in_stream = {
629 .stream = bt_get(notif_stream),
630 .component = bt_get(iterator->upstream_component),
631 .port = bt_get(iterator->upstream_port),
632 },
633 };
634
635 add_action(iterator, &action);
636 } else {
637 if (stream_comp_cur_port != iterator->upstream_port) {
638 /*
639 * It looks like two different ports of the same
640 * component are emitting notifications which
641 * have references to the same stream. This is
642 * bad: the API guarantees that it can never
643 * happen.
644 */
645 is_valid = BT_FALSE;
646 goto end;
647 }
648
649 }
650
651 stream_state = g_hash_table_lookup(iterator->stream_states,
652 notif_stream);
653 if (stream_state) {
654 if (stream_state->is_ended) {
655 /*
656 * There's a new notification which has a
657 * reference to a stream which, from this
658 * iterator's point of view, is ended ("end of
659 * stream" notification was returned). This is
660 * bad: the API guarantees that it can never
661 * happen.
662 */
663 is_valid = BT_FALSE;
664 goto end;
665 }
666
667 switch (notif->type) {
668 case BT_NOTIFICATION_TYPE_STREAM_BEGIN:
669 /*
670 * We already have a stream state, which means
671 * we already returned a "stream begin"
672 * notification: this is an invalid duplicate.
673 */
674 is_valid = BT_FALSE;
675 goto end;
676 case BT_NOTIFICATION_TYPE_PACKET_BEGIN:
677 if (notif_packet == stream_state->cur_packet) {
678 /* Duplicate "packet begin" notification */
679 is_valid = BT_FALSE;
680 goto end;
681 }
682 break;
683 default:
684 break;
685 }
686 }
687
688 end:
689 return is_valid;
690 }
691
692 static
693 bt_bool is_subscribed_to_notification_type(struct bt_notification_iterator *iterator,
694 enum bt_notification_type notif_type)
695 {
696 uint32_t iter_notif_type =
697 (uint32_t) bt_notification_iterator_notif_type_from_notif_type(
698 notif_type);
699
700 return (iter_notif_type & iterator->subscription_mask) ? BT_TRUE : BT_FALSE;
701 }
702
703 static
704 void add_action_push_notif(struct bt_notification_iterator *iterator,
705 struct bt_notification *notif)
706 {
707 struct action action = {
708 .type = ACTION_TYPE_PUSH_NOTIF,
709 };
710
711 assert(notif);
712
713 if (!is_subscribed_to_notification_type(iterator, notif->type)) {
714 return;
715 }
716
717 action.payload.push_notif.notif = bt_get(notif);
718 add_action(iterator, &action);
719 }
720
721 static
722 int add_action_push_notif_stream_begin(
723 struct bt_notification_iterator *iterator,
724 struct bt_ctf_stream *stream)
725 {
726 int ret = 0;
727 struct bt_notification *stream_begin_notif = NULL;
728
729 if (!is_subscribed_to_notification_type(iterator,
730 BT_NOTIFICATION_TYPE_STREAM_BEGIN)) {
731 goto end;
732 }
733
734 assert(stream);
735 stream_begin_notif = bt_notification_stream_begin_create(stream);
736 if (!stream_begin_notif) {
737 goto error;
738 }
739
740 add_action_push_notif(iterator, stream_begin_notif);
741 goto end;
742
743 error:
744 ret = -1;
745
746 end:
747 bt_put(stream_begin_notif);
748 return ret;
749 }
750
751 static
752 int add_action_push_notif_stream_end(
753 struct bt_notification_iterator *iterator,
754 struct bt_ctf_stream *stream)
755 {
756 int ret = 0;
757 struct bt_notification *stream_end_notif = NULL;
758
759 if (!is_subscribed_to_notification_type(iterator,
760 BT_NOTIFICATION_TYPE_STREAM_END)) {
761 goto end;
762 }
763
764 assert(stream);
765 stream_end_notif = bt_notification_stream_end_create(stream);
766 if (!stream_end_notif) {
767 goto error;
768 }
769
770 add_action_push_notif(iterator, stream_end_notif);
771 goto end;
772
773 error:
774 ret = -1;
775
776 end:
777 bt_put(stream_end_notif);
778 return ret;
779 }
780
781 static
782 int add_action_push_notif_packet_begin(
783 struct bt_notification_iterator *iterator,
784 struct bt_ctf_packet *packet)
785 {
786 int ret = 0;
787 struct bt_notification *packet_begin_notif = NULL;
788
789 if (!is_subscribed_to_notification_type(iterator,
790 BT_NOTIFICATION_TYPE_PACKET_BEGIN)) {
791 goto end;
792 }
793
794 assert(packet);
795 packet_begin_notif = bt_notification_packet_begin_create(packet);
796 if (!packet_begin_notif) {
797 goto error;
798 }
799
800 add_action_push_notif(iterator, packet_begin_notif);
801 goto end;
802
803 error:
804 ret = -1;
805
806 end:
807 bt_put(packet_begin_notif);
808 return ret;
809 }
810
811 static
812 int add_action_push_notif_packet_end(
813 struct bt_notification_iterator *iterator,
814 struct bt_ctf_packet *packet)
815 {
816 int ret = 0;
817 struct bt_notification *packet_end_notif = NULL;
818
819 if (!is_subscribed_to_notification_type(iterator,
820 BT_NOTIFICATION_TYPE_PACKET_END)) {
821 goto end;
822 }
823
824 assert(packet);
825 packet_end_notif = bt_notification_packet_end_create(packet);
826 if (!packet_end_notif) {
827 goto error;
828 }
829
830 add_action_push_notif(iterator, packet_end_notif);
831 goto end;
832
833 error:
834 ret = -1;
835
836 end:
837 bt_put(packet_end_notif);
838 return ret;
839 }
840
841 static
842 void add_action_set_stream_state_is_ended(
843 struct bt_notification_iterator *iterator,
844 struct stream_state *stream_state)
845 {
846 struct action action = {
847 .type = ACTION_TYPE_SET_STREAM_STATE_IS_ENDED,
848 .payload.set_stream_state_is_ended = {
849 .stream_state = stream_state,
850 },
851 };
852
853 assert(stream_state);
854 add_action(iterator, &action);
855 }
856
857 static
858 void add_action_set_stream_state_cur_packet(
859 struct bt_notification_iterator *iterator,
860 struct stream_state *stream_state,
861 struct bt_ctf_packet *packet)
862 {
863 struct action action = {
864 .type = ACTION_TYPE_SET_STREAM_STATE_CUR_PACKET,
865 .payload.set_stream_state_cur_packet = {
866 .stream_state = stream_state,
867 .packet = bt_get(packet),
868 },
869 };
870
871 assert(stream_state);
872 add_action(iterator, &action);
873 }
874
875 static
876 int ensure_stream_state_exists(struct bt_notification_iterator *iterator,
877 struct bt_notification *stream_begin_notif,
878 struct bt_ctf_stream *notif_stream,
879 struct stream_state **stream_state)
880 {
881 int ret = 0;
882
883 if (!notif_stream) {
884 /*
885 * The notification does not reference any stream: no
886 * need to get or create a stream state.
887 */
888 goto end;
889 }
890
891 *stream_state = g_hash_table_lookup(iterator->stream_states,
892 notif_stream);
893 if (!*stream_state) {
894 /*
895 * This iterator did not bump into this stream yet:
896 * create a stream state and a "stream begin"
897 * notification.
898 */
899 struct action action = {
900 .type = ACTION_TYPE_ADD_STREAM_STATE,
901 .payload.add_stream_state = {
902 .stream = bt_get(notif_stream),
903 .stream_state = NULL,
904 },
905 };
906
907 *stream_state = create_stream_state(notif_stream);
908 if (!stream_state) {
909 goto error;
910 }
911
912 action.payload.add_stream_state.stream_state =
913 *stream_state;
914 add_action(iterator, &action);
915
916 if (stream_begin_notif) {
917 add_action_push_notif(iterator, stream_begin_notif);
918 } else {
919 ret = add_action_push_notif_stream_begin(iterator,
920 notif_stream);
921 if (ret) {
922 goto error;
923 }
924 }
925 }
926
927 goto end;
928
929 error:
930 destroy_stream_state(*stream_state);
931 ret = -1;
932
933 end:
934 return ret;
935 }
936
937 static
938 int handle_packet_switch(struct bt_notification_iterator *iterator,
939 struct bt_notification *packet_begin_notif,
940 struct bt_ctf_packet *new_packet,
941 struct stream_state *stream_state)
942 {
943 int ret = 0;
944
945 if (stream_state->cur_packet == new_packet) {
946 goto end;
947 }
948
949 if (stream_state->cur_packet) {
950 /* End of the current packet */
951 ret = add_action_push_notif_packet_end(iterator,
952 stream_state->cur_packet);
953 if (ret) {
954 goto error;
955 }
956 }
957
958 /* Beginning of the new packet */
959 if (packet_begin_notif) {
960 add_action_push_notif(iterator, packet_begin_notif);
961 } else if (new_packet) {
962 ret = add_action_push_notif_packet_begin(iterator,
963 new_packet);
964 if (ret) {
965 goto error;
966 }
967 }
968
969 add_action_set_stream_state_cur_packet(iterator, stream_state,
970 new_packet);
971 goto end;
972
973 error:
974 ret = -1;
975
976 end:
977 return ret;
978 }
979
980 static
981 int handle_notif_stream_begin(
982 struct bt_notification_iterator *iterator,
983 struct bt_notification *notif,
984 struct bt_ctf_stream *notif_stream)
985 {
986 int ret = 0;
987 struct stream_state *stream_state;
988
989 assert(notif->type == BT_NOTIFICATION_TYPE_STREAM_BEGIN);
990 assert(notif_stream);
991 ret = ensure_stream_state_exists(iterator, notif, notif_stream,
992 &stream_state);
993 if (ret) {
994 goto error;
995 }
996
997 goto end;
998
999 error:
1000 ret = -1;
1001
1002 end:
1003 return ret;
1004 }
1005
1006 static
1007 int handle_notif_stream_end(
1008 struct bt_notification_iterator *iterator,
1009 struct bt_notification *notif,
1010 struct bt_ctf_stream *notif_stream)
1011 {
1012 int ret = 0;
1013 struct stream_state *stream_state;
1014
1015 assert(notif->type == BT_NOTIFICATION_TYPE_STREAM_END);
1016 assert(notif_stream);
1017 ret = ensure_stream_state_exists(iterator, NULL, notif_stream,
1018 &stream_state);
1019 if (ret) {
1020 goto error;
1021 }
1022
1023 ret = handle_packet_switch(iterator, NULL, NULL, stream_state);
1024 if (ret) {
1025 goto error;
1026 }
1027
1028 add_action_push_notif(iterator, notif);
1029 add_action_set_stream_state_is_ended(iterator, stream_state);
1030 goto end;
1031
1032 error:
1033 ret = -1;
1034
1035 end:
1036 return ret;
1037 }
1038
1039 static
1040 int handle_notif_packet_begin(
1041 struct bt_notification_iterator *iterator,
1042 struct bt_notification *notif,
1043 struct bt_ctf_stream *notif_stream,
1044 struct bt_ctf_packet *notif_packet)
1045 {
1046 int ret = 0;
1047 struct stream_state *stream_state;
1048
1049 assert(notif->type == BT_NOTIFICATION_TYPE_PACKET_BEGIN);
1050 assert(notif_packet);
1051 ret = ensure_stream_state_exists(iterator, NULL, notif_stream,
1052 &stream_state);
1053 if (ret) {
1054 goto error;
1055 }
1056
1057 ret = handle_packet_switch(iterator, notif, notif_packet, stream_state);
1058 if (ret) {
1059 goto error;
1060 }
1061
1062 goto end;
1063
1064 error:
1065 ret = -1;
1066
1067 end:
1068 return ret;
1069 }
1070
1071 static
1072 int handle_notif_packet_end(
1073 struct bt_notification_iterator *iterator,
1074 struct bt_notification *notif,
1075 struct bt_ctf_stream *notif_stream,
1076 struct bt_ctf_packet *notif_packet)
1077 {
1078 int ret = 0;
1079 struct stream_state *stream_state;
1080
1081 assert(notif->type == BT_NOTIFICATION_TYPE_PACKET_END);
1082 assert(notif_packet);
1083 ret = ensure_stream_state_exists(iterator, NULL, notif_stream,
1084 &stream_state);
1085 if (ret) {
1086 goto error;
1087 }
1088
1089 ret = handle_packet_switch(iterator, NULL, notif_packet, stream_state);
1090 if (ret) {
1091 goto error;
1092 }
1093
1094 /* End of the current packet */
1095 add_action_push_notif(iterator, notif);
1096 add_action_set_stream_state_cur_packet(iterator, stream_state, NULL);
1097 goto end;
1098
1099 error:
1100 ret = -1;
1101
1102 end:
1103 return ret;
1104 }
1105
1106 static
1107 int handle_notif_event(
1108 struct bt_notification_iterator *iterator,
1109 struct bt_notification *notif,
1110 struct bt_ctf_stream *notif_stream,
1111 struct bt_ctf_packet *notif_packet)
1112 {
1113 int ret = 0;
1114 struct stream_state *stream_state;
1115
1116 assert(notif->type == BT_NOTIFICATION_TYPE_EVENT);
1117 assert(notif_packet);
1118 ret = ensure_stream_state_exists(iterator, NULL, notif_stream,
1119 &stream_state);
1120 if (ret) {
1121 goto error;
1122 }
1123
1124 ret = handle_packet_switch(iterator, NULL, notif_packet, stream_state);
1125 if (ret) {
1126 goto error;
1127 }
1128
1129 add_action_push_notif(iterator, notif);
1130 goto end;
1131
1132 error:
1133 ret = -1;
1134
1135 end:
1136 return ret;
1137 }
1138
1139 static
1140 int enqueue_notification_and_automatic(
1141 struct bt_notification_iterator *iterator,
1142 struct bt_notification *notif)
1143 {
1144 int ret = 0;
1145 struct bt_ctf_event *notif_event = NULL;
1146 struct bt_ctf_stream *notif_stream = NULL;
1147 struct bt_ctf_packet *notif_packet = NULL;
1148
1149 assert(notif);
1150
1151 // TODO: Skip most of this if the iterator is only subscribed
1152 // to event/inactivity notifications.
1153
1154 /* Get the stream and packet referred by the notification */
1155 switch (notif->type) {
1156 case BT_NOTIFICATION_TYPE_EVENT:
1157 notif_event = bt_notification_event_borrow_event(notif);
1158 assert(notif_event);
1159 notif_packet = bt_ctf_event_borrow_packet(notif_event);
1160 assert(notif_packet);
1161 break;
1162 case BT_NOTIFICATION_TYPE_STREAM_BEGIN:
1163 notif_stream =
1164 bt_notification_stream_begin_borrow_stream(notif);
1165 assert(notif_stream);
1166 break;
1167 case BT_NOTIFICATION_TYPE_STREAM_END:
1168 notif_stream = bt_notification_stream_end_borrow_stream(notif);
1169 assert(notif_stream);
1170 break;
1171 case BT_NOTIFICATION_TYPE_PACKET_BEGIN:
1172 notif_packet =
1173 bt_notification_packet_begin_borrow_packet(notif);
1174 assert(notif_packet);
1175 break;
1176 case BT_NOTIFICATION_TYPE_PACKET_END:
1177 notif_packet = bt_notification_packet_end_borrow_packet(notif);
1178 assert(notif_packet);
1179 break;
1180 case BT_NOTIFICATION_TYPE_INACTIVITY:
1181 /* Always valid */
1182 goto handle_notif;
1183 default:
1184 /*
1185 * Invalid type of notification. Only the notification
1186 * types above are allowed to be returned by a user
1187 * component.
1188 */
1189 goto error;
1190 }
1191
1192 if (notif_packet) {
1193 notif_stream = bt_ctf_packet_borrow_stream(notif_packet);
1194 assert(notif_stream);
1195 }
1196
1197 if (!notif_stream) {
1198 /*
1199 * The notification has no reference to a stream: it
1200 * cannot cause the creation of automatic notifications.
1201 */
1202 goto end;
1203 }
1204
1205 if (!validate_notification(iterator, notif, notif_stream,
1206 notif_packet)) {
1207 goto error;
1208 }
1209
1210 handle_notif:
1211 switch (notif->type) {
1212 case BT_NOTIFICATION_TYPE_EVENT:
1213 ret = handle_notif_event(iterator, notif, notif_stream,
1214 notif_packet);
1215 break;
1216 case BT_NOTIFICATION_TYPE_STREAM_BEGIN:
1217 ret = handle_notif_stream_begin(iterator, notif, notif_stream);
1218 break;
1219 case BT_NOTIFICATION_TYPE_STREAM_END:
1220 ret = handle_notif_stream_end(iterator, notif, notif_stream);
1221 break;
1222 case BT_NOTIFICATION_TYPE_PACKET_BEGIN:
1223 ret = handle_notif_packet_begin(iterator, notif, notif_stream,
1224 notif_packet);
1225 break;
1226 case BT_NOTIFICATION_TYPE_PACKET_END:
1227 ret = handle_notif_packet_end(iterator, notif, notif_stream,
1228 notif_packet);
1229 break;
1230 case BT_NOTIFICATION_TYPE_INACTIVITY:
1231 add_action_push_notif(iterator, notif);
1232 break;
1233 default:
1234 break;
1235 }
1236
1237 if (ret) {
1238 goto error;
1239 }
1240
1241 apply_actions(iterator);
1242 goto end;
1243
1244 error:
1245 ret = -1;
1246
1247 end:
1248 return ret;
1249 }
1250
1251 static
1252 int handle_end(struct bt_notification_iterator *iterator)
1253 {
1254 GHashTableIter stream_state_iter;
1255 gpointer stream_gptr, stream_state_gptr;
1256 int ret = 0;
1257
1258 /*
1259 * Emit a "stream end" notification for each non-ended stream
1260 * known by this iterator and mark them as ended.
1261 */
1262 g_hash_table_iter_init(&stream_state_iter, iterator->stream_states);
1263
1264 while (g_hash_table_iter_next(&stream_state_iter, &stream_gptr,
1265 &stream_state_gptr)) {
1266 struct stream_state *stream_state = stream_state_gptr;
1267
1268 assert(stream_state_gptr);
1269
1270 if (stream_state->is_ended) {
1271 continue;
1272 }
1273
1274 ret = handle_packet_switch(iterator, NULL, NULL, stream_state);
1275 if (ret) {
1276 goto error;
1277 }
1278
1279 ret = add_action_push_notif_stream_end(iterator, stream_gptr);
1280 if (ret) {
1281 goto error;
1282 }
1283
1284 add_action_set_stream_state_is_ended(iterator, stream_state);
1285 }
1286
1287 apply_actions(iterator);
1288 goto end;
1289
1290 error:
1291 ret = -1;
1292
1293 end:
1294 return ret;
1295 }
1296
1297 static
1298 enum bt_notification_iterator_status ensure_queue_has_notifications(
1299 struct bt_notification_iterator *iterator)
1300 {
1301 struct bt_private_notification_iterator *priv_iterator =
1302 bt_private_notification_iterator_from_notification_iterator(iterator);
1303 bt_component_class_notification_iterator_next_method next_method = NULL;
1304 struct bt_notification_iterator_next_return next_return = {
1305 .status = BT_NOTIFICATION_ITERATOR_STATUS_OK,
1306 .notification = NULL,
1307 };
1308 enum bt_notification_iterator_status status =
1309 BT_NOTIFICATION_ITERATOR_STATUS_OK;
1310 int ret;
1311
1312 assert(iterator);
1313
1314 if (iterator->queue->length > 0) {
1315 /* We already have enough */
1316 goto end;
1317 }
1318
1319 switch (iterator->state) {
1320 case BT_NOTIFICATION_ITERATOR_STATE_FINALIZED_AND_ENDED:
1321 status = BT_NOTIFICATION_ITERATOR_STATUS_CANCELED;
1322 goto end;
1323 case BT_NOTIFICATION_ITERATOR_STATE_ENDED:
1324 status = BT_NOTIFICATION_ITERATOR_STATUS_END;
1325 goto end;
1326 default:
1327 break;
1328 }
1329
1330 assert(iterator->upstream_component);
1331 assert(iterator->upstream_component->class);
1332
1333 /* Pick the appropriate "next" method */
1334 switch (iterator->upstream_component->class->type) {
1335 case BT_COMPONENT_CLASS_TYPE_SOURCE:
1336 {
1337 struct bt_component_class_source *source_class =
1338 container_of(iterator->upstream_component->class,
1339 struct bt_component_class_source, parent);
1340
1341 assert(source_class->methods.iterator.next);
1342 next_method = source_class->methods.iterator.next;
1343 break;
1344 }
1345 case BT_COMPONENT_CLASS_TYPE_FILTER:
1346 {
1347 struct bt_component_class_filter *filter_class =
1348 container_of(iterator->upstream_component->class,
1349 struct bt_component_class_filter, parent);
1350
1351 assert(filter_class->methods.iterator.next);
1352 next_method = filter_class->methods.iterator.next;
1353 break;
1354 }
1355 default:
1356 assert(BT_FALSE);
1357 break;
1358 }
1359
1360 /*
1361 * Call the user's "next" method to get the next notification
1362 * and status.
1363 */
1364 assert(next_method);
1365
1366 while (iterator->queue->length == 0) {
1367 next_return = next_method(priv_iterator);
1368 if (next_return.status < 0) {
1369 status = next_return.status;
1370 goto end;
1371 }
1372
1373 switch (next_return.status) {
1374 case BT_NOTIFICATION_ITERATOR_STATUS_END:
1375 ret = handle_end(iterator);
1376 if (ret) {
1377 status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
1378 goto end;
1379 }
1380
1381 if (iterator->state == BT_NOTIFICATION_ITERATOR_STATE_FINALIZED) {
1382 iterator->state =
1383 BT_NOTIFICATION_ITERATOR_STATE_FINALIZED_AND_ENDED;
1384
1385 if (iterator->queue->length == 0) {
1386 status = BT_NOTIFICATION_ITERATOR_STATUS_CANCELED;
1387 }
1388 } else {
1389 iterator->state =
1390 BT_NOTIFICATION_ITERATOR_STATE_ENDED;
1391
1392 if (iterator->queue->length == 0) {
1393 status = BT_NOTIFICATION_ITERATOR_STATUS_END;
1394 }
1395 }
1396 goto end;
1397 case BT_NOTIFICATION_ITERATOR_STATUS_AGAIN:
1398 status = BT_NOTIFICATION_ITERATOR_STATUS_AGAIN;
1399 goto end;
1400 case BT_NOTIFICATION_ITERATOR_STATUS_OK:
1401 if (!next_return.notification) {
1402 status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
1403 goto end;
1404 }
1405
1406 /*
1407 * We know the notification is valid. Before we
1408 * push it to the head of the queue, push the
1409 * appropriate automatic notifications if any.
1410 */
1411 ret = enqueue_notification_and_automatic(iterator,
1412 next_return.notification);
1413 BT_PUT(next_return.notification);
1414 if (ret) {
1415 status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
1416 goto end;
1417 }
1418 break;
1419 default:
1420 /* Unknown non-error status */
1421 assert(BT_FALSE);
1422 }
1423 }
1424
1425 end:
1426 return status;
1427 }
1428
1429 enum bt_notification_iterator_status
1430 bt_notification_iterator_next(struct bt_notification_iterator *iterator)
1431 {
1432 enum bt_notification_iterator_status status;
1433
1434 if (!iterator) {
1435 status = BT_NOTIFICATION_ITERATOR_STATUS_INVALID;
1436 goto end;
1437 }
1438
1439 /*
1440 * Make sure that the iterator's queue contains at least one
1441 * notification.
1442 */
1443 status = ensure_queue_has_notifications(iterator);
1444 if (status != BT_NOTIFICATION_ITERATOR_STATUS_OK) {
1445 goto end;
1446 }
1447
1448 /*
1449 * Move the notification at the tail of the queue to the
1450 * iterator's current notification.
1451 */
1452 assert(iterator->queue->length > 0);
1453 bt_put(iterator->current_notification);
1454 iterator->current_notification = g_queue_pop_tail(iterator->queue);
1455 assert(iterator->current_notification);
1456
1457 end:
1458 return status;
1459 }
1460
1461 struct bt_component *bt_notification_iterator_get_component(
1462 struct bt_notification_iterator *iterator)
1463 {
1464 return bt_get(iterator->upstream_component);
1465 }
1466
1467 struct bt_private_component *
1468 bt_private_notification_iterator_get_private_component(
1469 struct bt_private_notification_iterator *private_iterator)
1470 {
1471 return bt_private_component_from_component(
1472 bt_notification_iterator_get_component(
1473 bt_notification_iterator_from_private(private_iterator)));
1474 }
1475
1476 enum bt_notification_iterator_status bt_notification_iterator_seek_time(
1477 struct bt_notification_iterator *iterator,
1478 enum bt_notification_iterator_seek_origin seek_origin,
1479 int64_t time)
1480 {
1481 enum bt_notification_iterator_status ret =
1482 BT_NOTIFICATION_ITERATOR_STATUS_UNSUPPORTED;
1483 return ret;
1484 }
This page took 0.060104 seconds and 5 git commands to generate.