2 * Copyright 2017-2018 Philippe Proulx <pproulx@efficios.com>
3 * Copyright 2015 Jérémie Galarneau <jeremie.galarneau@efficios.com>
5 * Permission is hereby granted, free of charge, to any person obtaining a copy
6 * of this software and associated documentation files (the "Software"), to deal
7 * in the Software without restriction, including without limitation the rights
8 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9 * copies of the Software, and to permit persons to whom the Software is
10 * furnished to do so, subject to the following conditions:
12 * The above copyright notice and this permission notice shall be included in
13 * all copies or substantial portions of the Software.
15 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
24 #define BT_LOG_TAG "MSG-ITER"
25 #include <babeltrace/lib-logging-internal.h>
27 #include <babeltrace/compiler-internal.h>
28 #include <babeltrace/trace-ir/field.h>
29 #include <babeltrace/trace-ir/event-const.h>
30 #include <babeltrace/trace-ir/event-internal.h>
31 #include <babeltrace/trace-ir/packet-const.h>
32 #include <babeltrace/trace-ir/packet-internal.h>
33 #include <babeltrace/trace-ir/stream-internal.h>
34 #include <babeltrace/graph/connection-const.h>
35 #include <babeltrace/graph/connection-internal.h>
36 #include <babeltrace/graph/component-const.h>
37 #include <babeltrace/graph/component-internal.h>
38 #include <babeltrace/graph/component-source-internal.h>
39 #include <babeltrace/graph/component-class-internal.h>
40 #include <babeltrace/graph/component-class-sink-colander-internal.h>
41 #include <babeltrace/graph/component-sink-const.h>
42 #include <babeltrace/graph/message-const.h>
43 #include <babeltrace/graph/message-iterator.h>
44 #include <babeltrace/graph/message-iterator-internal.h>
45 #include <babeltrace/graph/self-component-port-input-message-iterator.h>
46 #include <babeltrace/graph/port-output-message-iterator.h>
47 #include <babeltrace/graph/message-internal.h>
48 #include <babeltrace/graph/message-event-const.h>
49 #include <babeltrace/graph/message-event-internal.h>
50 #include <babeltrace/graph/message-packet-const.h>
51 #include <babeltrace/graph/message-packet-internal.h>
52 #include <babeltrace/graph/message-stream-const.h>
53 #include <babeltrace/graph/message-stream-internal.h>
54 #include <babeltrace/graph/port-const.h>
55 #include <babeltrace/graph/graph.h>
56 #include <babeltrace/graph/graph-const.h>
57 #include <babeltrace/graph/graph-internal.h>
58 #include <babeltrace/types.h>
59 #include <babeltrace/assert-internal.h>
60 #include <babeltrace/assert-pre-internal.h>
/*
 * Batch size used by the message iterators of this file: it is the size
 * given to g_ptr_array_set_size() for the message array of an iterator in
 * init_message_iterator(), and the capacity argument passed to the user
 * next method in bt_self_component_port_input_message_iterator_next().
 */
66 * TODO: Use graph's state (number of active iterators, etc.) and
67 * possibly system specifications to make a better guess than this.
69 #define MSG_BATCH_SIZE 15
/*
 * Members of the per-stream bookkeeping record that the functions below
 * refer to as struct stream_state: the tracked stream, the packet currently
 * open on that stream, and the sequence number expected for the next
 * message of the stream.
 *
 * NOTE(review): the opening and closing lines of the declaration are not
 * visible in this extract, and validate_message() also uses an is_ended
 * member which is not shown here.
 */
72 const struct bt_stream
*stream
; /* owned by this */
73 const struct bt_packet
*cur_packet
; /* owned by this */
74 uint64_t expected_msg_seq_num
;
/*
 * Releases what a stream state holds: logs the address of the state, then
 * puts and resets its cur_packet reference followed by its stream
 * reference.
 *
 * stream_state: state whose references are dropped.
 *
 * NOTE(review): the embedded numbering jumps from 80 to 86 and stops at 90,
 * so the first lines of the body and whatever follows the last put (for
 * example freeing the structure itself) are not visible here; confirm
 * against the complete file.
 */
80 void destroy_stream_state(struct stream_state
*stream_state
)
86 BT_LOGV("Destroying stream state: stream-state-addr=%p", stream_state
);
87 BT_LOGV_STR("Putting stream state's current packet.");
88 BT_OBJECT_PUT_REF_AND_RESET(stream_state
->cur_packet
);
89 BT_LOGV_STR("Putting stream state's stream.");
90 BT_OBJECT_PUT_REF_AND_RESET(stream_state
->stream
);
/*
 * Allocates a zero-filled stream state with g_new0(), stores the given
 * stream in it, takes a reference on that stream with
 * bt_object_get_no_null_check(), and logs the new state at verbose level.
 *
 * stream: stream to track; stored in stream_state->stream.
 *
 * NOTE(review): the check guarding the allocation-failure log and the
 * return statement are not visible in this extract; presumably the new
 * state (or NULL on failure) is returned; confirm.
 */
96 struct stream_state
*create_stream_state(const struct bt_stream
*stream
)
98 struct stream_state
*stream_state
= g_new0(struct stream_state
, 1);
101 BT_LOGE_STR("Failed to allocate one stream state.");
106 * We keep a reference to the stream until we know it's ended.
108 stream_state
->stream
= stream
;
109 bt_object_get_no_null_check(stream_state
->stream
);
110 BT_LIB_LOGV("Created stream state: %![stream-]+s, "
111 "stream-state-addr=%p",
112 stream
, stream_state
);
/*
 * Logs a state change at debug level, rendering the new state with
 * bt_self_component_port_input_message_iterator_state_string(), and then
 * stores state in iterator->state.
 *
 * iterator: message iterator whose state member is overwritten.
 * state:    new state value.
 *
 * NOTE(review): part of the BT_LIB_LOGD() argument list (embedded line 125)
 * is missing from this extract.
 */
119 void _set_self_comp_port_input_msg_iterator_state(
120 struct bt_self_component_port_input_message_iterator
*iterator
,
121 enum bt_self_component_port_input_message_iterator_state state
)
124 BT_LIB_LOGD("Updating message iterator's state: "
126 bt_self_component_port_input_message_iterator_state_string(state
));
127 iterator
->state
= state
;
/*
 * Two alternative definitions of set_self_comp_port_input_msg_iterator_state:
 * the first aliases it to the underscore-prefixed function, the second
 * expands a two-argument call to nothing.
 *
 * NOTE(review): the preprocessor conditional that selects between the two is
 * not visible in this extract.
 */
