lib: fully configure graph (add components, connect ports), then run
[babeltrace.git] / lib / graph / iterator.c
1 /*
2 * Copyright 2017-2018 Philippe Proulx <pproulx@efficios.com>
3 * Copyright 2015 Jérémie Galarneau <jeremie.galarneau@efficios.com>
4 *
5 * Permission is hereby granted, free of charge, to any person obtaining a copy
6 * of this software and associated documentation files (the "Software"), to deal
7 * in the Software without restriction, including without limitation the rights
8 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9 * copies of the Software, and to permit persons to whom the Software is
10 * furnished to do so, subject to the following conditions:
11 *
12 * The above copyright notice and this permission notice shall be included in
13 * all copies or substantial portions of the Software.
14 *
15 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21 * SOFTWARE.
22 */
23
24 #define BT_LOG_TAG "MSG-ITER"
25 #include <babeltrace/lib-logging-internal.h>
26
27 #include <babeltrace/compiler-internal.h>
28 #include <babeltrace/trace-ir/field.h>
29 #include <babeltrace/trace-ir/event-const.h>
30 #include <babeltrace/trace-ir/event-internal.h>
31 #include <babeltrace/trace-ir/packet-const.h>
32 #include <babeltrace/trace-ir/packet-internal.h>
33 #include <babeltrace/trace-ir/stream-internal.h>
34 #include <babeltrace/graph/connection-const.h>
35 #include <babeltrace/graph/connection-internal.h>
36 #include <babeltrace/graph/component-const.h>
37 #include <babeltrace/graph/component-internal.h>
38 #include <babeltrace/graph/component-source-internal.h>
39 #include <babeltrace/graph/component-class-internal.h>
40 #include <babeltrace/graph/component-class-sink-colander-internal.h>
41 #include <babeltrace/graph/component-sink-const.h>
42 #include <babeltrace/graph/message-const.h>
43 #include <babeltrace/graph/message-iterator.h>
44 #include <babeltrace/graph/message-iterator-internal.h>
45 #include <babeltrace/graph/self-component-port-input-message-iterator.h>
46 #include <babeltrace/graph/port-output-message-iterator.h>
47 #include <babeltrace/graph/message-internal.h>
48 #include <babeltrace/graph/message-event-const.h>
49 #include <babeltrace/graph/message-event-internal.h>
50 #include <babeltrace/graph/message-packet-const.h>
51 #include <babeltrace/graph/message-packet-internal.h>
52 #include <babeltrace/graph/message-stream-const.h>
53 #include <babeltrace/graph/message-stream-internal.h>
54 #include <babeltrace/graph/port-const.h>
55 #include <babeltrace/graph/graph.h>
56 #include <babeltrace/graph/graph-const.h>
57 #include <babeltrace/graph/graph-internal.h>
58 #include <babeltrace/types.h>
59 #include <babeltrace/assert-internal.h>
60 #include <babeltrace/assert-pre-internal.h>
61 #include <stdint.h>
62 #include <inttypes.h>
63 #include <stdlib.h>
64
65 /*
66 * TODO: Use graph's state (number of active iterators, etc.) and
67 * possibly system specifications to make a better guess than this.
68 */
69 #define MSG_BATCH_SIZE 15
70
71 struct stream_state {
72 const struct bt_stream *stream; /* owned by this */
73 const struct bt_packet *cur_packet; /* owned by this */
74 uint64_t expected_msg_seq_num;
75 bt_bool is_ended;
76 };
77
78 BT_ASSERT_PRE_FUNC
79 static
80 void destroy_stream_state(struct stream_state *stream_state)
81 {
82 if (!stream_state) {
83 return;
84 }
85
86 BT_LOGV("Destroying stream state: stream-state-addr=%p", stream_state);
87 BT_LOGV_STR("Putting stream state's current packet.");
88 BT_OBJECT_PUT_REF_AND_RESET(stream_state->cur_packet);
89 BT_LOGV_STR("Putting stream state's stream.");
90 BT_OBJECT_PUT_REF_AND_RESET(stream_state->stream);
91 g_free(stream_state);
92 }
93
94 BT_ASSERT_PRE_FUNC
95 static
96 struct stream_state *create_stream_state(const struct bt_stream *stream)
97 {
98 struct stream_state *stream_state = g_new0(struct stream_state, 1);
99
100 if (!stream_state) {
101 BT_LOGE_STR("Failed to allocate one stream state.");
102 goto end;
103 }
104
105 /*
106 * We keep a reference to the stream until we know it's ended.
107 */
108 stream_state->stream = stream;
109 bt_object_get_no_null_check(stream_state->stream);
110 BT_LIB_LOGV("Created stream state: %![stream-]+s, "
111 "stream-state-addr=%p",
112 stream, stream_state);
113
114 end:
115 return stream_state;
116 }
117
118 static inline
119 void _set_self_comp_port_input_msg_iterator_state(
120 struct bt_self_component_port_input_message_iterator *iterator,
121 enum bt_self_component_port_input_message_iterator_state state)
122 {
123 BT_ASSERT(iterator);
124 BT_LIB_LOGD("Updating message iterator's state: "
125 "new-state=%s",
126 bt_self_component_port_input_message_iterator_state_string(state));
127 iterator->state = state;
128 }
129
130 #ifdef BT_DEV_MODE
131 # define set_self_comp_port_input_msg_iterator_state _set_self_comp_port_input_msg_iterator_state
132 #else
133 # define set_self_comp_port_input_msg_iterator_state(_a, _b)
134 #endif
135
136 static
137 void destroy_base_message_iterator(struct bt_object *obj)
138 {
139 struct bt_message_iterator *iterator = (void *) obj;
140
141 BT_ASSERT(iterator);
142
143 if (iterator->msgs) {
144 g_ptr_array_free(iterator->msgs, TRUE);
145 iterator->msgs = NULL;
146 }
147
148 g_free(iterator);
149 }
150
151 static
152 void bt_self_component_port_input_message_iterator_destroy(struct bt_object *obj)
153 {
154 struct bt_self_component_port_input_message_iterator *iterator;
155
156 BT_ASSERT(obj);
157
158 /*
159 * The message iterator's reference count is 0 if we're
160 * here. Increment it to avoid a double-destroy (possibly
161 * infinitely recursive). This could happen for example if the
162 * message iterator's finalization function does
163 * bt_object_get_ref() (or anything that causes
164 * bt_object_get_ref() to be called) on itself (ref. count goes
165 * from 0 to 1), and then bt_object_put_ref(): the reference
166 * count would go from 1 to 0 again and this function would be
167 * called again.
168 */
169 obj->ref_count++;
170 iterator = (void *) obj;
171 BT_LIB_LOGD("Destroying self component input port message iterator object: "
172 "%!+i", iterator);
173 bt_self_component_port_input_message_iterator_try_finalize(iterator);
174
175 if (iterator->stream_states) {
176 /*
177 * Remove our destroy listener from each stream which
178 * has a state in this iterator. Otherwise the destroy
179 * listener would be called with an invalid/other
180 * message iterator object.
181 */
182 g_hash_table_destroy(iterator->stream_states);
183 iterator->stream_states = NULL;
184 }
185
186 if (iterator->connection) {
187 /*
188 * Remove ourself from the originating connection so
189 * that it does not try to finalize a dangling pointer
190 * later.
191 */
192 bt_connection_remove_iterator(iterator->connection, iterator);
193 iterator->connection = NULL;
194 }
195
196 destroy_base_message_iterator(obj);
197 }
198
199 BT_HIDDEN
200 void bt_self_component_port_input_message_iterator_try_finalize(
201 struct bt_self_component_port_input_message_iterator *iterator)
202 {
203 typedef void (*method_t)(void *);
204
205 struct bt_component_class *comp_class = NULL;
206 method_t method = NULL;
207
208 BT_ASSERT(iterator);
209
210 switch (iterator->state) {
211 case BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_NON_INITIALIZED:
212 /* Skip user finalization if user initialization failed */
213 BT_LIB_LOGD("Not finalizing non-initialized message iterator: "
214 "%!+i", iterator);
215 goto end;
216 case BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_FINALIZED:
217 /* Already finalized */
218 BT_LIB_LOGD("Not finalizing message iterator: already finalized: "
219 "%!+i", iterator);
220 goto end;
221 case BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_FINALIZING:
222 /* Already finalized */
223 BT_LIB_LOGF("Message iterator is already being finalized: "
224 "%!+i", iterator);
225 abort();
226 default:
227 break;
228 }
229
230 BT_LIB_LOGD("Finalizing message iterator: %!+i", iterator);
231 set_self_comp_port_input_msg_iterator_state(iterator,
232 BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_FINALIZING);
233 BT_ASSERT(iterator->upstream_component);
234 comp_class = iterator->upstream_component->class;
235
236 /* Call user-defined destroy method */
237 switch (comp_class->type) {
238 case BT_COMPONENT_CLASS_TYPE_SOURCE:
239 {
240 struct bt_component_class_source *src_comp_cls =
241 (void *) comp_class;
242
243 method = (method_t) src_comp_cls->methods.msg_iter_finalize;
244 break;
245 }
246 case BT_COMPONENT_CLASS_TYPE_FILTER:
247 {
248 struct bt_component_class_filter *flt_comp_cls =
249 (void *) comp_class;
250
251 method = (method_t) flt_comp_cls->methods.msg_iter_finalize;
252 break;
253 }
254 default:
255 /* Unreachable */
256 abort();
257 }
258
259 if (method) {
260 BT_LIB_LOGD("Calling user's finalization method: %!+i",
261 iterator);
262 method(iterator);
263 }
264
265 iterator->upstream_component = NULL;
266 iterator->upstream_port = NULL;
267 set_self_comp_port_input_msg_iterator_state(iterator,
268 BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_FINALIZED);
269 BT_LIB_LOGD("Finalized message iterator: %!+i", iterator);
270
271 end:
272 return;
273 }
274
275 BT_HIDDEN
276 void bt_self_component_port_input_message_iterator_set_connection(
277 struct bt_self_component_port_input_message_iterator *iterator,
278 struct bt_connection *connection)
279 {
280 BT_ASSERT(iterator);
281 iterator->connection = connection;
282 BT_LIB_LOGV("Set message iterator's connection: "
283 "%![iter-]+i, %![conn-]+x", iterator, connection);
284 }
285
286 static
287 int init_message_iterator(struct bt_message_iterator *iterator,
288 enum bt_message_iterator_type type,
289 bt_object_release_func destroy)
290 {
291 int ret = 0;
292
293 bt_object_init_shared(&iterator->base, destroy);
294 iterator->type = type;
295 iterator->msgs = g_ptr_array_new();
296 if (!iterator->msgs) {
297 BT_LOGE_STR("Failed to allocate a GPtrArray.");
298 ret = -1;
299 goto end;
300 }
301
302 g_ptr_array_set_size(iterator->msgs, MSG_BATCH_SIZE);
303
304 end:
305 return ret;
306 }
307
308 static
309 struct bt_self_component_port_input_message_iterator *
310 bt_self_component_port_input_message_iterator_create_initial(
311 struct bt_component *upstream_comp,
312 struct bt_port *upstream_port)
313 {
314 int ret;
315 struct bt_self_component_port_input_message_iterator *iterator = NULL;
316
317 BT_ASSERT(upstream_comp);
318 BT_ASSERT(upstream_port);
319 BT_ASSERT(bt_port_is_connected(upstream_port));
320 BT_LIB_LOGD("Creating initial message iterator on self component input port: "
321 "%![up-comp-]+c, %![up-port-]+p", upstream_comp, upstream_port);
322 BT_ASSERT(bt_component_get_class_type(upstream_comp) ==
323 BT_COMPONENT_CLASS_TYPE_SOURCE ||
324 bt_component_get_class_type(upstream_comp) ==
325 BT_COMPONENT_CLASS_TYPE_FILTER);
326 iterator = g_new0(
327 struct bt_self_component_port_input_message_iterator, 1);
328 if (!iterator) {
329 BT_LOGE_STR("Failed to allocate one self component input port "
330 "message iterator.");
331 goto end;
332 }
333
334 ret = init_message_iterator((void *) iterator,
335 BT_MESSAGE_ITERATOR_TYPE_SELF_COMPONENT_PORT_INPUT,
336 bt_self_component_port_input_message_iterator_destroy);
337 if (ret) {
338 /* init_message_iterator() logs errors */
339 BT_OBJECT_PUT_REF_AND_RESET(iterator);
340 goto end;
341 }
342
343 iterator->stream_states = g_hash_table_new_full(g_direct_hash,
344 g_direct_equal, NULL, (GDestroyNotify) destroy_stream_state);
345 if (!iterator->stream_states) {
346 BT_LOGE_STR("Failed to allocate a GHashTable.");
347 BT_OBJECT_PUT_REF_AND_RESET(iterator);
348 goto end;
349 }
350
351 iterator->upstream_component = upstream_comp;
352 iterator->upstream_port = upstream_port;
353 iterator->connection = iterator->upstream_port->connection;
354 iterator->graph = bt_component_borrow_graph(upstream_comp);
355 set_self_comp_port_input_msg_iterator_state(iterator,
356 BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_NON_INITIALIZED);
357 BT_LIB_LOGD("Created initial message iterator on self component input port: "
358 "%![up-port-]+p, %![up-comp-]+c, %![iter-]+i",
359 upstream_port, upstream_comp, iterator);
360
361 end:
362 return iterator;
363 }
364
365 struct bt_self_component_port_input_message_iterator *
366 bt_self_component_port_input_message_iterator_create(
367 struct bt_self_component_port_input *self_port)
368 {
369 typedef enum bt_self_message_iterator_status (*init_method_t)(
370 void *, void *, void *);
371
372 init_method_t init_method = NULL;
373 struct bt_self_component_port_input_message_iterator *iterator =
374 NULL;
375 struct bt_port *port = (void *) self_port;
376 struct bt_port *upstream_port;
377 struct bt_component *comp;
378 struct bt_component *upstream_comp;
379 struct bt_component_class *upstream_comp_cls;
380
381 BT_ASSERT_PRE_NON_NULL(port, "Port");
382 comp = bt_port_borrow_component_inline(port);
383 BT_ASSERT_PRE(bt_port_is_connected(port),
384 "Port is not connected: %![port-]+p", port);
385 BT_ASSERT_PRE(comp, "Port is not part of a component: %![port-]+p",
386 port);
387 BT_ASSERT_PRE(!bt_component_graph_is_canceled(comp),
388 "Port's component's graph is canceled: "
389 "%![port-]+p, %![comp-]+c", port, comp);
390 BT_ASSERT(port->connection);
391 upstream_port = port->connection->upstream_port;
392 BT_ASSERT(upstream_port);
393 upstream_comp = bt_port_borrow_component_inline(upstream_port);
394 BT_ASSERT(upstream_comp);
395 upstream_comp_cls = upstream_comp->class;
396 BT_ASSERT(upstream_comp->class->type ==
397 BT_COMPONENT_CLASS_TYPE_SOURCE ||
398 upstream_comp->class->type ==
399 BT_COMPONENT_CLASS_TYPE_FILTER);
400 iterator = bt_self_component_port_input_message_iterator_create_initial(
401 upstream_comp, upstream_port);
402 if (!iterator) {
403 BT_LOGW_STR("Cannot create self component input port "
404 "message iterator.");
405 goto end;
406 }
407
408 switch (upstream_comp_cls->type) {
409 case BT_COMPONENT_CLASS_TYPE_SOURCE:
410 {
411 struct bt_component_class_source *src_comp_cls =
412 (void *) upstream_comp_cls;
413
414 init_method =
415 (init_method_t) src_comp_cls->methods.msg_iter_init;
416 break;
417 }
418 case BT_COMPONENT_CLASS_TYPE_FILTER:
419 {
420 struct bt_component_class_filter *flt_comp_cls =
421 (void *) upstream_comp_cls;
422
423 init_method =
424 (init_method_t) flt_comp_cls->methods.msg_iter_init;
425 break;
426 }
427 default:
428 /* Unreachable */
429 abort();
430 }
431
432 if (init_method) {
433 int iter_status;
434
435 BT_LIB_LOGD("Calling user's initialization method: %!+i", iterator);
436 iter_status = init_method(iterator, upstream_comp,
437 upstream_port);
438 BT_LOGD("User method returned: status=%s",
439 bt_message_iterator_status_string(iter_status));
440 if (iter_status != BT_MESSAGE_ITERATOR_STATUS_OK) {
441 BT_LOGW_STR("Initialization method failed.");
442 BT_OBJECT_PUT_REF_AND_RESET(iterator);
443 goto end;
444 }
445 }
446
447 set_self_comp_port_input_msg_iterator_state(iterator,
448 BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_ACTIVE);
449 g_ptr_array_add(port->connection->iterators, iterator);
450 BT_LIB_LOGD("Created message iterator on self component input port: "
451 "%![up-port-]+p, %![up-comp-]+c, %![iter-]+i",
452 upstream_port, upstream_comp, iterator);
453
454 end:
455 return iterator;
456 }
457
458 void *bt_self_message_iterator_get_data(
459 const struct bt_self_message_iterator *self_iterator)
460 {
461 struct bt_self_component_port_input_message_iterator *iterator =
462 (void *) self_iterator;
463
464 BT_ASSERT_PRE_NON_NULL(iterator, "Message iterator");
465 return iterator->user_data;
466 }
467
468 void bt_self_message_iterator_set_data(
469 struct bt_self_message_iterator *self_iterator, void *data)
470 {
471 struct bt_self_component_port_input_message_iterator *iterator =
472 (void *) self_iterator;
473
474 BT_ASSERT_PRE_NON_NULL(iterator, "Message iterator");
475 iterator->user_data = data;
476 BT_LIB_LOGV("Set message iterator's user data: "
477 "%!+i, user-data-addr=%p", iterator, data);
478 }
479
480 BT_ASSERT_PRE_FUNC
481 static inline
482 void bt_message_borrow_packet_stream(const struct bt_message *msg,
483 const struct bt_stream **stream,
484 const struct bt_packet **packet)
485 {
486 BT_ASSERT(msg);
487
488 switch (msg->type) {
489 case BT_MESSAGE_TYPE_EVENT:
490 *packet = bt_event_borrow_packet_const(
491 bt_message_event_borrow_event_const(msg));
492 *stream = bt_packet_borrow_stream_const(*packet);
493 break;
494 case BT_MESSAGE_TYPE_STREAM_BEGINNING:
495 *stream = bt_message_stream_beginning_borrow_stream_const(msg);
496 break;
497 case BT_MESSAGE_TYPE_STREAM_END:
498 *stream = bt_message_stream_end_borrow_stream_const(msg);
499 break;
500 case BT_MESSAGE_TYPE_PACKET_BEGINNING:
501 *packet = bt_message_packet_beginning_borrow_packet_const(msg);
502 *stream = bt_packet_borrow_stream_const(*packet);
503 break;
504 case BT_MESSAGE_TYPE_PACKET_END:
505 *packet = bt_message_packet_end_borrow_packet_const(msg);
506 *stream = bt_packet_borrow_stream_const(*packet);
507 break;
508 default:
509 break;
510 }
511 }
512
513 BT_ASSERT_PRE_FUNC
514 static inline
515 bool validate_message(
516 struct bt_self_component_port_input_message_iterator *iterator,
517 const struct bt_message *c_msg)
518 {
519 bool is_valid = true;
520 struct stream_state *stream_state;
521 const struct bt_stream *stream = NULL;
522 const struct bt_packet *packet = NULL;
523 struct bt_message *msg = (void *) c_msg;
524
525 BT_ASSERT(msg);
526 bt_message_borrow_packet_stream(c_msg, &stream, &packet);
527
528 if (!stream) {
529 /* we don't care about messages not attached to streams */
530 goto end;
531 }
532
533 stream_state = g_hash_table_lookup(iterator->stream_states, stream);
534 if (!stream_state) {
535 /*
536 * No stream state for this stream: this message
537 * MUST be a BT_MESSAGE_TYPE_STREAM_BEGINNING message
538 * and its sequence number must be 0.
539 */
540 if (c_msg->type != BT_MESSAGE_TYPE_STREAM_BEGINNING) {
541 BT_ASSERT_PRE_MSG("Unexpected message: missing a "
542 "BT_MESSAGE_TYPE_STREAM_BEGINNING "
543 "message prior to this message: "
544 "%![stream-]+s", stream);
545 is_valid = false;
546 goto end;
547 }
548
549 if (c_msg->seq_num == -1ULL) {
550 msg->seq_num = 0;
551 }
552
553 if (c_msg->seq_num != 0) {
554 BT_ASSERT_PRE_MSG("Unexpected message sequence "
555 "number for this message iterator: "
556 "this is the first message for this "
557 "stream, expecting sequence number 0: "
558 "seq-num=%" PRIu64 ", %![stream-]+s",
559 c_msg->seq_num, stream);
560 is_valid = false;
561 goto end;
562 }
563
564 stream_state = create_stream_state(stream);
565 if (!stream_state) {
566 abort();
567 }
568
569 g_hash_table_insert(iterator->stream_states,
570 (void *) stream, stream_state);
571 stream_state->expected_msg_seq_num++;
572 goto end;
573 }
574
575 if (stream_state->is_ended) {
576 /*
577 * There's a new message which has a reference to a
578 * stream which, from this iterator's point of view, is
579 * ended ("end of stream" message was returned).
580 * This is bad: the API guarantees that it can never
581 * happen.
582 */
583 BT_ASSERT_PRE_MSG("Stream is already ended: %![stream-]+s",
584 stream);
585 is_valid = false;
586 goto end;
587 }
588
589 if (c_msg->seq_num == -1ULL) {
590 msg->seq_num = stream_state->expected_msg_seq_num;
591 }
592
593 if (c_msg->seq_num != -1ULL &&
594 c_msg->seq_num != stream_state->expected_msg_seq_num) {
595 BT_ASSERT_PRE_MSG("Unexpected message sequence number: "
596 "seq-num=%" PRIu64 ", "
597 "expected-seq-num=%" PRIu64 ", %![stream-]+s",
598 c_msg->seq_num, stream_state->expected_msg_seq_num,
599 stream);
600 is_valid = false;
601 goto end;
602 }
603
604 switch (c_msg->type) {
605 case BT_MESSAGE_TYPE_STREAM_BEGINNING:
606 BT_ASSERT_PRE_MSG("Unexpected BT_MESSAGE_TYPE_STREAM_BEGINNING "
607 "message at this point: msg-seq-num=%" PRIu64 ", "
608 "%![stream-]+s", c_msg->seq_num, stream);
609 is_valid = false;
610 goto end;
611 case BT_MESSAGE_TYPE_STREAM_END:
612 if (stream_state->cur_packet) {
613 BT_ASSERT_PRE_MSG("Unexpected BT_MESSAGE_TYPE_STREAM_END "
614 "message: missing a "
615 "BT_MESSAGE_TYPE_PACKET_END message "
616 "prior to this message: "
617 "msg-seq-num=%" PRIu64 ", "
618 "%![stream-]+s", c_msg->seq_num, stream);
619 is_valid = false;
620 goto end;
621 }
622 stream_state->expected_msg_seq_num++;
623 stream_state->is_ended = true;
624 goto end;
625 case BT_MESSAGE_TYPE_PACKET_BEGINNING:
626 if (stream_state->cur_packet) {
627 BT_ASSERT_PRE_MSG("Unexpected BT_MESSAGE_TYPE_PACKET_BEGINNING "
628 "message at this point: missing a "
629 "BT_MESSAGE_TYPE_PACKET_END message "
630 "prior to this message: "
631 "msg-seq-num=%" PRIu64 ", %![stream-]+s, "
632 "%![packet-]+a", c_msg->seq_num, stream,
633 packet);
634 is_valid = false;
635 goto end;
636 }
637 stream_state->expected_msg_seq_num++;
638 stream_state->cur_packet = packet;
639 bt_object_get_no_null_check(stream_state->cur_packet);
640 goto end;
641 case BT_MESSAGE_TYPE_PACKET_END:
642 if (!stream_state->cur_packet) {
643 BT_ASSERT_PRE_MSG("Unexpected BT_MESSAGE_TYPE_PACKET_END "
644 "message at this point: missing a "
645 "BT_MESSAGE_TYPE_PACKET_BEGINNING message "
646 "prior to this message: "
647 "msg-seq-num=%" PRIu64 ", %![stream-]+s, "
648 "%![packet-]+a", c_msg->seq_num, stream,
649 packet);
650 is_valid = false;
651 goto end;
652 }
653 stream_state->expected_msg_seq_num++;
654 BT_OBJECT_PUT_REF_AND_RESET(stream_state->cur_packet);
655 goto end;
656 case BT_MESSAGE_TYPE_EVENT:
657 if (packet != stream_state->cur_packet) {
658 BT_ASSERT_PRE_MSG("Unexpected packet for "
659 "BT_MESSAGE_TYPE_EVENT message: "
660 "msg-seq-num=%" PRIu64 ", %![stream-]+s, "
661 "%![msg-packet-]+a, %![expected-packet-]+a",
662 c_msg->seq_num, stream,
663 stream_state->cur_packet, packet);
664 is_valid = false;
665 goto end;
666 }
667 stream_state->expected_msg_seq_num++;
668 goto end;
669 default:
670 break;
671 }
672
673 end:
674 return is_valid;
675 }
676
677 BT_ASSERT_PRE_FUNC
678 static inline
679 bool validate_messages(
680 struct bt_self_component_port_input_message_iterator *iterator,
681 uint64_t count)
682 {
683 bool ret = true;
684 bt_message_array_const msgs =
685 (void *) iterator->base.msgs->pdata;
686 uint64_t i;
687
688 for (i = 0; i < count; i++) {
689 ret = validate_message(iterator, msgs[i]);
690 if (!ret) {
691 break;
692 }
693 }
694
695 return ret;
696 }
697
698 BT_ASSERT_PRE_FUNC
699 static inline bool self_comp_port_input_msg_iter_can_end(
700 struct bt_self_component_port_input_message_iterator *iterator)
701 {
702 GHashTableIter iter;
703 gpointer stream_key, state_value;
704 bool ret = true;
705
706 /*
707 * Verify that this iterator received a
708 * BT_MESSAGE_TYPE_STREAM_END message for each stream
709 * which has a state.
710 */
711
712 g_hash_table_iter_init(&iter, iterator->stream_states);
713
714 while (g_hash_table_iter_next(&iter, &stream_key, &state_value)) {
715 struct stream_state *stream_state = (void *) state_value;
716
717 BT_ASSERT(stream_state);
718 BT_ASSERT(stream_key);
719
720 if (!stream_state->is_ended) {
721 BT_ASSERT_PRE_MSG("Ending message iterator, "
722 "but stream is not ended: "
723 "%![stream-]s", stream_key);
724 ret = false;
725 goto end;
726 }
727 }
728
729 end:
730 return ret;
731 }
732
733 enum bt_message_iterator_status
734 bt_self_component_port_input_message_iterator_next(
735 struct bt_self_component_port_input_message_iterator *iterator,
736 bt_message_array_const *msgs, uint64_t *user_count)
737 {
738 typedef enum bt_self_message_iterator_status (*method_t)(
739 void *, bt_message_array_const, uint64_t, uint64_t *);
740
741 method_t method = NULL;
742 struct bt_component_class *comp_cls;
743 int status = BT_MESSAGE_ITERATOR_STATUS_OK;
744
745 BT_ASSERT_PRE_NON_NULL(iterator, "Message iterator");
746 BT_ASSERT_PRE_NON_NULL(msgs, "Message array (output)");
747 BT_ASSERT_PRE_NON_NULL(user_count, "Message count (output)");
748 BT_ASSERT_PRE(iterator->state ==
749 BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_ACTIVE,
750 "Message iterator's \"next\" called, but "
751 "iterator is in the wrong state: %!+i", iterator);
752 BT_ASSERT(iterator->upstream_component);
753 BT_ASSERT(iterator->upstream_component->class);
754 BT_ASSERT_PRE(bt_component_borrow_graph(iterator->upstream_component)->is_configured,
755 "Graph is not configured: %!+g",
756 bt_component_borrow_graph(iterator->upstream_component));
757 BT_LIB_LOGD("Getting next self component input port "
758 "message iterator's messages: %!+i", iterator);
759 comp_cls = iterator->upstream_component->class;
760
761 /* Pick the appropriate "next" method */
762 switch (comp_cls->type) {
763 case BT_COMPONENT_CLASS_TYPE_SOURCE:
764 {
765 struct bt_component_class_source *src_comp_cls =
766 (void *) comp_cls;
767
768 method = (method_t) src_comp_cls->methods.msg_iter_next;
769 break;
770 }
771 case BT_COMPONENT_CLASS_TYPE_FILTER:
772 {
773 struct bt_component_class_filter *flt_comp_cls =
774 (void *) comp_cls;
775
776 method = (method_t) flt_comp_cls->methods.msg_iter_next;
777 break;
778 }
779 default:
780 abort();
781 }
782
783 /*
784 * Call the user's "next" method to get the next messages
785 * and status.
786 */
787 BT_ASSERT(method);
788 BT_LOGD_STR("Calling user's \"next\" method.");
789 status = method(iterator, (void *) iterator->base.msgs->pdata,
790 MSG_BATCH_SIZE, user_count);
791 BT_LOGD("User method returned: status=%s",
792 bt_message_iterator_status_string(status));
793 if (status < 0) {
794 BT_LOGW_STR("User method failed.");
795 goto end;
796 }
797
798 #ifdef BT_DEV_MODE
799 /*
800 * There is no way that this iterator could have been finalized
801 * during its "next" method, as the only way to do this is to
802 * put the last iterator's reference, and this can only be done
803 * by its downstream owner.
804 */
805 BT_ASSERT(iterator->state ==
806 BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_ACTIVE);
807 #endif
808
809 switch (status) {
810 case BT_MESSAGE_ITERATOR_STATUS_OK:
811 BT_ASSERT_PRE(validate_messages(iterator, *user_count),
812 "Messages are invalid at this point: "
813 "%![msg-iter-]+i, count=%" PRIu64,
814 iterator, *user_count);
815 *msgs = (void *) iterator->base.msgs->pdata;
816 break;
817 case BT_MESSAGE_ITERATOR_STATUS_AGAIN:
818 goto end;
819 case BT_MESSAGE_ITERATOR_STATUS_END:
820 BT_ASSERT_PRE(self_comp_port_input_msg_iter_can_end(iterator),
821 "Message iterator cannot end at this point: "
822 "%!+i", iterator);
823 set_self_comp_port_input_msg_iterator_state(iterator,
824 BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_ENDED);
825 goto end;
826 default:
827 /* Unknown non-error status */
828 abort();
829 }
830
831 end:
832 return status;
833 }
834
835 enum bt_message_iterator_status bt_port_output_message_iterator_next(
836 struct bt_port_output_message_iterator *iterator,
837 bt_message_array_const *msgs_to_user,
838 uint64_t *count_to_user)
839 {
840 enum bt_message_iterator_status status;
841 enum bt_graph_status graph_status;
842
843 BT_ASSERT_PRE_NON_NULL(iterator, "Message iterator");
844 BT_ASSERT_PRE_NON_NULL(msgs_to_user, "Message array (output)");
845 BT_ASSERT_PRE_NON_NULL(count_to_user, "Message count (output)");
846 BT_LIB_LOGD("Getting next output port message iterator's messages: "
847 "%!+i", iterator);
848
849 /*
850 * As soon as the user calls this function, we mark the graph as
851 * being definitely configured.
852 */
853 bt_graph_set_is_configured(iterator->graph, true);
854
855 graph_status = bt_graph_consume_sink_no_check(iterator->graph,
856 iterator->colander);
857 switch (graph_status) {
858 case BT_GRAPH_STATUS_CANCELED:
859 case BT_GRAPH_STATUS_AGAIN:
860 case BT_GRAPH_STATUS_END:
861 case BT_GRAPH_STATUS_NOMEM:
862 status = (int) graph_status;
863 break;
864 case BT_GRAPH_STATUS_OK:
865 status = BT_MESSAGE_ITERATOR_STATUS_OK;
866
867 /*
868 * On success, the colander sink moves the messages
869 * to this iterator's array and sets this iterator's
870 * message count: move them to the user.
871 */
872 *msgs_to_user = (void *) iterator->base.msgs->pdata;
873 *count_to_user = iterator->count;
874 break;
875 default:
876 /* Other errors */
877 status = BT_MESSAGE_ITERATOR_STATUS_ERROR;
878 }
879
880 return status;
881 }
882
883 struct bt_component *bt_self_component_port_input_message_iterator_borrow_component(
884 struct bt_self_component_port_input_message_iterator *iterator)
885 {
886 BT_ASSERT_PRE_NON_NULL(iterator, "Message iterator");
887 return iterator->upstream_component;
888 }
889
890 struct bt_self_component *bt_self_message_iterator_borrow_component(
891 struct bt_self_message_iterator *self_iterator)
892 {
893 struct bt_self_component_port_input_message_iterator *iterator =
894 (void *) self_iterator;
895
896 BT_ASSERT_PRE_NON_NULL(iterator, "Message iterator");
897 return (void *) iterator->upstream_component;
898 }
899
900 struct bt_self_port_output *bt_self_message_iterator_borrow_port(
901 struct bt_self_message_iterator *self_iterator)
902 {
903 struct bt_self_component_port_input_message_iterator *iterator =
904 (void *) self_iterator;
905
906 BT_ASSERT_PRE_NON_NULL(iterator, "Message iterator");
907 return (void *) iterator->upstream_port;
908 }
909
910 static
911 void bt_port_output_message_iterator_destroy(struct bt_object *obj)
912 {
913 struct bt_port_output_message_iterator *iterator = (void *) obj;
914
915 BT_LIB_LOGD("Destroying output port message iterator object: %!+i",
916 iterator);
917 BT_LOGD_STR("Putting graph.");
918 BT_OBJECT_PUT_REF_AND_RESET(iterator->graph);
919 BT_LOGD_STR("Putting colander sink component.");
920 BT_OBJECT_PUT_REF_AND_RESET(iterator->colander);
921 destroy_base_message_iterator(obj);
922 }
923
924 struct bt_port_output_message_iterator *
925 bt_port_output_message_iterator_create(
926 struct bt_graph *graph,
927 const struct bt_port_output *output_port)
928 {
929 struct bt_port_output_message_iterator *iterator = NULL;
930 struct bt_component_class_sink *colander_comp_cls = NULL;
931 struct bt_component *output_port_comp = NULL;
932 struct bt_component_sink *colander_comp;
933 enum bt_graph_status graph_status;
934 struct bt_port_input *colander_in_port = NULL;
935 struct bt_component_class_sink_colander_data colander_data;
936 int ret;
937
938 BT_ASSERT_PRE_NON_NULL(graph, "Graph");
939 BT_ASSERT_PRE_NON_NULL(output_port, "Output port");
940 output_port_comp = bt_port_borrow_component_inline(
941 (const void *) output_port);
942 BT_ASSERT_PRE(output_port_comp,
943 "Output port has no component: %!+p", output_port);
944 BT_ASSERT_PRE(bt_component_borrow_graph(output_port_comp) ==
945 (void *) graph,
946 "Output port is not part of graph: %![graph-]+g, %![port-]+p",
947 graph, output_port);
948
949 /* Create message iterator */
950 BT_LIB_LOGD("Creating message iterator on output port: "
951 "%![port-]+p, %![comp-]+c", output_port, output_port_comp);
952 iterator = g_new0(struct bt_port_output_message_iterator, 1);
953 if (!iterator) {
954 BT_LOGE_STR("Failed to allocate one output port message iterator.");
955 goto error;
956 }
957
958 ret = init_message_iterator((void *) iterator,
959 BT_MESSAGE_ITERATOR_TYPE_PORT_OUTPUT,
960 bt_port_output_message_iterator_destroy);
961 if (ret) {
962 /* init_message_iterator() logs errors */
963 BT_OBJECT_PUT_REF_AND_RESET(iterator);
964 goto end;
965 }
966
967 /* Create colander component */
968 colander_comp_cls = bt_component_class_sink_colander_get();
969 if (!colander_comp_cls) {
970 BT_LOGW("Cannot get colander sink component class.");
971 goto error;
972 }
973
974 iterator->graph = graph;
975 bt_object_get_no_null_check(iterator->graph);
976 colander_data.msgs = (void *) iterator->base.msgs->pdata;
977 colander_data.count_addr = &iterator->count;
978
979 /* Hope that nobody uses this very unique name */
980 graph_status =
981 bt_graph_add_sink_component_with_init_method_data(
982 (void *) graph, colander_comp_cls,
983 "colander-36ac3409-b1a8-4d60-ab1f-4fdf341a8fb1",
984 NULL, &colander_data, (void *) &iterator->colander);
985 if (graph_status != BT_GRAPH_STATUS_OK) {
986 BT_LIB_LOGW("Cannot add colander sink component to graph: "
987 "%1[graph-]+g, status=%s", graph,
988 bt_graph_status_string(graph_status));
989 goto error;
990 }
991
992 /*
993 * Connect provided output port to the colander component's
994 * input port.
995 */
996 colander_in_port =
997 (void *) bt_component_sink_borrow_input_port_by_index_const(
998 (void *) iterator->colander, 0);
999 BT_ASSERT(colander_in_port);
1000 graph_status = bt_graph_connect_ports(graph,
1001 output_port, colander_in_port, NULL);
1002 if (graph_status != BT_GRAPH_STATUS_OK) {
1003 BT_LIB_LOGW("Cannot add colander sink component to graph: "
1004 "%![graph-]+g, %![comp-]+c, status=%s", graph,
1005 iterator->colander,
1006 bt_graph_status_string(graph_status));
1007 goto error;
1008 }
1009
1010 /*
1011 * At this point everything went fine. Make the graph
1012 * nonconsumable forever so that only this message iterator
1013 * can consume (thanks to bt_graph_consume_sink_no_check()).
1014 * This avoids leaking the message created by the colander
1015 * sink and moved to the message iterator's message
1016 * member.
1017 */
1018 bt_graph_set_can_consume(iterator->graph, false);
1019 goto end;
1020
1021 error:
1022 if (iterator && iterator->graph && iterator->colander) {
1023 int ret;
1024
1025 /* Remove created colander component from graph if any */
1026 colander_comp = iterator->colander;
1027 BT_OBJECT_PUT_REF_AND_RESET(iterator->colander);
1028
1029 /*
1030 * At this point the colander component's reference
1031 * count is 0 because iterator->colander was the only
1032 * owner. We also know that it is not connected because
1033 * this is the last operation before this function
1034 * succeeds.
1035 *
1036 * Since we honor the preconditions here,
1037 * bt_graph_remove_unconnected_component() always
1038 * succeeds.
1039 */
1040 ret = bt_graph_remove_unconnected_component(iterator->graph,
1041 (void *) colander_comp);
1042 BT_ASSERT(ret == 0);
1043 }
1044
1045 BT_OBJECT_PUT_REF_AND_RESET(iterator);
1046
1047 end:
1048 bt_object_put_ref(colander_comp_cls);
1049 return (void *) iterator;
1050 }
1051
/* Gets a reference on the output port message iterator `iterator`. */
void bt_port_output_message_iterator_get_ref(
		const struct bt_port_output_message_iterator *iterator)
{
	/* Delegate to the generic shared object reference counting */
	bt_object_get_ref(iterator);
}
1057
/* Puts a reference on the output port message iterator `iterator`. */
void bt_port_output_message_iterator_put_ref(
		const struct bt_port_output_message_iterator *iterator)
{
	/* Delegate to the generic shared object reference counting */
	bt_object_put_ref(iterator);
}
1063
/*
 * Gets a reference on the self component input port message iterator
 * `iterator`.
 */
void bt_self_component_port_input_message_iterator_get_ref(
		const struct bt_self_component_port_input_message_iterator *iterator)
{
	/* Delegate to the generic shared object reference counting */
	bt_object_get_ref(iterator);
}
1069
/*
 * Puts a reference on the self component input port message iterator
 * `iterator`.
 */
void bt_self_component_port_input_message_iterator_put_ref(
		const struct bt_self_component_port_input_message_iterator *iterator)
{
	/* Delegate to the generic shared object reference counting */
	bt_object_put_ref(iterator);
}
This page took 0.05202 seconds and 4 git commands to generate.