Rename: field type -> field class
[babeltrace.git] / lib / graph / iterator.c
1 /*
2 * iterator.c
3 *
4 * Babeltrace Notification Iterator
5 *
6 * Copyright 2015 Jérémie Galarneau <jeremie.galarneau@efficios.com>
7 * Copyright 2017 Philippe Proulx <pproulx@efficios.com>
8 *
9 * Permission is hereby granted, free of charge, to any person obtaining a copy
10 * of this software and associated documentation files (the "Software"), to deal
11 * in the Software without restriction, including without limitation the rights
12 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
13 * copies of the Software, and to permit persons to whom the Software is
14 * furnished to do so, subject to the following conditions:
15 *
16 * The above copyright notice and this permission notice shall be included in
17 * all copies or substantial portions of the Software.
18 *
19 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
20 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
21 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
22 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
23 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
24 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
25 * SOFTWARE.
26 */
27
28 #define BT_LOG_TAG "NOTIF-ITER"
29 #include <babeltrace/lib-logging-internal.h>
30
31 #include <babeltrace/compiler-internal.h>
32 #include <babeltrace/ref.h>
33 #include <babeltrace/trace-ir/fields.h>
34 #include <babeltrace/trace-ir/event-internal.h>
35 #include <babeltrace/trace-ir/packet-internal.h>
36 #include <babeltrace/trace-ir/stream-internal.h>
37 #include <babeltrace/graph/connection.h>
38 #include <babeltrace/graph/connection-internal.h>
39 #include <babeltrace/graph/component.h>
40 #include <babeltrace/graph/component-source-internal.h>
41 #include <babeltrace/graph/component-class-internal.h>
42 #include <babeltrace/graph/component-class-sink-colander-internal.h>
43 #include <babeltrace/graph/component-sink.h>
44 #include <babeltrace/graph/notification.h>
45 #include <babeltrace/graph/notification-iterator.h>
46 #include <babeltrace/graph/notification-iterator-internal.h>
47 #include <babeltrace/graph/notification-internal.h>
48 #include <babeltrace/graph/notification-event.h>
49 #include <babeltrace/graph/notification-event-internal.h>
50 #include <babeltrace/graph/notification-packet.h>
51 #include <babeltrace/graph/notification-packet-internal.h>
52 #include <babeltrace/graph/notification-stream.h>
53 #include <babeltrace/graph/notification-stream-internal.h>
54 #include <babeltrace/graph/port.h>
55 #include <babeltrace/graph/graph-internal.h>
56 #include <babeltrace/types.h>
57 #include <babeltrace/assert-internal.h>
58 #include <babeltrace/assert-pre-internal.h>
59 #include <stdint.h>
60 #include <inttypes.h>
61 #include <stdlib.h>
62
/*
 * Number of slots in a notification iterator's notification array:
 * init_notification_iterator() sizes the array to this value, and it
 * is the capacity passed to the upstream component's "next" method.
 *
 * TODO: Use graph's state (number of active iterators, etc.) and
 * possibly system specifications to make a better guess than this.
 */
#define NOTIF_BATCH_SIZE	15
68
/*
 * Per-stream validation state of a private connection notification
 * iterator. Instances are stored as the values of the iterator's
 * `stream_states` hash table, keyed by stream address.
 */
struct stream_state {
	struct bt_stream *stream; /* owned by this */
	struct bt_packet *cur_packet; /* owned by this */
	/* Sequence number which the next notification of this stream must have */
	uint64_t expected_notif_seq_num;
	/* Set once a BT_NOTIFICATION_TYPE_STREAM_END notification is validated */
	bt_bool is_ended;
};
75
76 BT_ASSERT_PRE_FUNC
77 static
78 void destroy_stream_state(struct stream_state *stream_state)
79 {
80 if (!stream_state) {
81 return;
82 }
83
84 BT_LOGV("Destroying stream state: stream-state-addr=%p", stream_state);
85 BT_LOGV_STR("Putting stream state's current packet.");
86 bt_put(stream_state->cur_packet);
87 BT_LOGV_STR("Putting stream state's stream.");
88 bt_put(stream_state->stream);
89 g_free(stream_state);
90 }
91
92 BT_ASSERT_PRE_FUNC
93 static
94 struct stream_state *create_stream_state(struct bt_stream *stream)
95 {
96 struct stream_state *stream_state = g_new0(struct stream_state, 1);
97
98 if (!stream_state) {
99 BT_LOGE_STR("Failed to allocate one stream state.");
100 goto end;
101 }
102
103 /*
104 * We keep a reference to the stream until we know it's ended.
105 */
106 stream_state->stream = bt_get(stream);
107 BT_LOGV("Created stream state: stream-addr=%p, stream-name=\"%s\", "
108 "stream-state-addr=%p",
109 stream, bt_stream_get_name(stream), stream_state);
110
111 end:
112 return stream_state;
113 }
114
115 static
116 void destroy_base_notification_iterator(struct bt_object *obj)
117 {
118 struct bt_notification_iterator *iterator = (void *) obj;
119
120 BT_ASSERT(iterator);
121
122 if (iterator->notifs) {
123 g_ptr_array_free(iterator->notifs, TRUE);
124 }
125
126 g_free(iterator);
127 }
128
/*
 * Release function of a private connection notification iterator
 * (installed by bt_private_connection_notification_iterator_create()).
 *
 * Finalizes the iterator, destroys its stream states, detaches it
 * from its connection, and frees the base iterator object, in that
 * order.
 */
