Subscribe to notifications when creating a notif. iterator
[babeltrace.git] / lib / graph / iterator.c
CommitLineData
47e5a032
JG
1/*
2 * iterator.c
3 *
4 * Babeltrace Notification Iterator
5 *
6 * Copyright 2015 Jérémie Galarneau <jeremie.galarneau@efficios.com>
3230ee6b 7 * Copyright 2017 Philippe Proulx <pproulx@efficios.com>
47e5a032
JG
8 *
9 * Permission is hereby granted, free of charge, to any person obtaining a copy
10 * of this software and associated documentation files (the "Software"), to deal
11 * in the Software without restriction, including without limitation the rights
12 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
13 * copies of the Software, and to permit persons to whom the Software is
14 * furnished to do so, subject to the following conditions:
15 *
16 * The above copyright notice and this permission notice shall be included in
17 * all copies or substantial portions of the Software.
18 *
19 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
20 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
21 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
22 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
23 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
24 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
25 * SOFTWARE.
26 */
27
3d9990ac 28#include <babeltrace/compiler-internal.h>
b8a06801 29#include <babeltrace/ref.h>
3230ee6b
PP
30#include <babeltrace/ctf-ir/event-internal.h>
31#include <babeltrace/ctf-ir/packet-internal.h>
32#include <babeltrace/ctf-ir/stream-internal.h>
b2e0c907
PP
33#include <babeltrace/graph/component.h>
34#include <babeltrace/graph/component-source-internal.h>
35#include <babeltrace/graph/component-class-internal.h>
fa054faf 36#include <babeltrace/graph/notification.h>
b2e0c907
PP
37#include <babeltrace/graph/notification-iterator.h>
38#include <babeltrace/graph/notification-iterator-internal.h>
e7fa96c3 39#include <babeltrace/graph/notification-internal.h>
3230ee6b
PP
40#include <babeltrace/graph/notification-event.h>
41#include <babeltrace/graph/notification-event-internal.h>
42#include <babeltrace/graph/notification-packet.h>
43#include <babeltrace/graph/notification-packet-internal.h>
44#include <babeltrace/graph/notification-stream.h>
45#include <babeltrace/graph/notification-stream-internal.h>
46#include <babeltrace/graph/port.h>
fa054faf 47#include <stdint.h>
3230ee6b
PP
48
49struct stream_state {
50 struct bt_ctf_stream *stream; /* owned by this */
51 struct bt_ctf_packet *cur_packet; /* owned by this */
52 bool is_ended;
53};
54
55enum action_type {
56 ACTION_TYPE_PUSH_NOTIF,
57 ACTION_TYPE_MAP_PORT_TO_COMP_IN_STREAM,
58 ACTION_TYPE_ADD_STREAM_STATE,
59 ACTION_TYPE_SET_STREAM_STATE_IS_ENDED,
60 ACTION_TYPE_SET_STREAM_STATE_CUR_PACKET,
61};
62
63struct action {
64 enum action_type type;
65 union {
66 /* ACTION_TYPE_PUSH_NOTIF */
67 struct {
68 struct bt_notification *notif; /* owned by this */
69 } push_notif;
70
71 /* ACTION_TYPE_MAP_PORT_TO_COMP_IN_STREAM */
72 struct {
73 struct bt_ctf_stream *stream; /* owned by this */
74 struct bt_component *component; /* owned by this */
75 struct bt_port *port; /* owned by this */
76 } map_port_to_comp_in_stream;
77
78 /* ACTION_TYPE_ADD_STREAM_STATE */
79 struct {
80 struct bt_ctf_stream *stream; /* owned by this */
81 struct stream_state *stream_state; /* owned by this */
82 } add_stream_state;
83
84 /* ACTION_TYPE_SET_STREAM_STATE_IS_ENDED */
85 struct {
86 struct stream_state *stream_state; /* weak */
87 } set_stream_state_is_ended;
88
89 /* ACTION_TYPE_SET_STREAM_STATE_CUR_PACKET */
90 struct {
91 struct stream_state *stream_state; /* weak */
92 struct bt_ctf_packet *packet; /* owned by this */
93 } set_stream_state_cur_packet;
94 } payload;
95};
96
97static
98void stream_destroy_listener(struct bt_ctf_stream *stream, void *data)
99{
100 struct bt_notification_iterator *iterator = data;
101
102 /* Remove associated stream state */
103 g_hash_table_remove(iterator->stream_states, stream);
104}
105
106static
107void destroy_stream_state(struct stream_state *stream_state)
108{
109 if (!stream_state) {
110 return;
111 }
112
113 bt_put(stream_state->cur_packet);
114 bt_put(stream_state->stream);
115 g_free(stream_state);
116}
117
118static
119void destroy_action(struct action *action)
120{
121 assert(action);
122
123 switch (action->type) {
124 case ACTION_TYPE_PUSH_NOTIF:
125 BT_PUT(action->payload.push_notif.notif);
126 break;
127 case ACTION_TYPE_MAP_PORT_TO_COMP_IN_STREAM:
128 BT_PUT(action->payload.map_port_to_comp_in_stream.stream);
129 BT_PUT(action->payload.map_port_to_comp_in_stream.component);
130 BT_PUT(action->payload.map_port_to_comp_in_stream.port);
131 break;
132 case ACTION_TYPE_ADD_STREAM_STATE:
133 BT_PUT(action->payload.add_stream_state.stream);
134 destroy_stream_state(
135 action->payload.add_stream_state.stream_state);
136 action->payload.add_stream_state.stream_state = NULL;
137 break;
138 case ACTION_TYPE_SET_STREAM_STATE_CUR_PACKET:
139 BT_PUT(action->payload.set_stream_state_cur_packet.packet);
140 break;
141 case ACTION_TYPE_SET_STREAM_STATE_IS_ENDED:
142 break;
143 default:
144 assert(false);
145 }
146}
147
148static
149void add_action(struct bt_notification_iterator *iterator,
150 struct action *action)
151{
152 g_array_append_val(iterator->actions, *action);
153}
154
155static
156void clear_actions(struct bt_notification_iterator *iterator)
157{
158 size_t i;
159
160 for (i = 0; i < iterator->actions->len; i++) {
161 struct action *action = &g_array_index(iterator->actions,
162 struct action, i);
163
164 destroy_action(action);
165 }
166
167 g_array_set_size(iterator->actions, 0);
168}
169
170static
171void apply_actions(struct bt_notification_iterator *iterator)
172{
173 size_t i;
174
175 for (i = 0; i < iterator->actions->len; i++) {
176 struct action *action = &g_array_index(iterator->actions,
177 struct action, i);
178
179 switch (action->type) {
180 case ACTION_TYPE_PUSH_NOTIF:
181 /* Move notification to queue */
182 g_queue_push_head(iterator->queue,
183 action->payload.push_notif.notif);
184 bt_notification_freeze(
185 action->payload.push_notif.notif);
186 action->payload.push_notif.notif = NULL;
187 break;
188 case ACTION_TYPE_MAP_PORT_TO_COMP_IN_STREAM:
189 bt_ctf_stream_map_component_to_port(
190 action->payload.map_port_to_comp_in_stream.stream,
191 action->payload.map_port_to_comp_in_stream.component,
192 action->payload.map_port_to_comp_in_stream.port);
193 break;
194 case ACTION_TYPE_ADD_STREAM_STATE:
195 /* Move stream state to hash table */
196 g_hash_table_insert(iterator->stream_states,
197 action->payload.add_stream_state.stream,
198 action->payload.add_stream_state.stream_state);
199
200 action->payload.add_stream_state.stream_state = NULL;
201 break;
202 case ACTION_TYPE_SET_STREAM_STATE_IS_ENDED:
203 /*
204 * We know that this stream is ended. We need to
205 * remember this as long as the stream exists to
206 * enforce that the same stream does not end
207 * twice.
208 *
209 * Here we add a destroy listener to the stream
210 * which we put after (becomes weak as the hash
211 * table key). If we were the last object to own
212 * this stream, the destroy listener is called
213 * when we call bt_put() which removes this
214 * stream state completely. This is important
215 * because the memory used by this stream object
216 * could be reused for another stream, and they
217 * must have different states.
218 */
219 bt_ctf_stream_add_destroy_listener(
220 action->payload.set_stream_state_is_ended.stream_state->stream,
221 stream_destroy_listener, iterator);
222 action->payload.set_stream_state_is_ended.stream_state->is_ended = true;
223 BT_PUT(action->payload.set_stream_state_is_ended.stream_state->stream);
224 break;
225 case ACTION_TYPE_SET_STREAM_STATE_CUR_PACKET:
226 /* Move packet to stream state's current packet */
227 BT_MOVE(action->payload.set_stream_state_cur_packet.stream_state->cur_packet,
228 action->payload.set_stream_state_cur_packet.packet);
229 break;
230 default:
231 assert(false);
232 }
233 }
234
235 clear_actions(iterator);
236}
237
238static
239struct stream_state *create_stream_state(struct bt_ctf_stream *stream)
240{
241 struct stream_state *stream_state = g_new0(struct stream_state, 1);
242
243 if (!stream_state) {
244 goto end;
245 }
246
247 /*
248 * We keep a reference to the stream until we know it's ended
249 * because we need to be able to create an automatic "stream
250 * end" notification when the user's "next" method returns
251 * BT_NOTIFICATION_ITERATOR_STATUS_END.
252 *
253 * We put this reference when the stream is marked as ended.
254 */
255 stream_state->stream = bt_get(stream);
256
257end:
258 return stream_state;
259}
47e5a032
JG
260
261static
b8a06801 262void bt_notification_iterator_destroy(struct bt_object *obj)
47e5a032 263{
8738a040 264 struct bt_notification_iterator *iterator;
d3eb6e8f 265 struct bt_component_class *comp_class;
8738a040 266
b8a06801
JG
267 assert(obj);
268 iterator = container_of(obj, struct bt_notification_iterator,
269 base);
3230ee6b
PP
270 assert(iterator->upstream_component);
271 comp_class = iterator->upstream_component->class;
d3eb6e8f
PP
272
273 /* Call user-defined destroy method */
274 switch (comp_class->type) {
275 case BT_COMPONENT_CLASS_TYPE_SOURCE:
276 {
277 struct bt_component_class_source *source_class;
278
279 source_class = container_of(comp_class, struct bt_component_class_source, parent);
280
64cadc66
PP
281 if (source_class->methods.iterator.finalize) {
282 source_class->methods.iterator.finalize(
890882ef 283 bt_private_notification_iterator_from_notification_iterator(iterator));
d3eb6e8f
PP
284 }
285 break;
512ccb99 286 }
d3eb6e8f
PP
287 case BT_COMPONENT_CLASS_TYPE_FILTER:
288 {
289 struct bt_component_class_filter *filter_class;
290
291 filter_class = container_of(comp_class, struct bt_component_class_filter, parent);
292
64cadc66
PP
293 if (filter_class->methods.iterator.finalize) {
294 filter_class->methods.iterator.finalize(
890882ef 295 bt_private_notification_iterator_from_notification_iterator(iterator));
d3eb6e8f
PP
296 }
297 break;
298 }
299 default:
300 /* Unreachable */
301 assert(0);
302 }
303
3230ee6b
PP
304 if (iterator->queue) {
305 struct bt_notification *notif;
306
307 while ((notif = g_queue_pop_tail(iterator->queue))) {
308 bt_put(notif);
309 }
310
311 g_queue_free(iterator->queue);
312 }
313
314 if (iterator->stream_states) {
315 /*
316 * Remove our destroy listener from each stream which
317 * has a state in this iterator. Otherwise the destroy
318 * listener would be called with an invalid/other
319 * notification iterator object.
320 */
321 GHashTableIter ht_iter;
322 gpointer stream_gptr, stream_state_gptr;
323
324 g_hash_table_iter_init(&ht_iter, iterator->stream_states);
325
326 while (g_hash_table_iter_next(&ht_iter, &stream_gptr, &stream_state_gptr)) {
327 assert(stream_gptr);
328 bt_ctf_stream_remove_destroy_listener(
329 (void *) stream_gptr, stream_destroy_listener,
330 iterator);
331 }
332
333 g_hash_table_destroy(iterator->stream_states);
334 }
335
336 if (iterator->actions) {
337 g_array_free(iterator->actions, TRUE);
338 }
339
340 bt_put(iterator->current_notification);
341 bt_put(iterator->upstream_component);
342 bt_put(iterator->upstream_port);
8738a040 343 g_free(iterator);
47e5a032
JG
344}
345
fa054faf
PP
346static
347int create_subscription_mask_from_notification_types(
348 struct bt_notification_iterator *iterator,
349 const enum bt_notification_type *notif_types)
350{
351 const enum bt_notification_type *notif_type;
352 int ret = 0;
353
354 assert(notif_types);
355 iterator->subscription_mask = 0;
356
357 for (notif_type = notif_types;
358 *notif_type != BT_NOTIFICATION_TYPE_SENTINEL;
359 notif_type++) {
360 switch (*notif_type) {
361 case BT_NOTIFICATION_TYPE_ALL:
362 iterator->subscription_mask |=
363 BT_NOTIFICATION_ITERATOR_NOTIF_TYPE_EVENT |
364 BT_NOTIFICATION_ITERATOR_NOTIF_TYPE_INACTIVITY |
365 BT_NOTIFICATION_ITERATOR_NOTIF_TYPE_STREAM_BEGIN |
366 BT_NOTIFICATION_ITERATOR_NOTIF_TYPE_STREAM_END |
367 BT_NOTIFICATION_ITERATOR_NOTIF_TYPE_PACKET_BEGIN |
368 BT_NOTIFICATION_ITERATOR_NOTIF_TYPE_PACKET_END;
369 break;
370 case BT_NOTIFICATION_TYPE_EVENT:
371 iterator->subscription_mask |= BT_NOTIFICATION_ITERATOR_NOTIF_TYPE_EVENT;
372 break;
373 case BT_NOTIFICATION_TYPE_INACTIVITY:
374 iterator->subscription_mask |= BT_NOTIFICATION_ITERATOR_NOTIF_TYPE_INACTIVITY;
375 break;
376 case BT_NOTIFICATION_TYPE_STREAM_BEGIN:
377 iterator->subscription_mask |= BT_NOTIFICATION_ITERATOR_NOTIF_TYPE_STREAM_BEGIN;
378 break;
379 case BT_NOTIFICATION_TYPE_STREAM_END:
380 iterator->subscription_mask |= BT_NOTIFICATION_ITERATOR_NOTIF_TYPE_STREAM_END;
381 break;
382 case BT_NOTIFICATION_TYPE_PACKET_BEGIN:
383 iterator->subscription_mask |= BT_NOTIFICATION_ITERATOR_NOTIF_TYPE_PACKET_BEGIN;
384 break;
385 case BT_NOTIFICATION_TYPE_PACKET_END:
386 iterator->subscription_mask |= BT_NOTIFICATION_ITERATOR_NOTIF_TYPE_PACKET_END;
387 break;
388 default:
389 ret = -1;
390 goto end;
391 }
392 }
393
394end:
395 return ret;
396}
397
47e5a032
JG
398BT_HIDDEN
399struct bt_notification_iterator *bt_notification_iterator_create(
3230ee6b 400 struct bt_component *upstream_comp,
fa054faf
PP
401 struct bt_port *upstream_port,
402 const enum bt_notification_type *notification_types)
47e5a032 403{
d3e4dcd8 404 enum bt_component_class_type type;
47e5a032
JG
405 struct bt_notification_iterator *iterator = NULL;
406
3230ee6b
PP
407 assert(upstream_comp);
408 assert(upstream_port);
fa054faf 409 assert(notification_types);
3230ee6b 410 assert(bt_port_is_connected(upstream_port));
16b7b023 411
3230ee6b 412 type = bt_component_get_class_type(upstream_comp);
16b7b023 413 switch (type) {
d3e4dcd8
PP
414 case BT_COMPONENT_CLASS_TYPE_SOURCE:
415 case BT_COMPONENT_CLASS_TYPE_FILTER:
16b7b023
JG
416 break;
417 default:
3230ee6b 418 goto error;
47e5a032
JG
419 }
420
421 iterator = g_new0(struct bt_notification_iterator, 1);
422 if (!iterator) {
3230ee6b 423 goto error;
47e5a032
JG
424 }
425
b8a06801 426 bt_object_init(iterator, bt_notification_iterator_destroy);
3230ee6b 427
fa054faf
PP
428 if (create_subscription_mask_from_notification_types(iterator,
429 notification_types)) {
430 goto error;
431 }
432
3230ee6b
PP
433 iterator->stream_states = g_hash_table_new_full(g_direct_hash,
434 g_direct_equal, NULL, (GDestroyNotify) destroy_stream_state);
435 if (!iterator->stream_states) {
436 goto error;
437 }
438
439 iterator->queue = g_queue_new();
440 if (!iterator->queue) {
441 goto error;
442 }
443
444 iterator->actions = g_array_new(FALSE, FALSE, sizeof(struct action));
445 if (!iterator->actions) {
446 goto error;
447 }
448
449 iterator->upstream_component = bt_get(upstream_comp);
450 iterator->upstream_port = bt_get(upstream_port);
451 goto end;
452
453error:
454 BT_PUT(iterator);
455
47e5a032
JG
456end:
457 return iterator;
458}
459
460BT_HIDDEN
461enum bt_notification_iterator_status bt_notification_iterator_validate(
462 struct bt_notification_iterator *iterator)
463{
8738a040 464 enum bt_notification_iterator_status ret =
ea8d3e58 465 BT_NOTIFICATION_ITERATOR_STATUS_OK;
47e5a032 466
d3eb6e8f 467 if (!iterator) {
fe8ad2b6 468 ret = BT_NOTIFICATION_ITERATOR_STATUS_INVALID;
ea8d3e58
JG
469 goto end;
470 }
ea8d3e58
JG
471end:
472 return ret;
473}
474
890882ef
PP
475void *bt_private_notification_iterator_get_user_data(
476 struct bt_private_notification_iterator *private_iterator)
ea8d3e58 477{
890882ef
PP
478 struct bt_notification_iterator *iterator =
479 bt_notification_iterator_from_private(private_iterator);
480
ea8d3e58
JG
481 return iterator ? iterator->user_data : NULL;
482}
483
484enum bt_notification_iterator_status
890882ef
PP
485bt_private_notification_iterator_set_user_data(
486 struct bt_private_notification_iterator *private_iterator,
487 void *data)
ea8d3e58
JG
488{
489 enum bt_notification_iterator_status ret =
490 BT_NOTIFICATION_ITERATOR_STATUS_OK;
890882ef
PP
491 struct bt_notification_iterator *iterator =
492 bt_notification_iterator_from_private(private_iterator);
ea8d3e58 493
d3eb6e8f 494 if (!iterator) {
fe8ad2b6 495 ret = BT_NOTIFICATION_ITERATOR_STATUS_INVALID;
ea8d3e58
JG
496 goto end;
497 }
498
499 iterator->user_data = data;
8738a040
JG
500end:
501 return ret;
502}
413bc2c4 503
53d45b87
JG
504struct bt_notification *bt_notification_iterator_get_notification(
505 struct bt_notification_iterator *iterator)
506{
41a2b7ae 507 struct bt_notification *notification = NULL;
d3eb6e8f 508
41a2b7ae
PP
509 if (!iterator) {
510 goto end;
d3eb6e8f 511 }
d3eb6e8f 512
41a2b7ae 513 notification = bt_get(iterator->current_notification);
d3eb6e8f 514
41a2b7ae
PP
515end:
516 return notification;
53d45b87
JG
517}
518
fa054faf
PP
519static
520enum bt_notification_iterator_notif_type
521bt_notification_iterator_notif_type_from_notif_type(
522 enum bt_notification_type notif_type)
523{
524 enum bt_notification_iterator_notif_type iter_notif_type;
525
526 switch (notif_type) {
527 case BT_NOTIFICATION_TYPE_EVENT:
528 iter_notif_type = BT_NOTIFICATION_ITERATOR_NOTIF_TYPE_EVENT;
529 break;
530 case BT_NOTIFICATION_TYPE_INACTIVITY:
531 iter_notif_type = BT_NOTIFICATION_ITERATOR_NOTIF_TYPE_INACTIVITY;
532 break;
533 case BT_NOTIFICATION_TYPE_STREAM_BEGIN:
534 iter_notif_type = BT_NOTIFICATION_ITERATOR_NOTIF_TYPE_STREAM_BEGIN;
535 break;
536 case BT_NOTIFICATION_TYPE_STREAM_END:
537 iter_notif_type = BT_NOTIFICATION_ITERATOR_NOTIF_TYPE_STREAM_END;
538 break;
539 case BT_NOTIFICATION_TYPE_PACKET_BEGIN:
540 iter_notif_type = BT_NOTIFICATION_ITERATOR_NOTIF_TYPE_PACKET_BEGIN;
541 break;
542 case BT_NOTIFICATION_TYPE_PACKET_END:
543 iter_notif_type = BT_NOTIFICATION_ITERATOR_NOTIF_TYPE_PACKET_END;
544 break;
545 default:
546 assert(false);
547 }
548
549 return iter_notif_type;
550}
551
3230ee6b
PP
552static
553bool validate_notification(struct bt_notification_iterator *iterator,
554 struct bt_notification *notif,
555 struct bt_ctf_stream *notif_stream,
556 struct bt_ctf_packet *notif_packet)
557{
558 bool is_valid = true;
559 struct stream_state *stream_state;
560 struct bt_port *stream_comp_cur_port;
561
562 assert(notif_stream);
563 stream_comp_cur_port =
564 bt_ctf_stream_port_for_component(notif_stream,
565 iterator->upstream_component);
566 if (!stream_comp_cur_port) {
567 /*
568 * This is the first time this notification iterator
569 * bumps into this stream. Add an action to map the
570 * iterator's upstream component to the iterator's
571 * upstream port in this stream.
572 */
573 struct action action = {
574 .type = ACTION_TYPE_MAP_PORT_TO_COMP_IN_STREAM,
575 .payload.map_port_to_comp_in_stream = {
576 .stream = bt_get(notif_stream),
577 .component = bt_get(iterator->upstream_component),
578 .port = bt_get(iterator->upstream_port),
579 },
580 };
581
582 add_action(iterator, &action);
583 } else {
584 if (stream_comp_cur_port != iterator->upstream_port) {
585 /*
586 * It looks like two different ports of the same
587 * component are emitting notifications which
588 * have references to the same stream. This is
589 * bad: the API guarantees that it can never
590 * happen.
591 */
592 is_valid = false;
593 goto end;
594 }
595
596 }
597
598 stream_state = g_hash_table_lookup(iterator->stream_states,
599 notif_stream);
600 if (stream_state) {
601 if (stream_state->is_ended) {
602 /*
603 * There's a new notification which has a
604 * reference to a stream which, from this
605 * iterator's point of view, is ended ("end of
606 * stream" notification was returned). This is
607 * bad: the API guarantees that it can never
608 * happen.
609 */
610 is_valid = false;
611 goto end;
612 }
613
614 switch (notif->type) {
615 case BT_NOTIFICATION_TYPE_STREAM_BEGIN:
616 /*
617 * We already have a stream state, which means
618 * we already returned a "stream begin"
619 * notification: this is an invalid duplicate.
620 */
621 is_valid = false;
622 goto end;
623 case BT_NOTIFICATION_TYPE_PACKET_BEGIN:
624 if (notif_packet == stream_state->cur_packet) {
625 /* Duplicate "packet begin" notification */
626 is_valid = false;
627 goto end;
628 }
629 break;
630 default:
631 break;
632 }
633 }
634
635end:
636 return is_valid;
637}
638
fa054faf
PP
639static
640bool is_subscribed_to_notification_type(struct bt_notification_iterator *iterator,
641 enum bt_notification_type notif_type)
642{
643 uint32_t iter_notif_type =
644 (uint32_t) bt_notification_iterator_notif_type_from_notif_type(
645 notif_type);
646
647 return (iter_notif_type & iterator->subscription_mask) ? true : false;
648}
649
3230ee6b
PP
650static
651void add_action_push_notif(struct bt_notification_iterator *iterator,
652 struct bt_notification *notif)
653{
654 struct action action = {
655 .type = ACTION_TYPE_PUSH_NOTIF,
3230ee6b
PP
656 };
657
658 assert(notif);
fa054faf
PP
659
660 if (!is_subscribed_to_notification_type(iterator, notif->type)) {
661 return;
662 }
663
664 action.payload.push_notif.notif = bt_get(notif);
3230ee6b
PP
665 add_action(iterator, &action);
666}
667
668static
669int add_action_push_notif_stream_begin(
670 struct bt_notification_iterator *iterator,
671 struct bt_ctf_stream *stream)
672{
673 int ret = 0;
674 struct bt_notification *stream_begin_notif = NULL;
675
fa054faf
PP
676 if (!is_subscribed_to_notification_type(iterator,
677 BT_NOTIFICATION_TYPE_STREAM_BEGIN)) {
678 goto end;
679 }
680
3230ee6b
PP
681 assert(stream);
682 stream_begin_notif = bt_notification_stream_begin_create(stream);
683 if (!stream_begin_notif) {
684 goto error;
685 }
686
687 add_action_push_notif(iterator, stream_begin_notif);
688 goto end;
689
690error:
691 ret = -1;
692
693end:
694 bt_put(stream_begin_notif);
695 return ret;
696}
697
698static
699int add_action_push_notif_stream_end(
700 struct bt_notification_iterator *iterator,
701 struct bt_ctf_stream *stream)
702{
703 int ret = 0;
704 struct bt_notification *stream_end_notif = NULL;
705
fa054faf
PP
706 if (!is_subscribed_to_notification_type(iterator,
707 BT_NOTIFICATION_TYPE_STREAM_END)) {
708 goto end;
709 }
710
3230ee6b
PP
711 assert(stream);
712 stream_end_notif = bt_notification_stream_end_create(stream);
713 if (!stream_end_notif) {
714 goto error;
715 }
716
717 add_action_push_notif(iterator, stream_end_notif);
718 goto end;
719
720error:
721 ret = -1;
722
723end:
724 bt_put(stream_end_notif);
725 return ret;
726}
727
728static
729int add_action_push_notif_packet_begin(
730 struct bt_notification_iterator *iterator,
731 struct bt_ctf_packet *packet)
732{
733 int ret = 0;
734 struct bt_notification *packet_begin_notif = NULL;
735
fa054faf
PP
736 if (!is_subscribed_to_notification_type(iterator,
737 BT_NOTIFICATION_TYPE_PACKET_BEGIN)) {
738 goto end;
739 }
740
3230ee6b
PP
741 assert(packet);
742 packet_begin_notif = bt_notification_packet_begin_create(packet);
743 if (!packet_begin_notif) {
744 goto error;
745 }
746
747 add_action_push_notif(iterator, packet_begin_notif);
748 goto end;
749
750error:
751 ret = -1;
752
753end:
754 bt_put(packet_begin_notif);
755 return ret;
756}
757
758static
759int add_action_push_notif_packet_end(
760 struct bt_notification_iterator *iterator,
761 struct bt_ctf_packet *packet)
762{
763 int ret = 0;
764 struct bt_notification *packet_end_notif = NULL;
765
fa054faf
PP
766 if (!is_subscribed_to_notification_type(iterator,
767 BT_NOTIFICATION_TYPE_PACKET_END)) {
768 goto end;
769 }
770
3230ee6b
PP
771 assert(packet);
772 packet_end_notif = bt_notification_packet_end_create(packet);
773 if (!packet_end_notif) {
774 goto error;
775 }
776
777 add_action_push_notif(iterator, packet_end_notif);
778 goto end;
779
780error:
781 ret = -1;
782
783end:
784 bt_put(packet_end_notif);
785 return ret;
786}
787
788static
789void add_action_set_stream_state_is_ended(
790 struct bt_notification_iterator *iterator,
791 struct stream_state *stream_state)
792{
793 struct action action = {
794 .type = ACTION_TYPE_SET_STREAM_STATE_IS_ENDED,
795 .payload.set_stream_state_is_ended = {
796 .stream_state = stream_state,
797 },
798 };
799
800 assert(stream_state);
801 add_action(iterator, &action);
802}
803
804static
805void add_action_set_stream_state_cur_packet(
806 struct bt_notification_iterator *iterator,
807 struct stream_state *stream_state,
808 struct bt_ctf_packet *packet)
809{
810 struct action action = {
811 .type = ACTION_TYPE_SET_STREAM_STATE_CUR_PACKET,
812 .payload.set_stream_state_cur_packet = {
813 .stream_state = stream_state,
814 .packet = bt_get(packet),
815 },
816 };
817
818 assert(stream_state);
819 add_action(iterator, &action);
820}
821
822static
823int ensure_stream_state_exists(struct bt_notification_iterator *iterator,
824 struct bt_notification *stream_begin_notif,
825 struct bt_ctf_stream *notif_stream,
826 struct stream_state **stream_state)
827{
828 int ret = 0;
829
830 if (!notif_stream) {
831 /*
832 * The notification does not reference any stream: no
833 * need to get or create a stream state.
834 */
835 goto end;
836 }
837
838 *stream_state = g_hash_table_lookup(iterator->stream_states,
839 notif_stream);
840 if (!*stream_state) {
841 /*
842 * This iterator did not bump into this stream yet:
843 * create a stream state and a "stream begin"
844 * notification.
845 */
846 struct action action = {
847 .type = ACTION_TYPE_ADD_STREAM_STATE,
848 .payload.add_stream_state = {
849 .stream = bt_get(notif_stream),
850 .stream_state = NULL,
851 },
852 };
853
854 *stream_state = create_stream_state(notif_stream);
855 if (!stream_state) {
856 goto error;
857 }
858
859 action.payload.add_stream_state.stream_state =
860 *stream_state;
861 add_action(iterator, &action);
862
863 if (stream_begin_notif) {
864 add_action_push_notif(iterator, stream_begin_notif);
865 } else {
866 ret = add_action_push_notif_stream_begin(iterator,
867 notif_stream);
868 if (ret) {
869 goto error;
870 }
871 }
872 }
873
874 goto end;
875
876error:
877 destroy_stream_state(*stream_state);
878 ret = -1;
879
880end:
881 return ret;
882}
883
884static
885int handle_packet_switch(struct bt_notification_iterator *iterator,
886 struct bt_notification *packet_begin_notif,
887 struct bt_ctf_packet *new_packet,
888 struct stream_state *stream_state)
889{
890 int ret = 0;
891
892 if (stream_state->cur_packet == new_packet) {
893 goto end;
894 }
895
896 if (stream_state->cur_packet) {
897 /* End of the current packet */
898 ret = add_action_push_notif_packet_end(iterator,
899 stream_state->cur_packet);
900 if (ret) {
901 goto error;
902 }
903 }
904
905 /* Beginning of the new packet */
906 if (packet_begin_notif) {
907 add_action_push_notif(iterator, packet_begin_notif);
908 } else if (new_packet) {
909 ret = add_action_push_notif_packet_begin(iterator,
910 new_packet);
911 if (ret) {
912 goto error;
913 }
914 }
915
916 add_action_set_stream_state_cur_packet(iterator, stream_state,
917 new_packet);
918 goto end;
919
920error:
921 ret = -1;
922
923end:
924 return ret;
925}
926
927static
928int handle_notif_stream_begin(
929 struct bt_notification_iterator *iterator,
930 struct bt_notification *notif,
931 struct bt_ctf_stream *notif_stream)
932{
933 int ret = 0;
934 struct stream_state *stream_state;
935
936 assert(notif->type == BT_NOTIFICATION_TYPE_STREAM_BEGIN);
937 assert(notif_stream);
938 ret = ensure_stream_state_exists(iterator, notif, notif_stream,
939 &stream_state);
940 if (ret) {
941 goto error;
942 }
943
944 goto end;
945
946error:
947 ret = -1;
948
949end:
950 return ret;
951}
952
953static
954int handle_notif_stream_end(
955 struct bt_notification_iterator *iterator,
956 struct bt_notification *notif,
957 struct bt_ctf_stream *notif_stream)
958{
959 int ret = 0;
960 struct stream_state *stream_state;
961
962 assert(notif->type == BT_NOTIFICATION_TYPE_STREAM_END);
963 assert(notif_stream);
964 ret = ensure_stream_state_exists(iterator, NULL, notif_stream,
965 &stream_state);
966 if (ret) {
967 goto error;
968 }
969
970 ret = handle_packet_switch(iterator, NULL, NULL, stream_state);
971 if (ret) {
972 goto error;
973 }
974
975 add_action_push_notif(iterator, notif);
976 add_action_set_stream_state_is_ended(iterator, stream_state);
977 goto end;
978
979error:
980 ret = -1;
981
982end:
983 return ret;
984}
985
986static
987int handle_notif_packet_begin(
988 struct bt_notification_iterator *iterator,
989 struct bt_notification *notif,
990 struct bt_ctf_stream *notif_stream,
991 struct bt_ctf_packet *notif_packet)
992{
993 int ret = 0;
994 struct stream_state *stream_state;
995
996 assert(notif->type == BT_NOTIFICATION_TYPE_PACKET_BEGIN);
997 assert(notif_packet);
998 ret = ensure_stream_state_exists(iterator, NULL, notif_stream,
999 &stream_state);
1000 if (ret) {
1001 goto error;
1002 }
1003
1004 ret = handle_packet_switch(iterator, notif, notif_packet, stream_state);
1005 if (ret) {
1006 goto error;
1007 }
1008
1009 goto end;
1010
1011error:
1012 ret = -1;
1013
1014end:
1015 return ret;
1016}
1017
1018static
1019int handle_notif_packet_end(
1020 struct bt_notification_iterator *iterator,
1021 struct bt_notification *notif,
1022 struct bt_ctf_stream *notif_stream,
1023 struct bt_ctf_packet *notif_packet)
1024{
1025 int ret = 0;
1026 struct stream_state *stream_state;
1027
1028 assert(notif->type == BT_NOTIFICATION_TYPE_PACKET_END);
1029 assert(notif_packet);
1030 ret = ensure_stream_state_exists(iterator, NULL, notif_stream,
1031 &stream_state);
1032 if (ret) {
1033 goto error;
1034 }
1035
1036 ret = handle_packet_switch(iterator, NULL, notif_packet, stream_state);
1037 if (ret) {
1038 goto error;
1039 }
1040
1041 /* End of the current packet */
1042 add_action_push_notif(iterator, notif);
1043 add_action_set_stream_state_cur_packet(iterator, stream_state, NULL);
1044 goto end;
1045
1046error:
1047 ret = -1;
1048
1049end:
1050 return ret;
1051}
1052
1053static
1054int handle_notif_event(
1055 struct bt_notification_iterator *iterator,
1056 struct bt_notification *notif,
1057 struct bt_ctf_stream *notif_stream,
1058 struct bt_ctf_packet *notif_packet)
1059{
1060 int ret = 0;
1061 struct stream_state *stream_state;
1062
1063 assert(notif->type == BT_NOTIFICATION_TYPE_EVENT);
1064 assert(notif_packet);
1065 ret = ensure_stream_state_exists(iterator, NULL, notif_stream,
1066 &stream_state);
1067 if (ret) {
1068 goto error;
1069 }
1070
1071 ret = handle_packet_switch(iterator, NULL, notif_packet, stream_state);
1072 if (ret) {
1073 goto error;
1074 }
1075
1076 add_action_push_notif(iterator, notif);
1077 goto end;
1078
1079error:
1080 ret = -1;
1081
1082end:
1083 return ret;
1084}
1085
1086static
1087int enqueue_notification_and_automatic(
1088 struct bt_notification_iterator *iterator,
1089 struct bt_notification *notif)
1090{
1091 int ret = 0;
1092 struct bt_ctf_event *notif_event = NULL;
1093 struct bt_ctf_stream *notif_stream = NULL;
1094 struct bt_ctf_packet *notif_packet = NULL;
1095
1096 assert(notif);
1097
fa054faf
PP
1098 // TODO: Skip most of this if the iterator is only subscribed
1099 // to event/inactivity notifications.
1100
3230ee6b
PP
1101 /* Get the stream and packet referred by the notification */
1102 switch (notif->type) {
1103 case BT_NOTIFICATION_TYPE_EVENT:
1104 notif_event = bt_notification_event_borrow_event(notif);
1105 assert(notif_event);
1106 notif_packet = bt_ctf_event_borrow_packet(notif_event);
1107 assert(notif_packet);
1108 break;
1109 case BT_NOTIFICATION_TYPE_STREAM_BEGIN:
1110 notif_stream =
1111 bt_notification_stream_begin_borrow_stream(notif);
1112 assert(notif_stream);
1113 break;
1114 case BT_NOTIFICATION_TYPE_STREAM_END:
1115 notif_stream = bt_notification_stream_end_borrow_stream(notif);
1116 assert(notif_stream);
1117 break;
1118 case BT_NOTIFICATION_TYPE_PACKET_BEGIN:
1119 notif_packet =
1120 bt_notification_packet_begin_borrow_packet(notif);
1121 assert(notif_packet);
1122 break;
1123 case BT_NOTIFICATION_TYPE_PACKET_END:
1124 notif_packet = bt_notification_packet_end_borrow_packet(notif);
1125 assert(notif_packet);
1126 break;
1127 case BT_NOTIFICATION_TYPE_INACTIVITY:
1128 /* Always valid */
1129 break;
1130 default:
1131 /*
1132 * Invalid type of notification. Only the notification
1133 * types above are allowed to be returned by a user
1134 * component.
1135 */
1136 goto error;
1137 }
1138
1139 if (notif_packet) {
1140 notif_stream = bt_ctf_packet_borrow_stream(notif_packet);
1141 assert(notif_stream);
1142 }
1143
1144 if (!notif_stream) {
1145 /*
1146 * The notification has no reference to a stream: it
1147 * cannot cause the creation of automatic notifications.
1148 */
1149 goto end;
1150 }
1151
1152 if (!validate_notification(iterator, notif, notif_stream,
1153 notif_packet)) {
1154 goto error;
1155 }
1156
1157 switch (notif->type) {
1158 case BT_NOTIFICATION_TYPE_EVENT:
1159 ret = handle_notif_event(iterator, notif, notif_stream,
1160 notif_packet);
1161 break;
1162 case BT_NOTIFICATION_TYPE_STREAM_BEGIN:
1163 ret = handle_notif_stream_begin(iterator, notif, notif_stream);
1164 break;
1165 case BT_NOTIFICATION_TYPE_STREAM_END:
1166 ret = handle_notif_stream_end(iterator, notif, notif_stream);
1167 break;
1168 case BT_NOTIFICATION_TYPE_PACKET_BEGIN:
1169 ret = handle_notif_packet_begin(iterator, notif, notif_stream,
1170 notif_packet);
1171 break;
1172 case BT_NOTIFICATION_TYPE_PACKET_END:
1173 ret = handle_notif_packet_end(iterator, notif, notif_stream,
1174 notif_packet);
1175 break;
1176 default:
1177 break;
1178 }
1179
1180 if (ret) {
1181 goto error;
1182 }
1183
1184 apply_actions(iterator);
1185 goto end;
1186
1187error:
1188 ret = -1;
1189
1190end:
1191 return ret;
1192}
1193
1194static
1195int handle_end(struct bt_notification_iterator *iterator)
1196{
1197 GHashTableIter stream_state_iter;
1198 gpointer stream_gptr, stream_state_gptr;
1199 int ret = 0;
1200
1201 /*
1202 * Emit a "stream end" notification for each non-ended stream
1203 * known by this iterator and mark them as ended.
1204 */
1205 g_hash_table_iter_init(&stream_state_iter, iterator->stream_states);
1206
1207 while (g_hash_table_iter_next(&stream_state_iter, &stream_gptr,
1208 &stream_state_gptr)) {
1209 struct stream_state *stream_state = stream_state_gptr;
1210
1211 assert(stream_state_gptr);
1212
1213 if (stream_state->is_ended) {
1214 continue;
1215 }
1216
1217 ret = handle_packet_switch(iterator, NULL, NULL, stream_state);
1218 if (ret) {
1219 goto error;
1220 }
1221
1222 ret = add_action_push_notif_stream_end(iterator, stream_gptr);
1223 if (ret) {
1224 goto error;
1225 }
1226
1227 add_action_set_stream_state_is_ended(iterator, stream_state);
1228 }
1229
1230 apply_actions(iterator);
1231 goto end;
1232
1233error:
1234 ret = -1;
1235
1236end:
1237 return ret;
1238}
1239
1240static
1241enum bt_notification_iterator_status ensure_queue_has_notifications(
1242 struct bt_notification_iterator *iterator)
53d45b87 1243{
890882ef
PP
1244 struct bt_private_notification_iterator *priv_iterator =
1245 bt_private_notification_iterator_from_notification_iterator(iterator);
d3eb6e8f 1246 bt_component_class_notification_iterator_next_method next_method = NULL;
fe8ad2b6
PP
1247 struct bt_notification_iterator_next_return next_return = {
1248 .status = BT_NOTIFICATION_ITERATOR_STATUS_OK,
1249 .notification = NULL,
1250 };
3230ee6b
PP
1251 enum bt_notification_iterator_status status =
1252 BT_NOTIFICATION_ITERATOR_STATUS_OK;
1253 int ret;
41a2b7ae 1254
3230ee6b
PP
1255 assert(iterator);
1256
1257 if (iterator->queue->length > 0) {
1258 /* We already have enough */
1259 goto end;
1260 }
1261
1262 if (iterator->is_ended) {
1263 status = BT_NOTIFICATION_ITERATOR_STATUS_END;
41a2b7ae
PP
1264 goto end;
1265 }
d3eb6e8f 1266
3230ee6b
PP
1267 assert(iterator->upstream_component);
1268 assert(iterator->upstream_component->class);
d3eb6e8f 1269
3230ee6b
PP
1270 /* Pick the appropriate "next" method */
1271 switch (iterator->upstream_component->class->type) {
d3eb6e8f
PP
1272 case BT_COMPONENT_CLASS_TYPE_SOURCE:
1273 {
1274 struct bt_component_class_source *source_class =
3230ee6b 1275 container_of(iterator->upstream_component->class,
d3eb6e8f
PP
1276 struct bt_component_class_source, parent);
1277
1278 assert(source_class->methods.iterator.next);
1279 next_method = source_class->methods.iterator.next;
1280 break;
1281 }
1282 case BT_COMPONENT_CLASS_TYPE_FILTER:
1283 {
1284 struct bt_component_class_filter *filter_class =
3230ee6b 1285 container_of(iterator->upstream_component->class,
d3eb6e8f
PP
1286 struct bt_component_class_filter, parent);
1287
1288 assert(filter_class->methods.iterator.next);
1289 next_method = filter_class->methods.iterator.next;
1290 break;
1291 }
1292 default:
1293 assert(false);
1294 break;
1295 }
1296
3230ee6b
PP
1297 /*
1298 * Call the user's "next" method to get the next notification
fa054faf 1299 * and status.
3230ee6b 1300 */
d3eb6e8f 1301 assert(next_method);
3230ee6b 1302
fa054faf
PP
1303 while (iterator->queue->length == 0) {
1304 next_return = next_method(priv_iterator);
1305 if (next_return.status < 0) {
1306 status = next_return.status;
3230ee6b
PP
1307 goto end;
1308 }
1309
fa054faf
PP
1310 switch (next_return.status) {
1311 case BT_NOTIFICATION_ITERATOR_STATUS_END:
1312 ret = handle_end(iterator);
1313 if (ret) {
1314 status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
1315 goto end;
1316 }
3230ee6b 1317
fa054faf
PP
1318 if (iterator->queue->length == 0) {
1319 status = BT_NOTIFICATION_ITERATOR_STATUS_END;
1320 }
41a2b7ae 1321
fa054faf 1322 iterator->is_ended = true;
3230ee6b 1323 goto end;
fa054faf
PP
1324 case BT_NOTIFICATION_ITERATOR_STATUS_AGAIN:
1325 status = BT_NOTIFICATION_ITERATOR_STATUS_AGAIN;
1326 goto end;
1327 case BT_NOTIFICATION_ITERATOR_STATUS_OK:
1328 if (!next_return.notification) {
1329 status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
1330 goto end;
1331 }
1332
1333 /*
1334 * We know the notification is valid. Before we
1335 * push it to the head of the queue, push the
1336 * appropriate automatic notifications if any.
1337 */
1338 ret = enqueue_notification_and_automatic(iterator,
1339 next_return.notification);
1340 BT_PUT(next_return.notification);
1341 if (ret) {
1342 status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
1343 goto end;
1344 }
1345 break;
1346 default:
1347 /* Unknown non-error status */
1348 assert(false);
3230ee6b 1349 }
41a2b7ae
PP
1350 }
1351
1352end:
3230ee6b
PP
1353 return status;
1354}
1355
1356enum bt_notification_iterator_status
1357bt_notification_iterator_next(struct bt_notification_iterator *iterator)
1358{
1359 enum bt_notification_iterator_status status;
1360
1361 if (!iterator) {
1362 status = BT_NOTIFICATION_ITERATOR_STATUS_INVALID;
1363 goto end;
1364 }
1365
1366 /*
1367 * Make sure that the iterator's queue contains at least one
1368 * notification.
1369 */
1370 status = ensure_queue_has_notifications(iterator);
1371 if (status != BT_NOTIFICATION_ITERATOR_STATUS_OK) {
1372 goto end;
1373 }
1374
1375 /*
1376 * Move the notification at the tail of the queue to the
1377 * iterator's current notification.
1378 */
1379 assert(iterator->queue->length > 0);
1380 bt_put(iterator->current_notification);
1381 iterator->current_notification = g_queue_pop_tail(iterator->queue);
1382 assert(iterator->current_notification);
1383
1384end:
1385 return status;
53d45b87
JG
1386}
1387
413bc2c4
JG
1388struct bt_component *bt_notification_iterator_get_component(
1389 struct bt_notification_iterator *iterator)
1390{
3230ee6b 1391 return bt_get(iterator->upstream_component);
413bc2c4
JG
1392}
1393
91457551
PP
1394struct bt_private_component *
1395bt_private_notification_iterator_get_private_component(
1396 struct bt_private_notification_iterator *private_iterator)
1397{
1398 return bt_private_component_from_component(
1399 bt_notification_iterator_get_component(
1400 bt_notification_iterator_from_private(private_iterator)));
1401}
1402
9531634f
JG
1403enum bt_notification_iterator_status bt_notification_iterator_seek_time(
1404 struct bt_notification_iterator *iterator,
1405 enum bt_notification_iterator_seek_origin seek_origin,
1406 int64_t time)
1407{
b7726e32
MD
1408 enum bt_notification_iterator_status ret =
1409 BT_NOTIFICATION_ITERATOR_STATUS_UNSUPPORTED;
9531634f
JG
1410 return ret;
1411}
This page took 0.084312 seconds and 4 git commands to generate.