ffd2de5a28fa01b9e4ece401e307238b47463e64
[babeltrace.git] / lib / graph / iterator.c
1 /*
2 * Copyright 2015 Jérémie Galarneau <jeremie.galarneau@efficios.com>
3 * Copyright 2017 Philippe Proulx <pproulx@efficios.com>
4 *
5 * Permission is hereby granted, free of charge, to any person obtaining a copy
6 * of this software and associated documentation files (the "Software"), to deal
7 * in the Software without restriction, including without limitation the rights
8 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9 * copies of the Software, and to permit persons to whom the Software is
10 * furnished to do so, subject to the following conditions:
11 *
12 * The above copyright notice and this permission notice shall be included in
13 * all copies or substantial portions of the Software.
14 *
15 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21 * SOFTWARE.
22 */
23
24 #define BT_LOG_TAG "NOTIF-ITER"
25 #include <babeltrace/lib-logging-internal.h>
26
27 #include <babeltrace/compiler-internal.h>
28 #include <babeltrace/object.h>
29 #include <babeltrace/trace-ir/fields.h>
30 #include <babeltrace/trace-ir/event-internal.h>
31 #include <babeltrace/trace-ir/packet-internal.h>
32 #include <babeltrace/trace-ir/stream-internal.h>
33 #include <babeltrace/graph/connection.h>
34 #include <babeltrace/graph/connection-internal.h>
35 #include <babeltrace/graph/component.h>
36 #include <babeltrace/graph/component-internal.h>
37 #include <babeltrace/graph/component-source-internal.h>
38 #include <babeltrace/graph/component-class-internal.h>
39 #include <babeltrace/graph/component-class-sink-colander-internal.h>
40 #include <babeltrace/graph/component-sink.h>
41 #include <babeltrace/graph/notification.h>
42 #include <babeltrace/graph/notification-iterator.h>
43 #include <babeltrace/graph/notification-iterator-internal.h>
44 #include <babeltrace/graph/self-component-port-input-notification-iterator.h>
45 #include <babeltrace/graph/port-output-notification-iterator.h>
46 #include <babeltrace/graph/notification-internal.h>
47 #include <babeltrace/graph/notification-event.h>
48 #include <babeltrace/graph/notification-event-internal.h>
49 #include <babeltrace/graph/notification-packet.h>
50 #include <babeltrace/graph/notification-packet-internal.h>
51 #include <babeltrace/graph/notification-stream.h>
52 #include <babeltrace/graph/notification-stream-internal.h>
53 #include <babeltrace/graph/port.h>
54 #include <babeltrace/graph/private-graph.h>
55 #include <babeltrace/graph/graph-internal.h>
56 #include <babeltrace/types.h>
57 #include <babeltrace/assert-internal.h>
58 #include <babeltrace/assert-pre-internal.h>
59 #include <stdint.h>
60 #include <inttypes.h>
61 #include <stdlib.h>
62
/*
 * Number of notification slots in a batch: this is the size given to
 * each iterator's notification array when it is initialized, and the
 * capacity passed to the user's "next" method.
 *
 * TODO: Use graph's state (number of active iterators, etc.) and
 * possibly system specifications to make a better guess than this.
 */
#define NOTIF_BATCH_SIZE	15
68
/*
 * Per-stream bookkeeping which a self component input port
 * notification iterator keeps to validate the sequence of
 * notifications it gets for that stream.
 */
struct stream_state {
	/* Stream which this state tracks */
	struct bt_stream *stream; /* owned by this */

	/*
	 * Packet opened by the last validated "packet begin"
	 * notification and not yet closed by a "packet end"
	 * notification, or NULL if there's none.
	 */
	struct bt_packet *cur_packet; /* owned by this */

	/* Sequence number which the stream's next notification must have */
	uint64_t expected_notif_seq_num;