static
void bt_private_connection_notification_iterator_destroy(struct bt_object *obj)
{
	struct bt_notification_iterator_private_connection *iterator;

	BT_ASSERT(obj);

	/*
	 * The notification iterator's reference count is 0 if we're
	 * here. Increment it to avoid a double-destroy (possibly
	 * infinitely recursive). This could happen for example if the
	 * notification iterator's finalization function does bt_get()
	 * (or anything that causes bt_get() to be called) on itself
	 * (ref. count goes from 0 to 1), and then bt_put(): the
	 * reference count would go from 1 to 0 again and this function
	 * would be called again.
	 */
	obj->ref_count++;
	iterator = (void *) obj;
	BT_LOGD("Destroying private connection notification iterator object: addr=%p",
		iterator);
	bt_private_connection_notification_iterator_finalize(iterator);

	if (iterator->stream_states) {
		/*
		 * Destroy the table of stream states. The table is
		 * created with destroy_stream_state() as its value
		 * destroy function, so each remaining state puts its
		 * stream and current packet here.
		 */
		g_hash_table_destroy(iterator->stream_states);
	}

	if (iterator->connection) {
		/*
		 * Remove ourself from the originating connection so
		 * that it does not try to finalize a dangling pointer
		 * later.
		 */
		bt_connection_remove_iterator(iterator->connection, iterator);
	}

	/* Frees the notification array and `obj` itself. */
	destroy_base_notification_iterator(obj);
}
173
174 BT_HIDDEN
175 void bt_private_connection_notification_iterator_finalize(
176 struct bt_notification_iterator_private_connection *iterator)
177 {
178 struct bt_component_class *comp_class = NULL;
179 bt_component_class_notification_iterator_finalize_method
180 finalize_method = NULL;
181
182 BT_ASSERT(iterator);
183
184 switch (iterator->state) {
185 case BT_PRIVATE_CONNECTION_NOTIFICATION_ITERATOR_STATE_NON_INITIALIZED:
186 /* Skip user finalization if user initialization failed */
187 BT_LOGD("Not finalizing non-initialized notification iterator: "
188 "addr=%p", iterator);
189 return;
190 case BT_PRIVATE_CONNECTION_NOTIFICATION_ITERATOR_STATE_FINALIZED:
191 case BT_PRIVATE_CONNECTION_NOTIFICATION_ITERATOR_STATE_FINALIZED_AND_ENDED:
192 /* Already finalized */
193 BT_LOGD("Not finalizing notification iterator: already finalized: "
194 "addr=%p", iterator);
195 return;
196 default:
197 break;
198 }
199
200 BT_LOGD("Finalizing notification iterator: addr=%p", iterator);
201
202 if (iterator->state == BT_PRIVATE_CONNECTION_NOTIFICATION_ITERATOR_STATE_ENDED) {
203 BT_LOGD("Updating notification iterator's state: "
204 "new-state=BT_PRIVATE_CONNECTION_NOTIFICATION_ITERATOR_STATE_FINALIZED_AND_ENDED");
205 iterator->state = BT_PRIVATE_CONNECTION_NOTIFICATION_ITERATOR_STATE_FINALIZED_AND_ENDED;
206 } else {
207 BT_LOGD("Updating notification iterator's state: "
208 "new-state=BT_PRIVATE_CONNECTION_NOTIFICATION_ITERATOR_STATE_FINALIZED");
209 iterator->state = BT_PRIVATE_CONNECTION_NOTIFICATION_ITERATOR_STATE_FINALIZED;
210 }
211
212 BT_ASSERT(iterator->upstream_component);
213 comp_class = iterator->upstream_component->class;
214
215 /* Call user-defined destroy method */
216 switch (comp_class->type) {
217 case BT_COMPONENT_CLASS_TYPE_SOURCE:
218 {
219 struct bt_component_class_source *source_class;
220
221 source_class = container_of(comp_class, struct bt_component_class_source, parent);
222 finalize_method = source_class->methods.iterator.finalize;
223 break;
224 }
225 case BT_COMPONENT_CLASS_TYPE_FILTER:
226 {
227 struct bt_component_class_filter *filter_class;
228
229 filter_class = container_of(comp_class, struct bt_component_class_filter, parent);
230 finalize_method = filter_class->methods.iterator.finalize;
231 break;
232 }
233 default:
234 /* Unreachable */
235 abort();
236 }
237
238 if (finalize_method) {
239 BT_LOGD("Calling user's finalization method: addr=%p",
240 iterator);
241 finalize_method(
242 bt_private_connection_private_notification_iterator_from_notification_iterator(iterator));
243 }
244
245 iterator->upstream_component = NULL;
246 iterator->upstream_port = NULL;
247 BT_LOGD("Finalized notification iterator: addr=%p", iterator);
248 }
249
250 BT_HIDDEN
251 void bt_private_connection_notification_iterator_set_connection(
252 struct bt_notification_iterator_private_connection *iterator,
253 struct bt_connection *connection)
254 {
255 BT_ASSERT(iterator);
256 iterator->connection = connection;
257 BT_LOGV("Set notification iterator's connection: "
258 "iter-addr=%p, conn-addr=%p", iterator, connection);
259 }
260
261 static
262 int init_notification_iterator(struct bt_notification_iterator *iterator,
263 enum bt_notification_iterator_type type,
264 bt_object_release_func destroy)
265 {
266 int ret = 0;
267
268 bt_object_init_shared(&iterator->base, destroy);
269 iterator->type = type;
270 iterator->notifs = g_ptr_array_new();
271 if (!iterator->notifs) {
272 BT_LOGE_STR("Failed to allocate a GPtrArray.");
273 ret = -1;
274 goto end;
275 }
276
277 g_ptr_array_set_size(iterator->notifs, NOTIF_BATCH_SIZE);
278
279 end:
280 return ret;
281 }
282
283 BT_HIDDEN
284 enum bt_connection_status bt_private_connection_notification_iterator_create(
285 struct bt_component *upstream_comp,
286 struct bt_port *upstream_port,
287 struct bt_connection *connection,
288 struct bt_notification_iterator_private_connection **user_iterator)
289 {
290 enum bt_connection_status status = BT_CONNECTION_STATUS_OK;
291 enum bt_component_class_type type;
292 struct bt_notification_iterator_private_connection *iterator = NULL;
293 int ret;
294
295 BT_ASSERT(upstream_comp);
296 BT_ASSERT(upstream_port);
297 BT_ASSERT(bt_port_is_connected(upstream_port));
298 BT_ASSERT(user_iterator);
299 BT_LOGD("Creating notification iterator on private connection: "
300 "upstream-comp-addr=%p, upstream-comp-name=\"%s\", "
301 "upstream-port-addr=%p, upstream-port-name=\"%s\", "
302 "conn-addr=%p",
303 upstream_comp, bt_component_get_name(upstream_comp),
304 upstream_port, bt_port_get_name(upstream_port),
305 connection);
306 type = bt_component_get_class_type(upstream_comp);
307 BT_ASSERT(type == BT_COMPONENT_CLASS_TYPE_SOURCE ||
308 type == BT_COMPONENT_CLASS_TYPE_FILTER);
309 iterator = g_new0(struct bt_notification_iterator_private_connection, 1);
310 if (!iterator) {
311 BT_LOGE_STR("Failed to allocate one private connection notification iterator.");
312 status = BT_CONNECTION_STATUS_NOMEM;
313 goto end;
314 }
315
316 ret = init_notification_iterator((void *) iterator,
317 BT_NOTIFICATION_ITERATOR_TYPE_PRIVATE_CONNECTION,
318 bt_private_connection_notification_iterator_destroy);
319 if (ret) {
320 /* init_notification_iterator() logs errors */
321 status = BT_CONNECTION_STATUS_NOMEM;
322 goto end;
323 }
324
325 iterator->stream_states = g_hash_table_new_full(g_direct_hash,
326 g_direct_equal, NULL, (GDestroyNotify) destroy_stream_state);
327 if (!iterator->stream_states) {
328 BT_LOGE_STR("Failed to allocate a GHashTable.");
329 status = BT_CONNECTION_STATUS_NOMEM;
330 goto end;
331 }
332
333 iterator->upstream_component = upstream_comp;
334 iterator->upstream_port = upstream_port;
335 iterator->connection = connection;
336 iterator->graph = bt_component_borrow_graph(upstream_comp);
337 iterator->state = BT_PRIVATE_CONNECTION_NOTIFICATION_ITERATOR_STATE_NON_INITIALIZED;
338 BT_LOGD("Created notification iterator: "
339 "upstream-comp-addr=%p, upstream-comp-name=\"%s\", "
340 "upstream-port-addr=%p, upstream-port-name=\"%s\", "
341 "conn-addr=%p, iter-addr=%p",
342 upstream_comp, bt_component_get_name(upstream_comp),
343 upstream_port, bt_port_get_name(upstream_port),
344 connection, iterator);
345
346 /* Move reference to user */
347 *user_iterator = iterator;
348 iterator = NULL;
349
350 end:
351 bt_put(iterator);
352 return status;
353 }
354
355 void *bt_private_connection_private_notification_iterator_get_user_data(
356 struct bt_private_connection_private_notification_iterator *private_iterator)
357 {
358 struct bt_notification_iterator_private_connection *iterator = (void *)
359 bt_private_connection_notification_iterator_borrow_from_private(private_iterator);
360
361 BT_ASSERT_PRE_NON_NULL(private_iterator, "Notification iterator");
362 return iterator->user_data;
363 }
364
365 enum bt_notification_iterator_status
366 bt_private_connection_private_notification_iterator_set_user_data(
367 struct bt_private_connection_private_notification_iterator *private_iterator,
368 void *data)
369 {
370 struct bt_notification_iterator_private_connection *iterator = (void *)
371 bt_private_connection_notification_iterator_borrow_from_private(private_iterator);
372
373 BT_ASSERT_PRE_NON_NULL(iterator, "Notification iterator");
374 iterator->user_data = data;
375 BT_LOGV("Set notification iterator's user data: "
376 "iter-addr=%p, user-data-addr=%p", iterator, data);
377 return BT_NOTIFICATION_ITERATOR_STATUS_OK;
378 }
379
380 struct bt_graph *bt_private_connection_private_notification_iterator_borrow_graph(
381 struct bt_private_connection_private_notification_iterator *private_iterator)
382 {
383 struct bt_notification_iterator_private_connection *iterator = (void *)
384 bt_private_connection_notification_iterator_borrow_from_private(
385 private_iterator);
386
387 BT_ASSERT_PRE_NON_NULL(iterator, "Notification iterator");
388 return iterator->graph;
389 }
390
391 BT_ASSERT_PRE_FUNC
392 static inline
393 void bt_notification_borrow_packet_stream(struct bt_notification *notif,
394 struct bt_stream **stream, struct bt_packet **packet)
395 {
396 BT_ASSERT(notif);
397
398 switch (notif->type) {
399 case BT_NOTIFICATION_TYPE_EVENT:
400 *packet = bt_event_borrow_packet(
401 bt_notification_event_borrow_event(notif));
402 *stream = bt_packet_borrow_stream(*packet);
403 break;
404 case BT_NOTIFICATION_TYPE_STREAM_BEGIN:
405 *stream = bt_notification_stream_begin_borrow_stream(notif);
406 break;
407 case BT_NOTIFICATION_TYPE_STREAM_END:
408 *stream = bt_notification_stream_end_borrow_stream(notif);
409 break;
410 case BT_NOTIFICATION_TYPE_PACKET_BEGIN:
411 *packet = bt_notification_packet_begin_borrow_packet(notif);
412 *stream = bt_packet_borrow_stream(*packet);
413 break;
414 case BT_NOTIFICATION_TYPE_PACKET_END:
415 *packet = bt_notification_packet_end_borrow_packet(notif);
416 *stream = bt_packet_borrow_stream(*packet);
417 break;
418 default:
419 break;
420 }
421 }
422
423 BT_ASSERT_PRE_FUNC
424 static inline
425 bool validate_notification(
426 struct bt_notification_iterator_private_connection *iterator,
427 struct bt_notification *notif)
428 {
429 bool is_valid = true;
430 struct stream_state *stream_state;
431 struct bt_stream *stream = NULL;
432 struct bt_packet *packet = NULL;
433
434 BT_ASSERT(notif);
435 bt_notification_borrow_packet_stream(notif, &stream, &packet);
436
437 if (!stream) {
438 /* we don't care about notifications not attached to streams */
439 goto end;
440 }
441
442 stream_state = g_hash_table_lookup(iterator->stream_states, stream);
443 if (!stream_state) {
444 /*
445 * No stream state for this stream: this notification
446 * MUST be a BT_NOTIFICATION_TYPE_STREAM_BEGIN notification
447 * and its sequence number must be 0.
448 */
449 if (notif->type != BT_NOTIFICATION_TYPE_STREAM_BEGIN) {
450 BT_ASSERT_PRE_MSG("Unexpected notification: missing a "
451 "BT_NOTIFICATION_TYPE_STREAM_BEGIN "
452 "notification prior to this notification: "
453 "%![stream-]+s", stream);
454 is_valid = false;
455 goto end;
456 }
457
458 if (notif->seq_num == -1ULL) {
459 notif->seq_num = 0;
460 }
461
462 if (notif->seq_num != 0) {
463 BT_ASSERT_PRE_MSG("Unexpected notification sequence "
464 "number for this notification iterator: "
465 "this is the first notification for this "
466 "stream, expecting sequence number 0: "
467 "seq-num=%" PRIu64 ", %![stream-]+s",
468 notif->seq_num, stream);
469 is_valid = false;
470 goto end;
471 }
472
473 stream_state = create_stream_state(stream);
474 if (!stream_state) {
475 abort();
476 }
477
478 g_hash_table_insert(iterator->stream_states, stream,
479 stream_state);
480 stream_state->expected_notif_seq_num++;
481 goto end;
482 }
483
484 if (stream_state->is_ended) {
485 /*
486 * There's a new notification which has a reference to a
487 * stream which, from this iterator's point of view, is
488 * ended ("end of stream" notification was returned).
489 * This is bad: the API guarantees that it can never
490 * happen.
491 */
492 BT_ASSERT_PRE_MSG("Stream is already ended: %![stream-]+s",
493 stream);
494 is_valid = false;
495 goto end;
496 }
497
498 if (notif->seq_num == -1ULL) {
499 notif->seq_num = stream_state->expected_notif_seq_num;
500 }
501
502 if (notif->seq_num != -1ULL &&
503 notif->seq_num != stream_state->expected_notif_seq_num) {
504 BT_ASSERT_PRE_MSG("Unexpected notification sequence number: "
505 "seq-num=%" PRIu64 ", "
506 "expected-seq-num=%" PRIu64 ", %![stream-]+s",
507 notif->seq_num, stream_state->expected_notif_seq_num,
508 stream);
509 is_valid = false;
510 goto end;
511 }
512
513 switch (notif->type) {
514 case BT_NOTIFICATION_TYPE_STREAM_BEGIN:
515 BT_ASSERT_PRE_MSG("Unexpected BT_NOTIFICATION_TYPE_STREAM_BEGIN "
516 "notification at this point: notif-seq-num=%" PRIu64 ", "
517 "%![stream-]+s", notif->seq_num, stream);
518 is_valid = false;
519 goto end;
520 case BT_NOTIFICATION_TYPE_STREAM_END:
521 if (stream_state->cur_packet) {
522 BT_ASSERT_PRE_MSG("Unexpected BT_NOTIFICATION_TYPE_STREAM_END "
523 "notification: missing a "
524 "BT_NOTIFICATION_TYPE_PACKET_END notification "
525 "prior to this notification: "
526 "notif-seq-num=%" PRIu64 ", "
527 "%![stream-]+s", notif->seq_num, stream);
528 is_valid = false;
529 goto end;
530 }
531 stream_state->expected_notif_seq_num++;
532 stream_state->is_ended = true;
533 goto end;
534 case BT_NOTIFICATION_TYPE_PACKET_BEGIN:
535 if (stream_state->cur_packet) {
536 BT_ASSERT_PRE_MSG("Unexpected BT_NOTIFICATION_TYPE_PACKET_BEGIN "
537 "notification at this point: missing a "
538 "BT_NOTIFICATION_TYPE_PACKET_END notification "
539 "prior to this notification: "
540 "notif-seq-num=%" PRIu64 ", %![stream-]+s, "
541 "%![packet-]+a", notif->seq_num, stream,
542 packet);
543 is_valid = false;
544 goto end;
545 }
546 stream_state->expected_notif_seq_num++;
547 stream_state->cur_packet = bt_get(packet);
548 goto end;
549 case BT_NOTIFICATION_TYPE_PACKET_END:
550 if (!stream_state->cur_packet) {
551 BT_ASSERT_PRE_MSG("Unexpected BT_NOTIFICATION_TYPE_PACKET_END "
552 "notification at this point: missing a "
553 "BT_NOTIFICATION_TYPE_PACKET_BEGIN notification "
554 "prior to this notification: "
555 "notif-seq-num=%" PRIu64 ", %![stream-]+s, "
556 "%![packet-]+a", notif->seq_num, stream,
557 packet);
558 is_valid = false;
559 goto end;
560 }
561 stream_state->expected_notif_seq_num++;
562 BT_PUT(stream_state->cur_packet);
563 goto end;
564 case BT_NOTIFICATION_TYPE_EVENT:
565 if (packet != stream_state->cur_packet) {
566 BT_ASSERT_PRE_MSG("Unexpected packet for "
567 "BT_NOTIFICATION_TYPE_EVENT notification: "
568 "notif-seq-num=%" PRIu64 ", %![stream-]+s, "
569 "%![notif-packet-]+a, %![expected-packet-]+a",
570 notif->seq_num, stream,
571 stream_state->cur_packet, packet);
572 is_valid = false;
573 goto end;
574 }
575 stream_state->expected_notif_seq_num++;
576 goto end;
577 default:
578 break;
579 }
580
581 end:
582 return is_valid;
583 }
584
585 BT_ASSERT_PRE_FUNC
586 static inline
587 bool validate_notifications(
588 struct bt_notification_iterator_private_connection *iterator,
589 uint64_t count)
590 {
591 bool ret = true;
592 bt_notification_array notifs = (void *) iterator->base.notifs->pdata;
593 uint64_t i;
594
595 for (i = 0; i < count; i++) {
596 ret = validate_notification(iterator, notifs[i]);
597 if (!ret) {
598 break;
599 }
600 }
601
602 return ret;
603 }
604
605 BT_ASSERT_PRE_FUNC
606 static inline bool priv_conn_notif_iter_can_end(
607 struct bt_notification_iterator_private_connection *iterator)
608 {
609 GHashTableIter iter;
610 gpointer stream_key, state_value;
611 bool ret = true;
612
613 /*
614 * Verify that this iterator received a
615 * BT_NOTIFICATION_TYPE_STREAM_END notification for each stream
616 * which has a state.
617 */
618
619 g_hash_table_iter_init(&iter, iterator->stream_states);
620
621 while (g_hash_table_iter_next(&iter, &stream_key, &state_value)) {
622 struct stream_state *stream_state = (void *) state_value;
623
624 BT_ASSERT(stream_state);
625 BT_ASSERT(stream_key);
626
627 if (!stream_state->is_ended) {
628 BT_ASSERT_PRE_MSG("Ending notification iterator, "
629 "but stream is not ended: "
630 "%![stream-]s", stream_key);
631 ret = false;
632 goto end;
633 }
634 }
635
636 end:
637 return ret;
638 }
639
640 enum bt_notification_iterator_status
641 bt_private_connection_notification_iterator_next(
642 struct bt_notification_iterator *user_iterator,
643 struct bt_notification ***user_notifs, uint64_t *user_count)
644 {
645 struct bt_notification_iterator_private_connection *iterator =
646 (void *) user_iterator;
647 struct bt_private_connection_private_notification_iterator *priv_iterator =
648 bt_private_connection_private_notification_iterator_from_notification_iterator(iterator);
649 bt_component_class_notification_iterator_next_method next_method = NULL;
650 enum bt_notification_iterator_status status =
651 BT_NOTIFICATION_ITERATOR_STATUS_OK;
652
653 BT_ASSERT_PRE_NON_NULL(user_iterator, "Notification iterator");
654 BT_ASSERT_PRE_NON_NULL(user_notifs, "Notification array");
655 BT_ASSERT_PRE_NON_NULL(user_count, "Notification count");
656 BT_ASSERT_PRE(user_iterator->type ==
657 BT_NOTIFICATION_ITERATOR_TYPE_PRIVATE_CONNECTION,
658 "Notification iterator was not created from a private connection: "
659 "%!+i", iterator);
660 BT_LIB_LOGD("Getting next private connection notification iterator's notification: %!+i",
661 iterator);
662 BT_ASSERT_PRE(iterator->state ==
663 BT_PRIVATE_CONNECTION_NOTIFICATION_ITERATOR_STATE_ACTIVE,
664 "Notification iterator's \"next\" called, but "
665 "iterator is in the wrong state: %!+i", iterator);
666 BT_ASSERT(iterator->upstream_component);
667 BT_ASSERT(iterator->upstream_component->class);
668
669 /* Pick the appropriate "next" method */
670 switch (iterator->upstream_component->class->type) {
671 case BT_COMPONENT_CLASS_TYPE_SOURCE:
672 {
673 struct bt_component_class_source *source_class =
674 container_of(iterator->upstream_component->class,
675 struct bt_component_class_source, parent);
676
677 BT_ASSERT(source_class->methods.iterator.next);
678 next_method = source_class->methods.iterator.next;
679 break;
680 }
681 case BT_COMPONENT_CLASS_TYPE_FILTER:
682 {
683 struct bt_component_class_filter *filter_class =
684 container_of(iterator->upstream_component->class,
685 struct bt_component_class_filter, parent);
686
687 BT_ASSERT(filter_class->methods.iterator.next);
688 next_method = filter_class->methods.iterator.next;
689 break;
690 }
691 default:
692 abort();
693 }
694
695 /*
696 * Call the user's "next" method to get the next notification
697 * and status.
698 */
699 BT_ASSERT(next_method);
700 BT_LOGD_STR("Calling user's \"next\" method.");
701 status = next_method(priv_iterator,
702 (void *) user_iterator->notifs->pdata,
703 NOTIF_BATCH_SIZE, user_count);
704 BT_LOGD("User method returned: status=%s",
705 bt_notification_iterator_status_string(status));
706 if (status < 0) {
707 BT_LOGW_STR("User method failed.");
708 goto end;
709 }
710
711 if (iterator->state == BT_PRIVATE_CONNECTION_NOTIFICATION_ITERATOR_STATE_FINALIZED ||
712 iterator->state == BT_PRIVATE_CONNECTION_NOTIFICATION_ITERATOR_STATE_FINALIZED_AND_ENDED) {
713 /*
714 * The user's "next" method, somehow, cancelled its own
715 * notification iterator. This can happen, for example,
716 * when the user's method removes the port on which
717 * there's the connection from which the iterator was
718 * created. In this case, said connection is ended, and
719 * all its notification iterators are finalized.
720 *
721 * Only bt_put() the returned notification if
722 * the status is
723 * BT_NOTIFICATION_ITERATOR_STATUS_OK because
724 * otherwise this field could be garbage.
725 */
726 if (status == BT_NOTIFICATION_ITERATOR_STATUS_OK) {
727 uint64_t i;
728 bt_notification_array notifs =
729 (void *) user_iterator->notifs->pdata;
730
731 for (i = 0; i < *user_count; i++) {
732 bt_put(notifs[i]);
733 }
734 }
735
736 status = BT_NOTIFICATION_ITERATOR_STATUS_CANCELED;
737 goto end;
738 }
739
740 switch (status) {
741 case BT_NOTIFICATION_ITERATOR_STATUS_OK:
742 BT_ASSERT_PRE(validate_notifications(iterator, *user_count),
743 "Notifications are invalid at this point: "
744 "%![notif-iter-]+i, count=%" PRIu64,
745 iterator, *user_count);
746 *user_notifs = (void *) user_iterator->notifs->pdata;
747 break;
748 case BT_NOTIFICATION_ITERATOR_STATUS_AGAIN:
749 status = BT_NOTIFICATION_ITERATOR_STATUS_AGAIN;
750 goto end;
751 case BT_NOTIFICATION_ITERATOR_STATUS_END:
752 BT_ASSERT_PRE(priv_conn_notif_iter_can_end(iterator),
753 "Notification iterator cannot end at this point: "
754 "%!+i", iterator);
755 BT_ASSERT(iterator->state ==
756 BT_PRIVATE_CONNECTION_NOTIFICATION_ITERATOR_STATE_ACTIVE);
757 iterator->state = BT_PRIVATE_CONNECTION_NOTIFICATION_ITERATOR_STATE_ENDED;
758 status = BT_NOTIFICATION_ITERATOR_STATUS_END;
759 BT_LOGD("Set new status: status=%s",
760 bt_notification_iterator_status_string(status));
761 goto end;
762 default:
763 /* Unknown non-error status */
764 abort();
765 }
766
767 end:
768 return status;
769 }
770
771 enum bt_notification_iterator_status
772 bt_output_port_notification_iterator_next(
773 struct bt_notification_iterator *iterator,
774 bt_notification_array *notifs_to_user,
775 uint64_t *count_to_user)
776 {
777 enum bt_notification_iterator_status status;
778 struct bt_notification_iterator_output_port *out_port_iter =
779 (void *) iterator;
780 enum bt_graph_status graph_status;
781
782 BT_ASSERT_PRE_NON_NULL(iterator, "Notification iterator");
783 BT_ASSERT_PRE_NON_NULL(notifs_to_user, "Notification array");
784 BT_ASSERT_PRE_NON_NULL(count_to_user, "Notification count");
785 BT_ASSERT_PRE(iterator->type ==
786 BT_NOTIFICATION_ITERATOR_TYPE_OUTPUT_PORT,
787 "Notification iterator was not created from an output port: "
788 "%!+i", iterator);
789 BT_LIB_LOGD("Getting next output port notification iterator's notification: %!+i",
790 iterator);
791
792 graph_status = bt_graph_consume_sink_no_check(
793 out_port_iter->graph, out_port_iter->colander);
794 switch (graph_status) {
795 case BT_GRAPH_STATUS_CANCELED:
796 status = BT_NOTIFICATION_ITERATOR_STATUS_CANCELED;
797 break;
798 case BT_GRAPH_STATUS_AGAIN:
799 status = BT_NOTIFICATION_ITERATOR_STATUS_AGAIN;
800 break;
801 case BT_GRAPH_STATUS_END:
802 status = BT_NOTIFICATION_ITERATOR_STATUS_END;
803 break;
804 case BT_GRAPH_STATUS_NOMEM:
805 status = BT_NOTIFICATION_ITERATOR_STATUS_NOMEM;
806 break;
807 case BT_GRAPH_STATUS_OK:
808 status = BT_NOTIFICATION_ITERATOR_STATUS_OK;
809
810 /*
811 * On success, the colander sink moves the notifications
812 * to this iterator's array and sets this iterator's
813 * notification count: move them to the user.
814 */
815 *notifs_to_user = (void *) iterator->notifs->pdata;
816 *count_to_user = out_port_iter->count;
817 break;
818 default:
819 /* Other errors */
820 status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
821 }
822
823 return status;
824 }
825
826 struct bt_component *bt_private_connection_notification_iterator_get_component(
827 struct bt_notification_iterator *iterator)
828 {
829 struct bt_notification_iterator_private_connection *iter_priv_conn;
830
831 BT_ASSERT_PRE_NON_NULL(iterator, "Notification iterator");
832 BT_ASSERT_PRE(iterator->type ==
833 BT_NOTIFICATION_ITERATOR_TYPE_PRIVATE_CONNECTION,
834 "Notification iterator was not created from a private connection: "
835 "%!+i", iterator);
836 iter_priv_conn = (void *) iterator;
837 return bt_get(iter_priv_conn->upstream_component);
838 }
839
/*
 * Returns the private version of the component returned by
 * bt_private_connection_notification_iterator_get_component() for
 * the public version of `private_iterator`.
 */