131 # define set_self_comp_port_input_msg_iterator_state _set_self_comp_port_input_msg_iterator_state
133 # define set_self_comp_port_input_msg_iterator_state(_a, _b)
/*
 * Base teardown called last by both concrete iterator destructors of this
 * file. obj is viewed as a struct bt_message_iterator; when its msgs array
 * exists it is freed with g_ptr_array_free(..., TRUE) and the member is
 * reset to NULL.
 *
 * obj: base object of the iterator being destroyed.
 *
 * NOTE(review): the lines following the msgs cleanup are not visible;
 * releasing the iterator memory itself presumably happens there; confirm.
 */
137 void destroy_base_message_iterator(struct bt_object
*obj
)
139 struct bt_message_iterator
*iterator
= (void *) obj
;
143 if (iterator
->msgs
) {
144 g_ptr_array_free(iterator
->msgs
, TRUE
);
145 iterator
->msgs
= NULL
;
/*
 * Release function of a self component input port message iterator; it is
 * the function handed to init_message_iterator() by
 * bt_self_component_port_input_message_iterator_create_initial().
 *
 * Visible steps, in order:
 *   1. view obj as the concrete iterator type and log at debug level;
 *   2. call bt_self_component_port_input_message_iterator_try_finalize();
 *   3. destroy the stream_states hash table when present and reset it;
 *   4. when a connection is still set, unregister from it with
 *      bt_connection_remove_iterator() and reset the member;
 *   5. hand obj to destroy_base_message_iterator().
 *
 * obj: base object of the iterator being destroyed.
 *
 * NOTE(review): the original comment below describes incrementing the
 * reference count to guard against a recursive double destroy, but the
 * statement doing so is not visible in this extract; confirm that it is
 * still present in the complete file.
 */
152 void bt_self_component_port_input_message_iterator_destroy(struct bt_object
*obj
)
154 struct bt_self_component_port_input_message_iterator
*iterator
;
159 * The message iterator's reference count is 0 if we're
160 * here. Increment it to avoid a double-destroy (possibly
161 * infinitely recursive). This could happen for example if the
162 * message iterator's finalization function does
163 * bt_object_get_ref() (or anything that causes
164 * bt_object_get_ref() to be called) on itself (ref. count goes
165 * from 0 to 1), and then bt_object_put_ref(): the reference
166 * count would go from 1 to 0 again and this function would be
170 iterator
= (void *) obj
;
171 BT_LIB_LOGD("Destroying self component input port message iterator object: "
173 bt_self_component_port_input_message_iterator_try_finalize(iterator
);
175 if (iterator
->stream_states
) {
177 * Remove our destroy listener from each stream which
178 * has a state in this iterator. Otherwise the destroy
179 * listener would be called with an invalid/other
180 * message iterator object.
182 g_hash_table_destroy(iterator
->stream_states
);
183 iterator
->stream_states
= NULL
;
186 if (iterator
->connection
) {
188 * Remove ourself from the originating connection so
189 * that it does not try to finalize a dangling pointer
192 bt_connection_remove_iterator(iterator
->connection
, iterator
);
193 iterator
->connection
= NULL
;
196 destroy_base_message_iterator(obj
);
/*
 * Runs the finalization sequence of a message iterator, depending on its
 * current state.
 *
 * The switch on iterator->state only logs for the NON_INITIALIZED,
 * FINALIZED and FINALIZING states (the exits of those cases are not visible
 * here; presumably they leave the function). For any other state the
 * iterator is marked FINALIZING, the msg_iter_finalize method of the
 * upstream component class is selected according to the class type (source
 * or filter), the upstream component and upstream port members are cleared,
 * and the iterator is marked FINALIZED.
 *
 * iterator: message iterator to finalize.
 *
 * NOTE(review): the statement that actually invokes the selected method is
 * not visible in this extract.
 */
200 void bt_self_component_port_input_message_iterator_try_finalize(
201 struct bt_self_component_port_input_message_iterator
*iterator
)
203 typedef void (*method_t
)(void *);
205 struct bt_component_class
*comp_class
= NULL
;
206 method_t method
= NULL
;
210 switch (iterator
->state
) {
211 case BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_NON_INITIALIZED
:
212 /* Skip user finalization if user initialization failed */
213 BT_LIB_LOGD("Not finalizing non-initialized message iterator: "
216 case BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_FINALIZED
:
217 /* Already finalized */
218 BT_LIB_LOGD("Not finalizing message iterator: already finalized: "
221 case BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_FINALIZING
:
222 /* Finalization already in progress: logged at fatal level */
223 BT_LIB_LOGF("Message iterator is already being finalized: "
/* Any other state: proceed with the finalization sequence */
230 BT_LIB_LOGD("Finalizing message iterator: %!+i", iterator
);
231 set_self_comp_port_input_msg_iterator_state(iterator
,
232 BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_FINALIZING
);
233 BT_ASSERT(iterator
->upstream_component
);
234 comp_class
= iterator
->upstream_component
->class;
236 /* Call user-defined destroy method */
237 switch (comp_class
->type
) {
238 case BT_COMPONENT_CLASS_TYPE_SOURCE
:
240 struct bt_component_class_source
*src_comp_cls
=
243 method
= (method_t
) src_comp_cls
->methods
.msg_iter_finalize
;
246 case BT_COMPONENT_CLASS_TYPE_FILTER
:
248 struct bt_component_class_filter
*flt_comp_cls
=
251 method
= (method_t
) flt_comp_cls
->methods
.msg_iter_finalize
;
260 BT_LIB_LOGD("Calling user's finalization method: %!+i",
/* Clear the upstream links before marking the iterator as finalized */
265 iterator
->upstream_component
= NULL
;
266 iterator
->upstream_port
= NULL
;
267 set_self_comp_port_input_msg_iterator_state(iterator
,
268 BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_FINALIZED
);
269 BT_LIB_LOGD("Finalized message iterator: %!+i", iterator
);
/*
 * Stores connection in iterator->connection and logs the association at
 * verbose level. No reference is taken on the connection in the visible
 * lines.
 *
 * iterator:   message iterator to update.
 * connection: connection to store.
 *
 * NOTE(review): embedded line 280 is missing from this extract.
 */
276 void bt_self_component_port_input_message_iterator_set_connection(
277 struct bt_self_component_port_input_message_iterator
*iterator
,
278 struct bt_connection
*connection
)
281 iterator
->connection
= connection
;
282 BT_LIB_LOGV("Set message iterator's connection: "
283 "%![iter-]+i, %![conn-]+x", iterator
, connection
);
/*
 * Common initialization of the base part of a message iterator.
 *
 * Initializes the shared object header with destroy as its release
 * function, records type, then allocates the msgs pointer array and sets
 * its size to MSG_BATCH_SIZE entries.
 *
 * iterator: base iterator to initialize.
 * type:     concrete iterator type, stored in iterator->type.
 * destroy:  release function passed to bt_object_init_shared().
 *
 * Returns an int which both callers in this file store in a variable named
 * ret. NOTE(review): the return statements themselves are not visible in
 * this extract; presumably 0 means success; confirm.
 */
287 int init_message_iterator(struct bt_message_iterator
*iterator
,
288 enum bt_message_iterator_type type
,
289 bt_object_release_func destroy
)
293 bt_object_init_shared(&iterator
->base
, destroy
);
294 iterator
->type
= type
;
295 iterator
->msgs
= g_ptr_array_new();
296 if (!iterator
->msgs
) {
297 BT_LOGE_STR("Failed to allocate a GPtrArray.");
/* Size the array up front: its pdata buffer is later used as a batch of MSG_BATCH_SIZE slots */
302 g_ptr_array_set_size(iterator
->msgs
, MSG_BATCH_SIZE
);
/*
 * Allocates and wires up a self component input port message iterator for
 * the given upstream component and port, leaving it in the NON_INITIALIZED
 * state.
 *
 * Asserts that both arguments are set, that upstream_port is connected and
 * that the class type of the upstream component is source or filter. After
 * the base initialization it creates the stream_states hash table (direct
 * hashing and comparison of the key, values destroyed with
 * destroy_stream_state()), then records the upstream component, the
 * upstream port, the connection of that port and the graph borrowed from
 * the upstream component.
 *
 * upstream_comp: component on the upstream side.
 * upstream_port: connected port of upstream_comp.
 *
 * NOTE(review): the allocation statement, the failure conditions and the
 * return statement are only partly visible; on the two visible failure
 * paths the iterator reference is put and reset.
 */
309 struct bt_self_component_port_input_message_iterator
*
310 bt_self_component_port_input_message_iterator_create_initial(
311 struct bt_component
*upstream_comp
,
312 struct bt_port
*upstream_port
)
315 struct bt_self_component_port_input_message_iterator
*iterator
= NULL
;
317 BT_ASSERT(upstream_comp
);
318 BT_ASSERT(upstream_port
);
319 BT_ASSERT(bt_port_is_connected(upstream_port
));
320 BT_LIB_LOGD("Creating initial message iterator on self component input port: "
321 "%![up-comp-]+c, %![up-port-]+p", upstream_comp
, upstream_port
);
322 BT_ASSERT(bt_component_get_class_type(upstream_comp
) ==
323 BT_COMPONENT_CLASS_TYPE_SOURCE
||
324 bt_component_get_class_type(upstream_comp
) ==
325 BT_COMPONENT_CLASS_TYPE_FILTER
);
327 struct bt_self_component_port_input_message_iterator
, 1);
329 BT_LOGE_STR("Failed to allocate one self component input port "
330 "message iterator.");
334 ret
= init_message_iterator((void *) iterator
,
335 BT_MESSAGE_ITERATOR_TYPE_SELF_COMPONENT_PORT_INPUT
,
336 bt_self_component_port_input_message_iterator_destroy
);
338 /* init_message_iterator() logs errors */
339 BT_OBJECT_PUT_REF_AND_RESET(iterator
);
/* Table of per-stream states; validate_message() inserts entries keyed by stream pointer */
343 iterator
->stream_states
= g_hash_table_new_full(g_direct_hash
,
344 g_direct_equal
, NULL
, (GDestroyNotify
) destroy_stream_state
);
345 if (!iterator
->stream_states
) {
346 BT_LOGE_STR("Failed to allocate a GHashTable.");
347 BT_OBJECT_PUT_REF_AND_RESET(iterator
);
351 iterator
->upstream_component
= upstream_comp
;
352 iterator
->upstream_port
= upstream_port
;
353 iterator
->connection
= iterator
->upstream_port
->connection
;
354 iterator
->graph
= bt_component_borrow_graph(upstream_comp
);
355 set_self_comp_port_input_msg_iterator_state(iterator
,
356 BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_NON_INITIALIZED
);
357 BT_LIB_LOGD("Created initial message iterator on self component input port: "
358 "%![up-port-]+p, %![up-comp-]+c, %![iter-]+i",
359 upstream_port
, upstream_comp
, iterator
);
/*
 * Creates a message iterator on a self component input port and runs the
 * message iterator initialization method of the upstream component class
 * on it.
 *
 * Preconditions checked with BT_ASSERT_PRE*: the port is non-NULL, it is
 * connected, it is part of a component, and the graph of that component is
 * not canceled.
 *
 * The upstream port is read from port->connection and its component must be
 * of a source or filter class. The iterator is first built with
 * bt_self_component_port_input_message_iterator_create_initial(); then the
 * msg_iter_init method is looked up on the source or filter class and
 * called. When the returned status differs from
 * BT_MESSAGE_ITERATOR_STATUS_OK a warning is logged and the iterator
 * reference is put and reset. Otherwise the iterator state becomes ACTIVE
 * and the iterator is appended to port->connection->iterators.
 *
 * self_port: input port on which to create the iterator.
 *
 * NOTE(review): several lines are missing from this extract (the
 * declaration of iter_status, the failure conditions, the last argument of
 * the init_method call, the breaks, the return statement). The typedef for
 * init_method_t returns enum bt_self_message_iterator_status while the
 * result is compared with and printed as a bt_message_iterator_status
 * value; confirm that the two enumerations are meant to be interchangeable.
 */
365 struct bt_self_component_port_input_message_iterator
*
366 bt_self_component_port_input_message_iterator_create(
367 struct bt_self_component_port_input
*self_port
)
369 typedef enum bt_self_message_iterator_status (*init_method_t
)(
370 void *, void *, void *);
372 init_method_t init_method
= NULL
;
373 struct bt_self_component_port_input_message_iterator
*iterator
=
375 struct bt_port
*port
= (void *) self_port
;
376 struct bt_port
*upstream_port
;
377 struct bt_component
*comp
;
378 struct bt_component
*upstream_comp
;
379 struct bt_component_class
*upstream_comp_cls
;
381 BT_ASSERT_PRE_NON_NULL(port
, "Port");
382 comp
= bt_port_borrow_component_inline(port
);
383 BT_ASSERT_PRE(bt_port_is_connected(port
),
384 "Port is not connected: %![port-]+p", port
);
385 BT_ASSERT_PRE(comp
, "Port is not part of a component: %![port-]+p",
387 BT_ASSERT_PRE(!bt_component_graph_is_canceled(comp
),
388 "Port's component's graph is canceled: "
389 "%![port-]+p, %![comp-]+c", port
, comp
);
390 BT_ASSERT(port
->connection
);
391 upstream_port
= port
->connection
->upstream_port
;
392 BT_ASSERT(upstream_port
);
393 upstream_comp
= bt_port_borrow_component_inline(upstream_port
);
394 BT_ASSERT(upstream_comp
);
395 upstream_comp_cls
= upstream_comp
->class;
396 BT_ASSERT(upstream_comp
->class->type
==
397 BT_COMPONENT_CLASS_TYPE_SOURCE
||
398 upstream_comp
->class->type
==
399 BT_COMPONENT_CLASS_TYPE_FILTER
);
400 iterator
= bt_self_component_port_input_message_iterator_create_initial(
401 upstream_comp
, upstream_port
);
403 BT_LOGW_STR("Cannot create self component input port "
404 "message iterator.");
408 switch (upstream_comp_cls
->type
) {
409 case BT_COMPONENT_CLASS_TYPE_SOURCE
:
411 struct bt_component_class_source
*src_comp_cls
=
412 (void *) upstream_comp_cls
;
415 (init_method_t
) src_comp_cls
->methods
.msg_iter_init
;
418 case BT_COMPONENT_CLASS_TYPE_FILTER
:
420 struct bt_component_class_filter
*flt_comp_cls
=
421 (void *) upstream_comp_cls
;
424 (init_method_t
) flt_comp_cls
->methods
.msg_iter_init
;
435 BT_LIB_LOGD("Calling user's initialization method: %!+i", iterator
);
436 iter_status
= init_method(iterator
, upstream_comp
,
438 BT_LOGD("User method returned: status=%s",
439 bt_message_iterator_status_string(iter_status
));
440 if (iter_status
!= BT_MESSAGE_ITERATOR_STATUS_OK
) {
441 BT_LOGW_STR("Initialization method failed.");
442 BT_OBJECT_PUT_REF_AND_RESET(iterator
);
/* Initialization path completed: activate the iterator and register it with the connection */
447 set_self_comp_port_input_msg_iterator_state(iterator
,
448 BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_ACTIVE
);
449 g_ptr_array_add(port
->connection
->iterators
, iterator
);
450 BT_LIB_LOGD("Created message iterator on self component input port: "
451 "%![up-port-]+p, %![up-comp-]+c, %![iter-]+i",
452 upstream_port
, upstream_comp
, iterator
);
458 void *bt_self_message_iterator_get_data(
459 const struct bt_self_message_iterator
*self_iterator
)
461 struct bt_self_component_port_input_message_iterator
*iterator
=
462 (void *) self_iterator
;
464 BT_ASSERT_PRE_NON_NULL(iterator
, "Message iterator");
465 return iterator
->user_data
;
468 void bt_self_message_iterator_set_data(
469 struct bt_self_message_iterator
*self_iterator
, void *data
)
471 struct bt_self_component_port_input_message_iterator
*iterator
=
472 (void *) self_iterator
;
474 BT_ASSERT_PRE_NON_NULL(iterator
, "Message iterator");
475 iterator
->user_data
= data
;
476 BT_LIB_LOGV("Set message iterator's user data: "
477 "%!+i, user-data-addr=%p", iterator
, data
);
/*
 * Extracts the stream, and when there is one the packet, that a message
 * refers to, writing the borrowed pointers through the two output
 * parameters.
 *
 * Event, packet beginning and packet end messages set *packet and derive
 * *stream from that packet; stream beginning and stream end messages set
 * only *stream. The visible cases do not write the outputs for any other
 * message type, which is why validate_message() passes variables that it
 * initialized to NULL.
 *
 * msg:    message to inspect.
 * stream: output; stream of the message.
 * packet: output; packet of the message, for the packet-bound types.
 *
 * NOTE(review): the switch header, the breaks and any default case are not
 * visible in this extract.
 */
482 void bt_message_borrow_packet_stream(const struct bt_message
*msg
,
483 const struct bt_stream
**stream
,
484 const struct bt_packet
**packet
)
489 case BT_MESSAGE_TYPE_EVENT
:
490 *packet
= bt_event_borrow_packet_const(
491 bt_message_event_borrow_event_const(msg
));
492 *stream
= bt_packet_borrow_stream_const(*packet
);
494 case BT_MESSAGE_TYPE_STREAM_BEGINNING
:
495 *stream
= bt_message_stream_beginning_borrow_stream_const(msg
);
497 case BT_MESSAGE_TYPE_STREAM_END
:
498 *stream
= bt_message_stream_end_borrow_stream_const(msg
);
500 case BT_MESSAGE_TYPE_PACKET_BEGINNING
:
501 *packet
= bt_message_packet_beginning_borrow_packet_const(msg
);
502 *stream
= bt_packet_borrow_stream_const(*packet
);
504 case BT_MESSAGE_TYPE_PACKET_END
:
505 *packet
= bt_message_packet_end_borrow_packet_const(msg
);
506 *stream
= bt_packet_borrow_stream_const(*packet
);
/*
 * Checks one message against the per-stream state tracked by the iterator
 * and updates that state.
 *
 * The stream and packet of the message are obtained with
 * bt_message_borrow_packet_stream(), then the state is looked up in
 * iterator->stream_states with the stream as key.
 *
 * Without an existing state: any type other than STREAM_BEGINNING is
 * reported; a sequence number equal to -1ULL gets a special treatment whose
 * body is not visible here; a sequence number other than 0 is reported;
 * then a state is created with create_stream_state(), inserted in the
 * table, and its expected sequence number is incremented.
 *
 * With an existing state: a stream already marked as ended is reported; a
 * -1ULL sequence number is replaced by the expected one (written through
 * the non-const alias msg); any other value must equal the expected one.
 * Then, by message type:
 *   - STREAM_BEGINNING is reported as unexpected;
 *   - STREAM_END is reported when a packet is still current, otherwise the
 *     stream is marked as ended;
 *   - PACKET_BEGINNING is reported when a packet is still current,
 *     otherwise the packet becomes the current one and a reference is taken;
 *   - PACKET_END is reported when no packet is current, otherwise the
 *     current packet reference is put and reset;
 *   - EVENT is reported when its packet is not the current packet.
 * Each accepted case increments the expected sequence number.
 *
 * iterator: iterator owning the stream_states table.
 * c_msg:    message to check.
 *
 * Returns a bool; the local is_valid starts out true. NOTE(review): the
 * statements clearing it, the gotos/breaks and the return are not visible
 * in this extract.
 *
 * NOTE(review): in the EVENT mismatch report the format names msg-packet
 * before expected-packet, but the arguments are passed in the order
 * stream_state->cur_packet, packet; the two look swapped.
 */
515 bool validate_message(
516 struct bt_self_component_port_input_message_iterator
*iterator
,
517 const struct bt_message
*c_msg
)
519 bool is_valid
= true;
520 struct stream_state
*stream_state
;
521 const struct bt_stream
*stream
= NULL
;
522 const struct bt_packet
*packet
= NULL
;
523 struct bt_message
*msg
= (void *) c_msg
;
526 bt_message_borrow_packet_stream(c_msg
, &stream
, &packet
);
529 /* we don't care about messages not attached to streams */
533 stream_state
= g_hash_table_lookup(iterator
->stream_states
, stream
);
536 * No stream state for this stream: this message
537 * MUST be a BT_MESSAGE_TYPE_STREAM_BEGINNING message
538 * and its sequence number must be 0.
540 if (c_msg
->type
!= BT_MESSAGE_TYPE_STREAM_BEGINNING
) {
541 BT_ASSERT_PRE_MSG("Unexpected message: missing a "
542 "BT_MESSAGE_TYPE_STREAM_BEGINNING "
543 "message prior to this message: "
544 "%![stream-]+s", stream
);
549 if (c_msg
->seq_num
== -1ULL) {
553 if (c_msg
->seq_num
!= 0) {
554 BT_ASSERT_PRE_MSG("Unexpected message sequence "
555 "number for this message iterator: "
556 "this is the first message for this "
557 "stream, expecting sequence number 0: "
558 "seq-num=%" PRIu64
", %![stream-]+s",
559 c_msg
->seq_num
, stream
);
564 stream_state
= create_stream_state(stream
);
569 g_hash_table_insert(iterator
->stream_states
,
570 (void *) stream
, stream_state
);
571 stream_state
->expected_msg_seq_num
++;
575 if (stream_state
->is_ended
) {
577 * There's a new message which has a reference to a
578 * stream which, from this iterator's point of view, is
579 * ended ("end of stream" message was returned).
580 * This is bad: the API guarantees that it can never
583 BT_ASSERT_PRE_MSG("Stream is already ended: %![stream-]+s",
589 if (c_msg
->seq_num
== -1ULL) {
590 msg
->seq_num
= stream_state
->expected_msg_seq_num
;
593 if (c_msg
->seq_num
!= -1ULL &&
594 c_msg
->seq_num
!= stream_state
->expected_msg_seq_num
) {
595 BT_ASSERT_PRE_MSG("Unexpected message sequence number: "
596 "seq-num=%" PRIu64
", "
597 "expected-seq-num=%" PRIu64
", %![stream-]+s",
598 c_msg
->seq_num
, stream_state
->expected_msg_seq_num
,
604 switch (c_msg
->type
) {
605 case BT_MESSAGE_TYPE_STREAM_BEGINNING
:
606 BT_ASSERT_PRE_MSG("Unexpected BT_MESSAGE_TYPE_STREAM_BEGINNING "
607 "message at this point: msg-seq-num=%" PRIu64
", "
608 "%![stream-]+s", c_msg
->seq_num
, stream
);
611 case BT_MESSAGE_TYPE_STREAM_END
:
612 if (stream_state
->cur_packet
) {
613 BT_ASSERT_PRE_MSG("Unexpected BT_MESSAGE_TYPE_STREAM_END "
614 "message: missing a "
615 "BT_MESSAGE_TYPE_PACKET_END message "
616 "prior to this message: "
617 "msg-seq-num=%" PRIu64
", "
618 "%![stream-]+s", c_msg
->seq_num
, stream
);
622 stream_state
->expected_msg_seq_num
++;
623 stream_state
->is_ended
= true;
625 case BT_MESSAGE_TYPE_PACKET_BEGINNING
:
626 if (stream_state
->cur_packet
) {
627 BT_ASSERT_PRE_MSG("Unexpected BT_MESSAGE_TYPE_PACKET_BEGINNING "
628 "message at this point: missing a "
629 "BT_MESSAGE_TYPE_PACKET_END message "
630 "prior to this message: "
631 "msg-seq-num=%" PRIu64
", %![stream-]+s, "
632 "%![packet-]+a", c_msg
->seq_num
, stream
,
637 stream_state
->expected_msg_seq_num
++;
638 stream_state
->cur_packet
= packet
;
639 bt_object_get_no_null_check(stream_state
->cur_packet
);
641 case BT_MESSAGE_TYPE_PACKET_END
:
642 if (!stream_state
->cur_packet
) {
643 BT_ASSERT_PRE_MSG("Unexpected BT_MESSAGE_TYPE_PACKET_END "
644 "message at this point: missing a "
645 "BT_MESSAGE_TYPE_PACKET_BEGINNING message "
646 "prior to this message: "
647 "msg-seq-num=%" PRIu64
", %![stream-]+s, "
648 "%![packet-]+a", c_msg
->seq_num
, stream
,
653 stream_state
->expected_msg_seq_num
++;
654 BT_OBJECT_PUT_REF_AND_RESET(stream_state
->cur_packet
);
656 case BT_MESSAGE_TYPE_EVENT
:
657 if (packet
!= stream_state
->cur_packet
) {
658 BT_ASSERT_PRE_MSG("Unexpected packet for "
659 "BT_MESSAGE_TYPE_EVENT message: "
660 "msg-seq-num=%" PRIu64
", %![stream-]+s, "
661 "%![msg-packet-]+a, %![expected-packet-]+a",
662 c_msg
->seq_num
, stream
,
663 stream_state
->cur_packet
, packet
);
667 stream_state
->expected_msg_seq_num
++;
/*
 * Runs validate_message() on the first count entries of the message array
 * of the iterator (iterator->base.msgs->pdata), in index order, storing
 * each result in ret.
 *
 * iterator: iterator whose message array is checked.
 *
 * NOTE(review): the remaining parameter (count, judging by the loop and by
 * the call in bt_self_component_port_input_message_iterator_next()), the
 * local declarations, the handling of a failed check and the return
 * statement are not visible in this extract.
 */
679 bool validate_messages(
680 struct bt_self_component_port_input_message_iterator
*iterator
,
684 bt_message_array_const msgs
=
685 (void *) iterator
->base
.msgs
->pdata
;
688 for (i
= 0; i
< count
; i
++) {
689 ret
= validate_message(iterator
, msgs
[i
]);
/*
 * Tells whether the iterator may end now, by walking every entry of
 * iterator->stream_states. Each key and value is asserted to be non-NULL;
 * for a stream state that is not marked as ended, a precondition message
 * naming the stream is emitted.
 *
 * iterator: iterator whose stream states are inspected.
 *
 * Returns a bool. NOTE(review): the declaration of iter, the result
 * variable and the return statement are not visible in this extract.
 *
 * NOTE(review): the format used here is %![stream-]s whereas the other
 * stream formats of this file are %![stream-]+s; confirm whether the
 * missing + is intended.
 */
699 static inline bool self_comp_port_input_msg_iter_can_end(
700 struct bt_self_component_port_input_message_iterator
*iterator
)
703 gpointer stream_key
, state_value
;
707 * Verify that this iterator received a
708 * BT_MESSAGE_TYPE_STREAM_END message for each stream
712 g_hash_table_iter_init(&iter
, iterator
->stream_states
);
714 while (g_hash_table_iter_next(&iter
, &stream_key
, &state_value
)) {
715 struct stream_state
*stream_state
= (void *) state_value
;
717 BT_ASSERT(stream_state
);
718 BT_ASSERT(stream_key
);
720 if (!stream_state
->is_ended
) {
721 BT_ASSERT_PRE_MSG("Ending message iterator, "
722 "but stream is not ended: "
723 "%![stream-]s", stream_key
);
/*
 * Asks the upstream component class for the next batch of messages of a
 * self component input port message iterator.
 *
 * Preconditions (BT_ASSERT_PRE*): iterator, msgs and user_count are
 * non-NULL and the iterator is in the ACTIVE state.
 *
 * The msg_iter_next method of the source or filter class is called with the
 * iterator, the pdata buffer of the base message array, MSG_BATCH_SIZE as
 * capacity, and user_count. The iterator is then asserted to still be
 * ACTIVE. By status:
 *   - OK: the batch must pass validate_messages() for *user_count
 *     messages, and *msgs is pointed at the internal buffer;
 *   - AGAIN: no statement is visible for this case;
 *   - END: self_comp_port_input_msg_iter_can_end() must hold, and the
 *     iterator state becomes ENDED.
 *
 * iterator:   iterator to advance.
 * msgs:       output; on OK, set to the message array owned by the
 *             iterator.
 * user_count: output; passed to the user method as its count argument.
 *
 * NOTE(review): status is declared as int, the user method type returns
 * enum bt_self_message_iterator_status, and this function returns
 * enum bt_message_iterator_status. The error check after the user call,
 * the switch header, the breaks, the default case body and the return
 * statement are not visible in this extract.
 */
733 enum bt_message_iterator_status
734 bt_self_component_port_input_message_iterator_next(
735 struct bt_self_component_port_input_message_iterator
*iterator
,
736 bt_message_array_const
*msgs
, uint64_t *user_count
)
738 typedef enum bt_self_message_iterator_status (*method_t
)(
739 void *, bt_message_array_const
, uint64_t, uint64_t *);
741 method_t method
= NULL
;
742 struct bt_component_class
*comp_cls
;
743 int status
= BT_MESSAGE_ITERATOR_STATUS_OK
;
745 BT_ASSERT_PRE_NON_NULL(iterator
, "Message iterator");
746 BT_ASSERT_PRE_NON_NULL(msgs
, "Message array (output)");
747 BT_ASSERT_PRE_NON_NULL(user_count
, "Message count (output)");
748 BT_ASSERT_PRE(iterator
->state
==
749 BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_ACTIVE
,
750 "Message iterator's \"next\" called, but "
751 "iterator is in the wrong state: %!+i", iterator
);
752 BT_ASSERT(iterator
->upstream_component
);
753 BT_ASSERT(iterator
->upstream_component
->class);
754 BT_LIB_LOGD("Getting next self component input port "
755 "message iterator's messages: %!+i", iterator
);
756 comp_cls
= iterator
->upstream_component
->class;
758 /* Pick the appropriate "next" method */
759 switch (comp_cls
->type
) {
760 case BT_COMPONENT_CLASS_TYPE_SOURCE
:
762 struct bt_component_class_source
*src_comp_cls
=
765 method
= (method_t
) src_comp_cls
->methods
.msg_iter_next
;
768 case BT_COMPONENT_CLASS_TYPE_FILTER
:
770 struct bt_component_class_filter
*flt_comp_cls
=
773 method
= (method_t
) flt_comp_cls
->methods
.msg_iter_next
;
781 * Call the user's "next" method to get the next messages
785 BT_LOGD_STR("Calling user's \"next\" method.");
786 status
= method(iterator
, (void *) iterator
->base
.msgs
->pdata
,
787 MSG_BATCH_SIZE
, user_count
);
788 BT_LOGD("User method returned: status=%s",
789 bt_message_iterator_status_string(status
));
791 BT_LOGW_STR("User method failed.");
797 * There is no way that this iterator could have been finalized
798 * during its "next" method, as the only way to do this is to
799 * put the last iterator's reference, and this can only be done
800 * by its downstream owner.
802 BT_ASSERT(iterator
->state
==
803 BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_ACTIVE
);
807 case BT_MESSAGE_ITERATOR_STATUS_OK
:
808 BT_ASSERT_PRE(validate_messages(iterator
, *user_count
),
809 "Messages are invalid at this point: "
810 "%![msg-iter-]+i, count=%" PRIu64
,
811 iterator
, *user_count
);
812 *msgs
= (void *) iterator
->base
.msgs
->pdata
;
814 case BT_MESSAGE_ITERATOR_STATUS_AGAIN
:
816 case BT_MESSAGE_ITERATOR_STATUS_END
:
817 BT_ASSERT_PRE(self_comp_port_input_msg_iter_can_end(iterator
),
818 "Message iterator cannot end at this point: "
820 set_self_comp_port_input_msg_iterator_state(iterator
,
821 BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_ENDED
);
824 /* Unknown non-error status */
/*
 * Gets the next batch of messages of an output port message iterator by
 * making the graph consume through bt_graph_consume_sink_no_check().
 *
 * Preconditions (BT_ASSERT_PRE_NON_NULL): iterator, msgs_to_user and
 * count_to_user are non-NULL.
 *
 * Mapping of the graph status:
 *   - CANCELED, AGAIN, END and NOMEM are converted to the iterator status
 *     by a plain integer cast;
 *   - OK yields BT_MESSAGE_ITERATOR_STATUS_OK, and the outputs are set to
 *     the message array of the iterator and to iterator->count;
 *   - the remaining visible assignment sets
 *     BT_MESSAGE_ITERATOR_STATUS_ERROR (presumably in the default case).
 *
 * iterator:      output port message iterator to advance.
 * msgs_to_user:  output; on OK, the message array owned by the iterator.
 * count_to_user: output; on OK, the number of messages available.
 *
 * NOTE(review): the second argument of the consume call, the breaks and the
 * return statement are not visible in this extract. The integer cast relies
 * on the listed graph status values matching the iterator status values;
 * confirm against the enumeration definitions.
 */
832 enum bt_message_iterator_status
bt_port_output_message_iterator_next(
833 struct bt_port_output_message_iterator
*iterator
,
834 bt_message_array_const
*msgs_to_user
,
835 uint64_t *count_to_user
)
837 enum bt_message_iterator_status status
;
838 enum bt_graph_status graph_status
;
840 BT_ASSERT_PRE_NON_NULL(iterator
, "Message iterator");
841 BT_ASSERT_PRE_NON_NULL(msgs_to_user
, "Message array (output)");
842 BT_ASSERT_PRE_NON_NULL(count_to_user
, "Message count (output)");
843 BT_LIB_LOGD("Getting next output port message iterator's messages: "
846 graph_status
= bt_graph_consume_sink_no_check(iterator
->graph
,
848 switch (graph_status
) {
849 case BT_GRAPH_STATUS_CANCELED
:
850 case BT_GRAPH_STATUS_AGAIN
:
851 case BT_GRAPH_STATUS_END
:
852 case BT_GRAPH_STATUS_NOMEM
:
853 status
= (int) graph_status
;
855 case BT_GRAPH_STATUS_OK
:
856 status
= BT_MESSAGE_ITERATOR_STATUS_OK
;
859 * On success, the colander sink moves the messages
860 * to this iterator's array and sets this iterator's
861 * message count: move them to the user.
863 *msgs_to_user
= (void *) iterator
->base
.msgs
->pdata
;
864 *count_to_user
= iterator
->count
;
868 status
= BT_MESSAGE_ITERATOR_STATUS_ERROR
;
874 struct bt_component
*bt_self_component_port_input_message_iterator_borrow_component(
875 struct bt_self_component_port_input_message_iterator
*iterator
)
877 BT_ASSERT_PRE_NON_NULL(iterator
, "Message iterator");
878 return iterator
->upstream_component
;
881 struct bt_self_component
*bt_self_message_iterator_borrow_component(
882 struct bt_self_message_iterator
*self_iterator
)
884 struct bt_self_component_port_input_message_iterator
*iterator
=
885 (void *) self_iterator
;
887 BT_ASSERT_PRE_NON_NULL(iterator
, "Message iterator");
888 return (void *) iterator
->upstream_component
;
891 struct bt_self_port_output
*bt_self_message_iterator_borrow_port(
892 struct bt_self_message_iterator
*self_iterator
)
894 struct bt_self_component_port_input_message_iterator
*iterator
=
895 (void *) self_iterator
;
897 BT_ASSERT_PRE_NON_NULL(iterator
, "Message iterator");
898 return (void *) iterator
->upstream_port
;
/*
 * Release function of an output port message iterator; it is the function
 * handed to init_message_iterator() by
 * bt_port_output_message_iterator_create().
 *
 * Puts and resets the graph reference, then the colander sink component
 * reference, and finally hands obj to destroy_base_message_iterator().
 *
 * obj: base object of the iterator being destroyed.
 *
 * NOTE(review): the argument of the first log call (embedded line 907) is
 * missing from this extract.
 */
902 void bt_port_output_message_iterator_destroy(struct bt_object
*obj
)
904 struct bt_port_output_message_iterator
*iterator
= (void *) obj
;
906 BT_LIB_LOGD("Destroying output port message iterator object: %!+i",
908 BT_LOGD_STR("Putting graph.");
909 BT_OBJECT_PUT_REF_AND_RESET(iterator
->graph
);
910 BT_LOGD_STR("Putting colander sink component.");
911 BT_OBJECT_PUT_REF_AND_RESET(iterator
->colander
);
912 destroy_base_message_iterator(obj
);
/*
 * Creates a message iterator on an output port by adding a colander sink
 * component to the graph and connecting the output port to the first input
 * port of that sink.
 *
 * Visible steps:
 *   - preconditions: graph and output_port are non-NULL, the port has a
 *     component, and a comparison involving the graph of that component;
 *   - allocate the iterator and run init_message_iterator() with
 *     bt_port_output_message_iterator_destroy() as release function;
 *   - get the colander sink component class, take a reference on graph,
 *     and point colander_data at the message array and at the count member
 *     of the iterator;
 *   - add the colander sink to the graph under a fixed name, storing the
 *     component in iterator->colander;
 *   - connect output_port to input port 0 of the colander;
 *   - make the graph non-consumable with bt_graph_set_can_consume(...,
 *     false);
 *   - cleanup (condition at embedded line 1013): the colander reference is
 *     put, the component is removed from the graph with
 *     bt_graph_remove_unconnected_component(), and the iterator reference
 *     is put and reset;
 *   - finally the colander class reference is put and the iterator is
 *     returned.
 *
 * graph:       graph containing the component of output_port.
 * output_port: output port to iterate on.
 *
 * NOTE(review): labels, gotos, the declaration of ret and several
 * conditions are missing from this extract, so the exact control flow of
 * the failure paths cannot be confirmed here.
 *
 * NOTE(review): the first warning format contains %1[graph-]+g where every
 * other format of this file uses the %![...] form; this looks like a typo
 * for %![graph-]+g.
 *
 * NOTE(review): the warning logged when bt_graph_connect_ports() fails
 * reuses the text of the add-component failure.
 */
915 struct bt_port_output_message_iterator
*
916 bt_port_output_message_iterator_create(
917 struct bt_graph
*graph
,
918 const struct bt_port_output
*output_port
)
920 struct bt_port_output_message_iterator
*iterator
= NULL
;
921 struct bt_component_class_sink
*colander_comp_cls
= NULL
;
922 struct bt_component
*output_port_comp
= NULL
;
923 struct bt_component_sink
*colander_comp
;
924 enum bt_graph_status graph_status
;
925 struct bt_port_input
*colander_in_port
= NULL
;
926 struct bt_component_class_sink_colander_data colander_data
;
929 BT_ASSERT_PRE_NON_NULL(graph
, "Graph");
930 BT_ASSERT_PRE_NON_NULL(output_port
, "Output port");
931 output_port_comp
= bt_port_borrow_component_inline(
932 (const void *) output_port
);
933 BT_ASSERT_PRE(output_port_comp
,
934 "Output port has no component: %!+p", output_port
);
935 BT_ASSERT_PRE(bt_component_borrow_graph(output_port_comp
) ==
937 "Output port is not part of graph: %![graph-]+g, %![port-]+p",
940 /* Create message iterator */
941 BT_LIB_LOGD("Creating message iterator on output port: "
942 "%![port-]+p, %![comp-]+c", output_port
, output_port_comp
);
943 iterator
= g_new0(struct bt_port_output_message_iterator
, 1);
945 BT_LOGE_STR("Failed to allocate one output port message iterator.");
949 ret
= init_message_iterator((void *) iterator
,
950 BT_MESSAGE_ITERATOR_TYPE_PORT_OUTPUT
,
951 bt_port_output_message_iterator_destroy
);
953 /* init_message_iterator() logs errors */
954 BT_OBJECT_PUT_REF_AND_RESET(iterator
);
958 /* Create colander component */
959 colander_comp_cls
= bt_component_class_sink_colander_get();
960 if (!colander_comp_cls
) {
961 BT_LOGW("Cannot get colander sink component class.");
965 iterator
->graph
= graph
;
966 bt_object_get_no_null_check(iterator
->graph
);
967 colander_data
.msgs
= (void *) iterator
->base
.msgs
->pdata
;
968 colander_data
.count_addr
= &iterator
->count
;
970 /* Hope that nobody uses this very unique name */
972 bt_graph_add_sink_component_with_init_method_data(
973 (void *) graph
, colander_comp_cls
,
974 "colander-36ac3409-b1a8-4d60-ab1f-4fdf341a8fb1",
975 NULL
, &colander_data
, (void *) &iterator
->colander
);
976 if (graph_status
!= BT_GRAPH_STATUS_OK
) {
977 BT_LIB_LOGW("Cannot add colander sink component to graph: "
978 "%1[graph-]+g, status=%s", graph
,
979 bt_graph_status_string(graph_status
));
984 * Connect provided output port to the colander component's
988 (void *) bt_component_sink_borrow_input_port_by_index_const(
989 (void *) iterator
->colander
, 0);
990 BT_ASSERT(colander_in_port
);
991 graph_status
= bt_graph_connect_ports(graph
,
992 output_port
, colander_in_port
, NULL
);
993 if (graph_status
!= BT_GRAPH_STATUS_OK
) {
994 BT_LIB_LOGW("Cannot add colander sink component to graph: "
995 "%![graph-]+g, %![comp-]+c, status=%s", graph
,
997 bt_graph_status_string(graph_status
));
1002 * At this point everything went fine. Make the graph
1003 * nonconsumable forever so that only this message iterator
1004 * can consume (thanks to bt_graph_consume_sink_no_check()).
1005 * This avoids leaking the message created by the colander
1006 * sink and moved to the message iterator's message
1009 bt_graph_set_can_consume(iterator
->graph
, false);
1013 if (iterator
&& iterator
->graph
&& iterator
->colander
) {
1016 /* Remove created colander component from graph if any */
1017 colander_comp
= iterator
->colander
;
1018 BT_OBJECT_PUT_REF_AND_RESET(iterator
->colander
);
1021 * At this point the colander component's reference
1022 * count is 0 because iterator->colander was the only
1023 * owner. We also know that it is not connected because
1024 * this is the last operation before this function
1027 * Since we honor the preconditions here,
1028 * bt_graph_remove_unconnected_component() always
1031 ret
= bt_graph_remove_unconnected_component(iterator
->graph
,
1032 (void *) colander_comp
);
1033 BT_ASSERT(ret
== 0);
1036 BT_OBJECT_PUT_REF_AND_RESET(iterator
);
1039 bt_object_put_ref(colander_comp_cls
);
1040 return (void *) iterator
;
/*
 * Reference-counting entry point for output port message iterators:
 * forwards the iterator to bt_object_get_ref().
 *
 * iterator: output port message iterator passed through unchanged.
 */
void bt_port_output_message_iterator_get_ref(
		const struct bt_port_output_message_iterator *iterator)
{
	bt_object_get_ref(iterator);
}
/*
 * Reference-counting entry point for output port message iterators:
 * forwards the iterator to bt_object_put_ref().
 *
 * iterator: output port message iterator passed through unchanged.
 */
void bt_port_output_message_iterator_put_ref(
		const struct bt_port_output_message_iterator *iterator)
{
	bt_object_put_ref(iterator);
}
/*
 * Reference-counting entry point for self component input port message
 * iterators: forwards the iterator to bt_object_get_ref().
 *
 * iterator: message iterator passed through unchanged.
 */
void bt_self_component_port_input_message_iterator_get_ref(
		const struct bt_self_component_port_input_message_iterator *iterator)
{
	bt_object_get_ref(iterator);
}
/*
 * Reference-counting entry point for self component input port message
 * iterators: forwards the iterator to bt_object_put_ref().
 *
 * iterator: message iterator passed through unchanged.
 */
void bt_self_component_port_input_message_iterator_put_ref(
		const struct bt_self_component_port_input_message_iterator *iterator)
{
	bt_object_put_ref(iterator);
}