	/* Set once a "stream end" notification is validated */
	bt_bool is_ended;
};
75
76 BT_ASSERT_PRE_FUNC
77 static
78 void destroy_stream_state(struct stream_state *stream_state)
79 {
80 if (!stream_state) {
81 return;
82 }
83
84 BT_LOGV("Destroying stream state: stream-state-addr=%p", stream_state);
85 BT_LOGV_STR("Putting stream state's current packet.");
86 BT_OBJECT_PUT_REF_AND_RESET(stream_state->cur_packet);
87 BT_LOGV_STR("Putting stream state's stream.");
88 BT_OBJECT_PUT_REF_AND_RESET(stream_state->stream);
89 g_free(stream_state);
90 }
91
92 BT_ASSERT_PRE_FUNC
93 static
94 struct stream_state *create_stream_state(struct bt_stream *stream)
95 {
96 struct stream_state *stream_state = g_new0(struct stream_state, 1);
97
98 if (!stream_state) {
99 BT_LOGE_STR("Failed to allocate one stream state.");
100 goto end;
101 }
102
103 /*
104 * We keep a reference to the stream until we know it's ended.
105 */
106 stream_state->stream = stream;
107 bt_object_get_no_null_check(stream_state->stream);
108 BT_LIB_LOGV("Created stream state: %![stream-]+s, "
109 "stream-state-addr=%p",
110 stream, stream_state);
111
112 end:
113 return stream_state;
114 }
115
116 static
117 void destroy_base_notification_iterator(struct bt_object *obj)
118 {
119 struct bt_notification_iterator *iterator = (void *) obj;
120
121 BT_ASSERT(iterator);
122
123 if (iterator->notifs) {
124 g_ptr_array_free(iterator->notifs, TRUE);
125 iterator->notifs = NULL;
126 }
127
128 g_free(iterator);
129 }
130
131 static
132 void bt_self_component_port_input_notification_iterator_destroy(struct bt_object *obj)
133 {
134 struct bt_self_component_port_input_notification_iterator *iterator;
135
136 BT_ASSERT(obj);
137
138 /*
139 * The notification iterator's reference count is 0 if we're
140 * here. Increment it to avoid a double-destroy (possibly
141 * infinitely recursive). This could happen for example if the
142 * notification iterator's finalization function does
143 * bt_object_get_ref() (or anything that causes
144 * bt_object_get_ref() to be called) on itself (ref. count goes
145 * from 0 to 1), and then bt_object_put_ref(): the reference
146 * count would go from 1 to 0 again and this function would be
147 * called again.
148 */
149 obj->ref_count++;
150 iterator = (void *) obj;
151 BT_LIB_LOGD("Destroying self component input port notification iterator object: "
152 "%!+i", iterator);
153 bt_self_component_port_input_notification_iterator_finalize(iterator);
154
155 if (iterator->stream_states) {
156 /*
157 * Remove our destroy listener from each stream which
158 * has a state in this iterator. Otherwise the destroy
159 * listener would be called with an invalid/other
160 * notification iterator object.
161 */
162 g_hash_table_destroy(iterator->stream_states);
163 iterator->stream_states = NULL;
164 }
165
166 if (iterator->connection) {
167 /*
168 * Remove ourself from the originating connection so
169 * that it does not try to finalize a dangling pointer
170 * later.
171 */
172 bt_connection_remove_iterator(iterator->connection, iterator);
173 iterator->connection = NULL;
174 }
175
176 destroy_base_notification_iterator(obj);
177 }
178
179 BT_HIDDEN
180 void bt_self_component_port_input_notification_iterator_finalize(
181 struct bt_self_component_port_input_notification_iterator *iterator)
182 {
183 typedef void (*method_t)(void *);
184
185 struct bt_component_class *comp_class = NULL;
186 method_t method = NULL;
187
188 BT_ASSERT(iterator);
189
190 switch (iterator->state) {
191 case BT_SELF_COMPONENT_PORT_INPUT_NOTIFICATION_ITERATOR_STATE_NON_INITIALIZED:
192 /* Skip user finalization if user initialization failed */
193 BT_LIB_LOGD("Not finalizing non-initialized notification iterator: "
194 "%!+i", iterator);
195 return;
196 case BT_SELF_COMPONENT_PORT_INPUT_NOTIFICATION_ITERATOR_STATE_FINALIZED:
197 case BT_SELF_COMPONENT_PORT_INPUT_NOTIFICATION_ITERATOR_STATE_FINALIZED_AND_ENDED:
198 /* Already finalized */
199 BT_LIB_LOGD("Not finalizing notification iterator: already finalized: "
200 "%!+i", iterator);
201 return;
202 default:
203 break;
204 }
205
206 BT_LIB_LOGD("Finalizing notification iterator: %!+i", iterator);
207
208 if (iterator->state == BT_SELF_COMPONENT_PORT_INPUT_NOTIFICATION_ITERATOR_STATE_ENDED) {
209 BT_LIB_LOGD("Updating notification iterator's state: "
210 "new-state=BT_SELF_COMPONENT_PORT_INPUT_NOTIFICATION_ITERATOR_STATE_FINALIZED_AND_ENDED");
211 iterator->state = BT_SELF_COMPONENT_PORT_INPUT_NOTIFICATION_ITERATOR_STATE_FINALIZED_AND_ENDED;
212 } else {
213 BT_LIB_LOGD("Updating notification iterator's state: "
214 "new-state=BT_SELF_COMPONENT_PORT_INPUT_NOTIFICATION_ITERATOR_STATE_FINALIZED");
215 iterator->state = BT_SELF_COMPONENT_PORT_INPUT_NOTIFICATION_ITERATOR_STATE_FINALIZED;
216 }
217
218 BT_ASSERT(iterator->upstream_component);
219 comp_class = iterator->upstream_component->class;
220
221 /* Call user-defined destroy method */
222 switch (comp_class->type) {
223 case BT_COMPONENT_CLASS_TYPE_SOURCE:
224 {
225 struct bt_component_class_source *src_comp_cls =
226 (void *) comp_class;
227
228 method = (method_t) src_comp_cls->methods.notif_iter_finalize;
229 break;
230 }
231 case BT_COMPONENT_CLASS_TYPE_FILTER:
232 {
233 struct bt_component_class_filter *flt_comp_cls =
234 (void *) comp_class;
235
236 method = (method_t) flt_comp_cls->methods.notif_iter_finalize;
237 break;
238 }
239 default:
240 /* Unreachable */
241 abort();
242 }
243
244 if (method) {
245 BT_LIB_LOGD("Calling user's finalization method: %!+i",
246 iterator);
247 method(iterator);
248 }
249
250 iterator->upstream_component = NULL;
251 iterator->upstream_port = NULL;
252 BT_LIB_LOGD("Finalized notification iterator: %!+i", iterator);
253 }
254
255 BT_HIDDEN
256 void bt_self_component_port_input_notification_iterator_set_connection(
257 struct bt_self_component_port_input_notification_iterator *iterator,
258 struct bt_connection *connection)
259 {
260 BT_ASSERT(iterator);
261 iterator->connection = connection;
262 BT_LIB_LOGV("Set notification iterator's connection: "
263 "%![iter-]+i, %![conn-]+x", iterator, connection);
264 }
265
266 static
267 int init_notification_iterator(struct bt_notification_iterator *iterator,
268 enum bt_notification_iterator_type type,
269 bt_object_release_func destroy)
270 {
271 int ret = 0;
272
273 bt_object_init_shared(&iterator->base, destroy);
274 iterator->type = type;
275 iterator->notifs = g_ptr_array_new();
276 if (!iterator->notifs) {
277 BT_LOGE_STR("Failed to allocate a GPtrArray.");
278 ret = -1;
279 goto end;
280 }
281
282 g_ptr_array_set_size(iterator->notifs, NOTIF_BATCH_SIZE);
283
284 end:
285 return ret;
286 }
287
288 static
289 struct bt_self_component_port_input_notification_iterator *
290 bt_self_component_port_input_notification_iterator_create_initial(
291 struct bt_component *upstream_comp,
292 struct bt_port *upstream_port)
293 {
294 int ret;
295 struct bt_self_component_port_input_notification_iterator *iterator = NULL;
296
297 BT_ASSERT(upstream_comp);
298 BT_ASSERT(upstream_port);
299 BT_ASSERT(bt_port_is_connected(upstream_port));
300 BT_LIB_LOGD("Creating initial notification iterator on self component input port: "
301 "%![up-comp-]+c, %![up-port-]+p", upstream_comp, upstream_port);
302 BT_ASSERT(bt_component_get_class_type(upstream_comp) ==
303 BT_COMPONENT_CLASS_TYPE_SOURCE ||
304 bt_component_get_class_type(upstream_comp) ==
305 BT_COMPONENT_CLASS_TYPE_FILTER);
306 iterator = g_new0(
307 struct bt_self_component_port_input_notification_iterator, 1);
308 if (!iterator) {
309 BT_LOGE_STR("Failed to allocate one self component input port "
310 "notification iterator.");
311 goto end;
312 }
313
314 ret = init_notification_iterator((void *) iterator,
315 BT_NOTIFICATION_ITERATOR_TYPE_SELF_COMPONENT_PORT_INPUT,
316 bt_self_component_port_input_notification_iterator_destroy);
317 if (ret) {
318 /* init_notification_iterator() logs errors */
319 BT_OBJECT_PUT_REF_AND_RESET(iterator);
320 goto end;
321 }
322
323 iterator->stream_states = g_hash_table_new_full(g_direct_hash,
324 g_direct_equal, NULL, (GDestroyNotify) destroy_stream_state);
325 if (!iterator->stream_states) {
326 BT_LOGE_STR("Failed to allocate a GHashTable.");
327 BT_OBJECT_PUT_REF_AND_RESET(iterator);
328 goto end;
329 }
330
331 iterator->upstream_component = upstream_comp;
332 iterator->upstream_port = upstream_port;
333 iterator->connection = iterator->upstream_port->connection;
334 iterator->graph = bt_component_borrow_graph(upstream_comp);
335 iterator->state = BT_SELF_COMPONENT_PORT_INPUT_NOTIFICATION_ITERATOR_STATE_NON_INITIALIZED;
336 BT_LIB_LOGD("Created initial notification iterator on self component input port: "
337 "%![up-port-]+p, %![up-comp-]+c, %![iter-]+i",
338 upstream_port, upstream_comp, iterator);
339
340 end:
341 return iterator;
342 }
343
344 struct bt_self_component_port_input_notification_iterator *
345 bt_self_component_port_input_notification_iterator_create(
346 struct bt_self_component_port_input *self_port)
347 {
348 typedef enum bt_self_notification_iterator_status (*init_method_t)(
349 void *, void *, void *);
350
351 init_method_t init_method = NULL;
352 struct bt_self_component_port_input_notification_iterator *iterator =
353 NULL;
354 struct bt_port *port = (void *) self_port;
355 struct bt_port *upstream_port;
356 struct bt_component *comp;
357 struct bt_component *upstream_comp;
358 struct bt_component_class *upstream_comp_cls;
359
360 BT_ASSERT_PRE_NON_NULL(port, "Port");
361 comp = bt_port_borrow_component(port);
362 BT_ASSERT_PRE(bt_port_is_connected(port),
363 "Port is not connected: %![port-]+p", port);
364 BT_ASSERT_PRE(comp, "Port is not part of a component: %![port-]+p",
365 port);
366 BT_ASSERT_PRE(!bt_component_graph_is_canceled(comp),
367 "Port's component's graph is canceled: "
368 "%![port-]+p, %![comp-]+c", port, comp);
369 BT_ASSERT(port->connection);
370 upstream_port = port->connection->upstream_port;
371 BT_ASSERT(upstream_port);
372 upstream_comp = bt_port_borrow_component(upstream_port);
373 BT_ASSERT(upstream_comp);
374 upstream_comp_cls = upstream_comp->class;
375 BT_ASSERT(upstream_comp->class->type ==
376 BT_COMPONENT_CLASS_TYPE_SOURCE ||
377 upstream_comp->class->type ==
378 BT_COMPONENT_CLASS_TYPE_FILTER);
379 iterator = bt_self_component_port_input_notification_iterator_create_initial(
380 upstream_comp, upstream_port);
381 if (!iterator) {
382 BT_LOGW_STR("Cannot create self component input port "
383 "notification iterator.");
384 goto end;
385 }
386
387 switch (upstream_comp_cls->type) {
388 case BT_COMPONENT_CLASS_TYPE_SOURCE:
389 {
390 struct bt_component_class_source *src_comp_cls =
391 (void *) upstream_comp_cls;
392
393 init_method =
394 (init_method_t) src_comp_cls->methods.notif_iter_init;
395 break;
396 }
397 case BT_COMPONENT_CLASS_TYPE_FILTER:
398 {
399 struct bt_component_class_filter *flt_comp_cls =
400 (void *) upstream_comp_cls;
401
402 init_method =
403 (init_method_t) flt_comp_cls->methods.notif_iter_init;
404 break;
405 }
406 default:
407 /* Unreachable */
408 abort();
409 }
410
411 if (init_method) {
412 int iter_status;
413
414 BT_LIB_LOGD("Calling user's initialization method: %!+i", iterator);
415 iter_status = init_method(iterator, upstream_comp,
416 upstream_port);
417 BT_LOGD("User method returned: status=%s",
418 bt_notification_iterator_status_string(iter_status));
419 if (iter_status != BT_NOTIFICATION_ITERATOR_STATUS_OK) {
420 BT_LOGW_STR("Initialization method failed.");
421 goto end;
422 }
423 }
424
425 iterator->state = BT_SELF_COMPONENT_PORT_INPUT_NOTIFICATION_ITERATOR_STATE_ACTIVE;
426 g_ptr_array_add(port->connection->iterators, iterator);
427 BT_LIB_LOGD("Created notification iterator on self component input port: "
428 "%![up-port-]+p, %![up-comp-]+c, %![iter-]+i",
429 upstream_port, upstream_comp, iterator);
430
431 end:
432 return iterator;
433 }
434
435 void *bt_self_notification_iterator_get_data(
436 struct bt_self_notification_iterator *self_iterator)
437 {
438 struct bt_self_component_port_input_notification_iterator *iterator =
439 (void *) self_iterator;
440
441 BT_ASSERT_PRE_NON_NULL(iterator, "Notification iterator");
442 return iterator->user_data;
443 }
444
445 void bt_self_notification_iterator_set_data(
446 struct bt_self_notification_iterator *self_iterator, void *data)
447 {
448 struct bt_self_component_port_input_notification_iterator *iterator =
449 (void *) self_iterator;
450
451 BT_ASSERT_PRE_NON_NULL(iterator, "Notification iterator");
452 iterator->user_data = data;
453 BT_LIB_LOGV("Set notification iterator's user data: "
454 "%!+i, user-data-addr=%p", iterator, data);
455 }
456
457 BT_ASSERT_PRE_FUNC
458 static inline
459 void bt_notification_borrow_packet_stream(struct bt_notification *notif,
460 struct bt_stream **stream, struct bt_packet **packet)
461 {
462 BT_ASSERT(notif);
463
464 switch (notif->type) {
465 case BT_NOTIFICATION_TYPE_EVENT:
466 *packet = bt_event_borrow_packet(
467 bt_notification_event_borrow_event(notif));
468 *stream = bt_packet_borrow_stream(*packet);
469 break;
470 case BT_NOTIFICATION_TYPE_STREAM_BEGIN:
471 *stream = bt_notification_stream_begin_borrow_stream(notif);
472 break;
473 case BT_NOTIFICATION_TYPE_STREAM_END:
474 *stream = bt_notification_stream_end_borrow_stream(notif);
475 break;
476 case BT_NOTIFICATION_TYPE_PACKET_BEGIN:
477 *packet = bt_notification_packet_begin_borrow_packet(notif);
478 *stream = bt_packet_borrow_stream(*packet);
479 break;
480 case BT_NOTIFICATION_TYPE_PACKET_END:
481 *packet = bt_notification_packet_end_borrow_packet(notif);
482 *stream = bt_packet_borrow_stream(*packet);
483 break;
484 default:
485 break;
486 }
487 }
488
489 BT_ASSERT_PRE_FUNC
490 static inline
491 bool validate_notification(
492 struct bt_self_component_port_input_notification_iterator *iterator,
493 struct bt_notification *notif)
494 {
495 bool is_valid = true;
496 struct stream_state *stream_state;
497 struct bt_stream *stream = NULL;
498 struct bt_packet *packet = NULL;
499
500 BT_ASSERT(notif);
501 bt_notification_borrow_packet_stream(notif, &stream, &packet);
502
503 if (!stream) {
504 /* we don't care about notifications not attached to streams */
505 goto end;
506 }
507
508 stream_state = g_hash_table_lookup(iterator->stream_states, stream);
509 if (!stream_state) {
510 /*
511 * No stream state for this stream: this notification
512 * MUST be a BT_NOTIFICATION_TYPE_STREAM_BEGIN notification
513 * and its sequence number must be 0.
514 */
515 if (notif->type != BT_NOTIFICATION_TYPE_STREAM_BEGIN) {
516 BT_ASSERT_PRE_MSG("Unexpected notification: missing a "
517 "BT_NOTIFICATION_TYPE_STREAM_BEGIN "
518 "notification prior to this notification: "
519 "%![stream-]+s", stream);
520 is_valid = false;
521 goto end;
522 }
523
524 if (notif->seq_num == -1ULL) {
525 notif->seq_num = 0;
526 }
527
528 if (notif->seq_num != 0) {
529 BT_ASSERT_PRE_MSG("Unexpected notification sequence "
530 "number for this notification iterator: "
531 "this is the first notification for this "
532 "stream, expecting sequence number 0: "
533 "seq-num=%" PRIu64 ", %![stream-]+s",
534 notif->seq_num, stream);
535 is_valid = false;
536 goto end;
537 }
538
539 stream_state = create_stream_state(stream);
540 if (!stream_state) {
541 abort();
542 }
543
544 g_hash_table_insert(iterator->stream_states, stream,
545 stream_state);
546 stream_state->expected_notif_seq_num++;
547 goto end;
548 }
549
550 if (stream_state->is_ended) {
551 /*
552 * There's a new notification which has a reference to a
553 * stream which, from this iterator's point of view, is
554 * ended ("end of stream" notification was returned).
555 * This is bad: the API guarantees that it can never
556 * happen.
557 */
558 BT_ASSERT_PRE_MSG("Stream is already ended: %![stream-]+s",
559 stream);
560 is_valid = false;
561 goto end;
562 }
563
564 if (notif->seq_num == -1ULL) {
565 notif->seq_num = stream_state->expected_notif_seq_num;
566 }
567
568 if (notif->seq_num != -1ULL &&
569 notif->seq_num != stream_state->expected_notif_seq_num) {
570 BT_ASSERT_PRE_MSG("Unexpected notification sequence number: "
571 "seq-num=%" PRIu64 ", "
572 "expected-seq-num=%" PRIu64 ", %![stream-]+s",
573 notif->seq_num, stream_state->expected_notif_seq_num,
574 stream);
575 is_valid = false;
576 goto end;
577 }
578
579 switch (notif->type) {
580 case BT_NOTIFICATION_TYPE_STREAM_BEGIN:
581 BT_ASSERT_PRE_MSG("Unexpected BT_NOTIFICATION_TYPE_STREAM_BEGIN "
582 "notification at this point: notif-seq-num=%" PRIu64 ", "
583 "%![stream-]+s", notif->seq_num, stream);
584 is_valid = false;
585 goto end;
586 case BT_NOTIFICATION_TYPE_STREAM_END:
587 if (stream_state->cur_packet) {
588 BT_ASSERT_PRE_MSG("Unexpected BT_NOTIFICATION_TYPE_STREAM_END "
589 "notification: missing a "
590 "BT_NOTIFICATION_TYPE_PACKET_END notification "
591 "prior to this notification: "
592 "notif-seq-num=%" PRIu64 ", "
593 "%![stream-]+s", notif->seq_num, stream);
594 is_valid = false;
595 goto end;
596 }
597 stream_state->expected_notif_seq_num++;
598 stream_state->is_ended = true;
599 goto end;
600 case BT_NOTIFICATION_TYPE_PACKET_BEGIN:
601 if (stream_state->cur_packet) {
602 BT_ASSERT_PRE_MSG("Unexpected BT_NOTIFICATION_TYPE_PACKET_BEGIN "
603 "notification at this point: missing a "
604 "BT_NOTIFICATION_TYPE_PACKET_END notification "
605 "prior to this notification: "
606 "notif-seq-num=%" PRIu64 ", %![stream-]+s, "
607 "%![packet-]+a", notif->seq_num, stream,
608 packet);
609 is_valid = false;
610 goto end;
611 }
612 stream_state->expected_notif_seq_num++;
613 stream_state->cur_packet = packet;
614 bt_object_get_no_null_check(stream_state->cur_packet);
615 goto end;
616 case BT_NOTIFICATION_TYPE_PACKET_END:
617 if (!stream_state->cur_packet) {
618 BT_ASSERT_PRE_MSG("Unexpected BT_NOTIFICATION_TYPE_PACKET_END "
619 "notification at this point: missing a "
620 "BT_NOTIFICATION_TYPE_PACKET_BEGIN notification "
621 "prior to this notification: "
622 "notif-seq-num=%" PRIu64 ", %![stream-]+s, "
623 "%![packet-]+a", notif->seq_num, stream,
624 packet);
625 is_valid = false;
626 goto end;
627 }
628 stream_state->expected_notif_seq_num++;
629 BT_OBJECT_PUT_REF_AND_RESET(stream_state->cur_packet);
630 goto end;
631 case BT_NOTIFICATION_TYPE_EVENT:
632 if (packet != stream_state->cur_packet) {
633 BT_ASSERT_PRE_MSG("Unexpected packet for "
634 "BT_NOTIFICATION_TYPE_EVENT notification: "
635 "notif-seq-num=%" PRIu64 ", %![stream-]+s, "
636 "%![notif-packet-]+a, %![expected-packet-]+a",
637 notif->seq_num, stream,
638 stream_state->cur_packet, packet);
639 is_valid = false;
640 goto end;
641 }
642 stream_state->expected_notif_seq_num++;
643 goto end;
644 default:
645 break;
646 }
647
648 end:
649 return is_valid;
650 }
651
652 BT_ASSERT_PRE_FUNC
653 static inline
654 bool validate_notifications(
655 struct bt_self_component_port_input_notification_iterator *iterator,
656 uint64_t count)
657 {
658 bool ret = true;
659 bt_notification_array notifs = (void *) iterator->base.notifs->pdata;
660 uint64_t i;
661
662 for (i = 0; i < count; i++) {
663 ret = validate_notification(iterator, notifs[i]);
664 if (!ret) {
665 break;
666 }
667 }
668
669 return ret;
670 }
671
672 BT_ASSERT_PRE_FUNC
673 static inline bool priv_conn_notif_iter_can_end(
674 struct bt_self_component_port_input_notification_iterator *iterator)
675 {
676 GHashTableIter iter;
677 gpointer stream_key, state_value;
678 bool ret = true;
679
680 /*
681 * Verify that this iterator received a
682 * BT_NOTIFICATION_TYPE_STREAM_END notification for each stream
683 * which has a state.
684 */
685
686 g_hash_table_iter_init(&iter, iterator->stream_states);
687
688 while (g_hash_table_iter_next(&iter, &stream_key, &state_value)) {
689 struct stream_state *stream_state = (void *) state_value;
690
691 BT_ASSERT(stream_state);
692 BT_ASSERT(stream_key);
693
694 if (!stream_state->is_ended) {
695 BT_ASSERT_PRE_MSG("Ending notification iterator, "
696 "but stream is not ended: "
697 "%![stream-]s", stream_key);
698 ret = false;
699 goto end;
700 }
701 }
702
703 end:
704 return ret;
705 }
706
707 enum bt_notification_iterator_status
708 bt_self_component_port_input_notification_iterator_next(
709 struct bt_self_component_port_input_notification_iterator *iterator,
710 bt_notification_array *notifs, uint64_t *user_count)
711 {
712 typedef enum bt_self_notification_iterator_status (*method_t)(
713 void *, bt_notification_array, uint64_t, uint64_t *);
714
715 method_t method = NULL;
716 struct bt_component_class *comp_cls;
717 int status = BT_NOTIFICATION_ITERATOR_STATUS_OK;
718
719 BT_ASSERT_PRE_NON_NULL(iterator, "Notification iterator");
720 BT_ASSERT_PRE_NON_NULL(notifs, "Notification array (output)");
721 BT_ASSERT_PRE_NON_NULL(user_count, "Notification count (output)");
722 BT_ASSERT_PRE(iterator->state ==
723 BT_SELF_COMPONENT_PORT_INPUT_NOTIFICATION_ITERATOR_STATE_ACTIVE,
724 "Notification iterator's \"next\" called, but "
725 "iterator is in the wrong state: %!+i", iterator);
726 BT_ASSERT(iterator->upstream_component);
727 BT_ASSERT(iterator->upstream_component->class);
728 BT_LIB_LOGD("Getting next self component input port "
729 "notification iterator's notifications: %!+i", iterator);
730 comp_cls = iterator->upstream_component->class;
731
732 /* Pick the appropriate "next" method */
733 switch (comp_cls->type) {
734 case BT_COMPONENT_CLASS_TYPE_SOURCE:
735 {
736 struct bt_component_class_source *src_comp_cls =
737 (void *) comp_cls;
738
739 method = (method_t) src_comp_cls->methods.notif_iter_next;
740 break;
741 }
742 case BT_COMPONENT_CLASS_TYPE_FILTER:
743 {
744 struct bt_component_class_filter *flt_comp_cls =
745 (void *) comp_cls;
746
747 method = (method_t) flt_comp_cls->methods.notif_iter_next;
748 break;
749 }
750 default:
751 abort();
752 }
753
754 /*
755 * Call the user's "next" method to get the next notifications
756 * and status.
757 */
758 BT_ASSERT(method);
759 BT_LOGD_STR("Calling user's \"next\" method.");
760 status = method(iterator,
761 (void *) iterator->base.notifs->pdata,
762 NOTIF_BATCH_SIZE, user_count);
763 BT_LOGD("User method returned: status=%s",
764 bt_notification_iterator_status_string(status));
765 if (status < 0) {
766 BT_LOGW_STR("User method failed.");
767 goto end;
768 }
769
770 if (iterator->state == BT_SELF_COMPONENT_PORT_INPUT_NOTIFICATION_ITERATOR_STATE_FINALIZED ||
771 iterator->state == BT_SELF_COMPONENT_PORT_INPUT_NOTIFICATION_ITERATOR_STATE_FINALIZED_AND_ENDED) {
772 /*
773 * The user's "next" method, somehow, cancelled its own
774 * notification iterator. This can happen, for example,
775 * when the user's method removes the port on which
776 * there's the connection from which the iterator was
777 * created. In this case, said connection is ended, and
778 * all its notification iterators are finalized.
779 *
780 * Only bt_object_put_ref() the returned notification if
781 * the status is BT_NOTIFICATION_ITERATOR_STATUS_OK
782 * because otherwise this field could be garbage.
783 */
784 if (status == BT_NOTIFICATION_ITERATOR_STATUS_OK) {
785 uint64_t i;
786 bt_notification_array notifs =
787 (void *) iterator->base.notifs->pdata;
788
789 for (i = 0; i < *user_count; i++) {
790 bt_object_put_ref(notifs[i]);
791 }
792 }
793
794 status = BT_NOTIFICATION_ITERATOR_STATUS_CANCELED;
795 goto end;
796 }
797
798 switch (status) {
799 case BT_NOTIFICATION_ITERATOR_STATUS_OK:
800 BT_ASSERT_PRE(validate_notifications(iterator, *user_count),
801 "Notifications are invalid at this point: "
802 "%![notif-iter-]+i, count=%" PRIu64,
803 iterator, *user_count);
804 *notifs = (void *) iterator->base.notifs->pdata;
805 break;
806 case BT_NOTIFICATION_ITERATOR_STATUS_AGAIN:
807 goto end;
808 case BT_NOTIFICATION_ITERATOR_STATUS_END:
809 BT_ASSERT_PRE(priv_conn_notif_iter_can_end(iterator),
810 "Notification iterator cannot end at this point: "
811 "%!+i", iterator);
812 BT_ASSERT(iterator->state ==
813 BT_SELF_COMPONENT_PORT_INPUT_NOTIFICATION_ITERATOR_STATE_ACTIVE);
814 iterator->state = BT_SELF_COMPONENT_PORT_INPUT_NOTIFICATION_ITERATOR_STATE_ENDED;
815 BT_LOGD("Set new status: status=%s",
816 bt_notification_iterator_status_string(status));
817 goto end;
818 default:
819 /* Unknown non-error status */
820 abort();
821 }
822
823 end:
824 return status;
825 }
826
827 enum bt_notification_iterator_status
828 bt_port_output_notification_iterator_next(
829 struct bt_port_output_notification_iterator *iterator,
830 bt_notification_array *notifs_to_user,
831 uint64_t *count_to_user)
832 {
833 enum bt_notification_iterator_status status;
834 enum bt_graph_status graph_status;
835
836 BT_ASSERT_PRE_NON_NULL(iterator, "Notification iterator");
837 BT_ASSERT_PRE_NON_NULL(notifs_to_user, "Notification array (output)");
838 BT_ASSERT_PRE_NON_NULL(count_to_user, "Notification count (output)");
839 BT_LIB_LOGD("Getting next output port notification iterator's notifications: "
840 "%!+i", iterator);
841
842 graph_status = bt_graph_consume_sink_no_check(iterator->graph,
843 iterator->colander);
844 switch (graph_status) {
845 case BT_GRAPH_STATUS_CANCELED:
846 case BT_GRAPH_STATUS_AGAIN:
847 case BT_GRAPH_STATUS_END:
848 case BT_GRAPH_STATUS_NOMEM:
849 status = (int) graph_status;
850 break;
851 case BT_GRAPH_STATUS_OK:
852 status = BT_NOTIFICATION_ITERATOR_STATUS_OK;
853
854 /*
855 * On success, the colander sink moves the notifications
856 * to this iterator's array and sets this iterator's
857 * notification count: move them to the user.
858 */
859 *notifs_to_user = (void *) iterator->base.notifs->pdata;
860 *count_to_user = iterator->count;
861 break;
862 default:
863 /* Other errors */
864 status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
865 }
866
867 return status;
868 }
869
870 struct bt_component *bt_self_component_port_input_notification_iterator_borrow_component(
871 struct bt_self_component_port_input_notification_iterator *iterator)
872 {
873 BT_ASSERT_PRE_NON_NULL(iterator, "Notification iterator");
874 return iterator->upstream_component;
875 }
876
877 struct bt_self_component *bt_self_notification_iterator_borrow_component(
878 struct bt_self_notification_iterator *self_iterator)
879 {
880 struct bt_self_component_port_input_notification_iterator *iterator =
881 (void *) self_iterator;
882
883 BT_ASSERT_PRE_NON_NULL(iterator, "Notification iterator");
884 return (void *) iterator->upstream_component;
885 }
886
887 struct bt_self_port_output *bt_self_notification_iterator_borrow_port(
888 struct bt_self_notification_iterator *self_iterator)
889 {
890 struct bt_self_component_port_input_notification_iterator *iterator =
891 (void *) self_iterator;
892
893 BT_ASSERT_PRE_NON_NULL(iterator, "Notification iterator");
894 return (void *) iterator->upstream_port;
895 }
896
897 static
898 void bt_port_output_notification_iterator_destroy(struct bt_object *obj)
899 {
900 struct bt_port_output_notification_iterator *iterator = (void *) obj;
901
902 BT_LIB_LOGD("Destroying output port notification iterator object: %!+i",
903 iterator);
904 BT_LOGD_STR("Putting graph.");
905 BT_OBJECT_PUT_REF_AND_RESET(iterator->graph);
906 BT_LOGD_STR("Putting colander sink component.");
907 BT_OBJECT_PUT_REF_AND_RESET(iterator->colander);
908 destroy_base_notification_iterator(obj);
909 }
910
911 struct bt_port_output_notification_iterator *
912 bt_port_output_notification_iterator_create(
913 struct bt_private_graph *priv_graph,
914 struct bt_port_output *output_port)
915 {
916 struct bt_port_output_notification_iterator *iterator = NULL;
917 struct bt_component_class_sink *colander_comp_cls = NULL;
918 struct bt_component *output_port_comp = NULL;
919 struct bt_component_sink *colander_comp;
920 struct bt_graph *graph = (void *) priv_graph;
921 enum bt_graph_status graph_status;
922 struct bt_port_input *colander_in_port = NULL;
923 struct bt_component_class_sink_colander_data colander_data;
924 int ret;
925
926 BT_ASSERT_PRE_NON_NULL(graph, "Graph");
927 BT_ASSERT_PRE_NON_NULL(output_port, "Output port");
928 output_port_comp = bt_port_borrow_component((void *) output_port);
929 BT_ASSERT_PRE(output_port_comp,
930 "Output port has no component: %!+p", output_port);
931 BT_ASSERT_PRE(bt_component_borrow_graph(output_port_comp) ==
932 (void *) graph,
933 "Output port is not part of graph: %![graph-]+g, %![port-]+p",
934 graph, output_port);
935
936 /* Create notification iterator */
937 BT_LIB_LOGD("Creating notification iterator on output port: "
938 "%![port-]+p, %![comp-]+c", output_port, output_port_comp);
939 iterator = g_new0(struct bt_port_output_notification_iterator, 1);
940 if (!iterator) {
941 BT_LOGE_STR("Failed to allocate one output port notification iterator.");
942 goto error;
943 }
944
945 ret = init_notification_iterator((void *) iterator,
946 BT_NOTIFICATION_ITERATOR_TYPE_PORT_OUTPUT,
947 bt_port_output_notification_iterator_destroy);
948 if (ret) {
949 /* init_notification_iterator() logs errors */
950 BT_OBJECT_PUT_REF_AND_RESET(iterator);
951 goto end;
952 }
953
954 /* Create colander component */
955 colander_comp_cls = bt_component_class_sink_colander_get();
956 if (!colander_comp_cls) {
957 BT_LOGW("Cannot get colander sink component class.");
958 goto error;
959 }
960
961 iterator->graph = graph;
962 bt_object_get_no_null_check(iterator->graph);
963 colander_data.notifs = (void *) iterator->base.notifs->pdata;
964 colander_data.count_addr = &iterator->count;
965
966 /* Hope that nobody uses this very unique name */
967 graph_status =
968 bt_private_graph_add_sink_component_with_init_method_data(
969 (void *) graph, colander_comp_cls,
970 "colander-36ac3409-b1a8-4d60-ab1f-4fdf341a8fb1",
971 NULL, &colander_data, &iterator->colander);
972 if (graph_status != BT_GRAPH_STATUS_OK) {
973 BT_LIB_LOGW("Cannot add colander sink component to graph: "
974 "%1[graph-]+g, status=%s", graph,
975 bt_graph_status_string(graph_status));
976 goto error;
977 }
978
979 /*
980 * Connect provided output port to the colander component's
981 * input port.
982 */
983 colander_in_port = bt_component_sink_borrow_input_port_by_index(
984 iterator->colander, 0);
985 BT_ASSERT(colander_in_port);
986 graph_status = bt_private_graph_connect_ports(priv_graph,
987 output_port, colander_in_port, NULL);
988 if (graph_status != BT_GRAPH_STATUS_OK) {
989 BT_LIB_LOGW("Cannot add colander sink component to graph: "
990 "%![graph-]+g, %![comp-]+c, status=%s", graph,
991 iterator->colander,
992 bt_graph_status_string(graph_status));
993 goto error;
994 }
995
996 /*
997 * At this point everything went fine. Make the graph
998 * nonconsumable forever so that only this notification iterator
999 * can consume (thanks to bt_graph_consume_sink_no_check()).
1000 * This avoids leaking the notification created by the colander
1001 * sink and moved to the notification iterator's notification
1002 * member.
1003 */
1004 bt_graph_set_can_consume(iterator->graph, false);
1005 goto end;
1006
1007 error:
1008 if (iterator && iterator->graph && iterator->colander) {
1009 int ret;
1010
1011 /* Remove created colander component from graph if any */
1012 colander_comp = iterator->colander;
1013 BT_OBJECT_PUT_REF_AND_RESET(iterator->colander);
1014
1015 /*
1016 * At this point the colander component's reference
1017 * count is 0 because iterator->colander was the only
1018 * owner. We also know that it is not connected because
1019 * this is the last operation before this function
1020 * succeeds.
1021 *
1022 * Since we honor the preconditions here,
1023 * bt_graph_remove_unconnected_component() always
1024 * succeeds.
1025 */
1026 ret = bt_graph_remove_unconnected_component(iterator->graph,
1027 (void *) colander_comp);
1028 BT_ASSERT(ret == 0);
1029 }
1030
1031 BT_OBJECT_PUT_REF_AND_RESET(iterator);
1032
1033 end:
1034 bt_object_put_ref(colander_comp_cls);
1035 return (void *) iterator;
1036 }
This page took 0.072475 seconds and 3 git commands to generate.