struct bt_private_component *
bt_private_connection_private_notification_iterator_get_private_component(
		struct bt_private_connection_private_notification_iterator *private_iterator)
{
	struct bt_notification_iterator *public_iter = (void *)
		bt_private_connection_notification_iterator_borrow_from_private(
			private_iterator);
	struct bt_component *upstream_comp =
		bt_private_connection_notification_iterator_get_component(
			public_iter);

	return bt_private_component_from_component(upstream_comp);
}
848
849 static
850 void bt_output_port_notification_iterator_destroy(struct bt_object *obj)
851 {
852 struct bt_notification_iterator_output_port *iterator =
853 (void *) container_of(obj, struct bt_notification_iterator, base);
854
855 BT_LOGD("Destroying output port notification iterator object: addr=%p",
856 iterator);
857 BT_LOGD_STR("Putting graph.");
858 bt_put(iterator->graph);
859 BT_LOGD_STR("Putting colander sink component.");
860 bt_put(iterator->colander);
861 destroy_base_notification_iterator(obj);
862 }
863
864 struct bt_notification_iterator *bt_output_port_notification_iterator_create(
865 struct bt_port *output_port,
866 const char *colander_component_name)
867 {
868 struct bt_notification_iterator_output_port *iterator = NULL;
869 struct bt_component_class *colander_comp_cls = NULL;
870 struct bt_component *output_port_comp = NULL;
871 struct bt_component *colander_comp;
872 struct bt_graph *graph = NULL;
873 enum bt_graph_status graph_status;
874 const char *colander_comp_name;
875 struct bt_port *colander_in_port = NULL;
876 struct bt_component_class_sink_colander_data colander_data;
877 int ret;
878
879 BT_ASSERT_PRE_NON_NULL(output_port, "Output port");
880 BT_ASSERT_PRE(bt_port_get_type(output_port) == BT_PORT_TYPE_OUTPUT,
881 "Port is not an output port: %!+p", output_port);
882 output_port_comp = bt_port_get_component(output_port);
883 BT_ASSERT_PRE(output_port_comp,
884 "Output port has no component: %!+p", output_port);
885 graph = bt_component_get_graph(output_port_comp);
886 BT_ASSERT(graph);
887
888 /* Create notification iterator */
889 BT_LOGD("Creating notification iterator on output port: "
890 "comp-addr=%p, comp-name\"%s\", port-addr=%p, port-name=\"%s\"",
891 output_port_comp, bt_component_get_name(output_port_comp),
892 output_port, bt_port_get_name(output_port));
893 iterator = g_new0(struct bt_notification_iterator_output_port, 1);
894 if (!iterator) {
895 BT_LOGE_STR("Failed to allocate one output port notification iterator.");
896 goto error;
897 }
898
899 ret = init_notification_iterator((void *) iterator,
900 BT_NOTIFICATION_ITERATOR_TYPE_OUTPUT_PORT,
901 bt_output_port_notification_iterator_destroy);
902 if (ret) {
903 /* init_notification_iterator() logs errors */
904 BT_PUT(iterator);
905 goto end;
906 }
907
908 /* Create colander component */
909 colander_comp_cls = bt_component_class_sink_colander_get();
910 if (!colander_comp_cls) {
911 BT_LOGW("Cannot get colander sink component class.");
912 goto error;
913 }
914
915 BT_MOVE(iterator->graph, graph);
916 colander_comp_name =
917 colander_component_name ? colander_component_name : "colander";
918 colander_data.notifs = (void *) iterator->base.notifs->pdata;
919 colander_data.count_addr = &iterator->count;
920
921 graph_status = bt_graph_add_component_with_init_method_data(
922 iterator->graph, colander_comp_cls, colander_comp_name,
923 NULL, &colander_data, &iterator->colander);
924 if (graph_status != BT_GRAPH_STATUS_OK) {
925 BT_LOGW("Cannot add colander sink component to graph: "
926 "graph-addr=%p, name=\"%s\", graph-status=%s",
927 iterator->graph, colander_comp_name,
928 bt_graph_status_string(graph_status));
929 goto error;
930 }
931
932 /*
933 * Connect provided output port to the colander component's
934 * input port.
935 */
936 colander_in_port = bt_component_sink_get_input_port_by_index(
937 iterator->colander, 0);
938 BT_ASSERT(colander_in_port);
939 graph_status = bt_graph_connect_ports(iterator->graph,
940 output_port, colander_in_port, NULL);
941 if (graph_status != BT_GRAPH_STATUS_OK) {
942 BT_LOGW("Cannot add colander sink component to graph: "
943 "graph-addr=%p, name=\"%s\", graph-status=%s",
944 iterator->graph, colander_comp_name,
945 bt_graph_status_string(graph_status));
946 goto error;
947 }
948
949 /*
950 * At this point everything went fine. Make the graph
951 * nonconsumable forever so that only this notification iterator
952 * can consume (thanks to bt_graph_consume_sink_no_check()).
953 * This avoids leaking the notification created by the colander
954 * sink and moved to the notification iterator's notification
955 * member.
956 */
957 bt_graph_set_can_consume(iterator->graph, BT_FALSE);
958 goto end;
959
960 error:
961 if (iterator && iterator->graph && iterator->colander) {
962 int ret;
963
964 /* Remove created colander component from graph if any */
965 colander_comp = iterator->colander;
966 BT_PUT(iterator->colander);
967
968 /*
969 * At this point the colander component's reference
970 * count is 0 because iterator->colander was the only
971 * owner. We also know that it is not connected because
972 * this is the last operation before this function
973 * succeeds.
974 *
975 * Since we honor the preconditions here,
976 * bt_graph_remove_unconnected_component() always
977 * succeeds.
978 */
979 ret = bt_graph_remove_unconnected_component(iterator->graph,
980 colander_comp);
981 BT_ASSERT(ret == 0);
982 }
983
984 BT_PUT(iterator);
985
986 end:
987 bt_put(colander_in_port);
988 bt_put(colander_comp_cls);
989 bt_put(output_port_comp);
990 bt_put(graph);
991 return (void *) iterator;
992 }
993
/*
 * Returns the public notification iterator of
 * `private_notification_iterator`: this is the same address, cast to
 * the public type. No reference is taken.
 */
struct bt_notification_iterator *
bt_private_connection_notification_iterator_borrow_from_private(
		struct bt_private_connection_private_notification_iterator *private_notification_iterator)
{
	struct bt_notification_iterator *public_iter =
		(void *) private_notification_iterator;

	return public_iter;
}
This page took 0.054706 seconds and 4 git commands to generate.