95429fb509b9cfdfcb6940b345951addf6571f76
[babeltrace.git] / lib / graph / iterator.c
1 /*
2 * Copyright 2017-2018 Philippe Proulx <pproulx@efficios.com>
3 * Copyright 2015 Jérémie Galarneau <jeremie.galarneau@efficios.com>
4 *
5 * Permission is hereby granted, free of charge, to any person obtaining a copy
6 * of this software and associated documentation files (the "Software"), to deal
7 * in the Software without restriction, including without limitation the rights
8 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9 * copies of the Software, and to permit persons to whom the Software is
10 * furnished to do so, subject to the following conditions:
11 *
12 * The above copyright notice and this permission notice shall be included in
13 * all copies or substantial portions of the Software.
14 *
15 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21 * SOFTWARE.
22 */
23
24 #define BT_LOG_TAG "MSG-ITER"
25 #include <babeltrace/lib-logging-internal.h>
26
27 #include <babeltrace/compiler-internal.h>
28 #include <babeltrace/trace-ir/field.h>
29 #include <babeltrace/trace-ir/event-const.h>
30 #include <babeltrace/trace-ir/event-internal.h>
31 #include <babeltrace/trace-ir/packet-const.h>
32 #include <babeltrace/trace-ir/packet-internal.h>
33 #include <babeltrace/trace-ir/stream-internal.h>
34 #include <babeltrace/graph/connection-const.h>
35 #include <babeltrace/graph/connection-internal.h>
36 #include <babeltrace/graph/component-const.h>
37 #include <babeltrace/graph/component-internal.h>
38 #include <babeltrace/graph/component-source-internal.h>
39 #include <babeltrace/graph/component-class-internal.h>
40 #include <babeltrace/graph/component-class-sink-colander-internal.h>
41 #include <babeltrace/graph/component-sink-const.h>
42 #include <babeltrace/graph/message-const.h>
43 #include <babeltrace/graph/message-iterator.h>
44 #include <babeltrace/graph/message-iterator-internal.h>
45 #include <babeltrace/graph/self-component-port-input-message-iterator.h>
46 #include <babeltrace/graph/port-output-message-iterator.h>
47 #include <babeltrace/graph/message-internal.h>
48 #include <babeltrace/graph/message-event-const.h>
49 #include <babeltrace/graph/message-event-internal.h>
50 #include <babeltrace/graph/message-packet-const.h>
51 #include <babeltrace/graph/message-packet-internal.h>
52 #include <babeltrace/graph/message-stream-const.h>
53 #include <babeltrace/graph/message-stream-internal.h>
54 #include <babeltrace/graph/port-const.h>
55 #include <babeltrace/graph/graph.h>
56 #include <babeltrace/graph/graph-const.h>
57 #include <babeltrace/graph/graph-internal.h>
58 #include <babeltrace/types.h>
59 #include <babeltrace/assert-internal.h>
60 #include <babeltrace/assert-pre-internal.h>
61 #include <stdint.h>
62 #include <inttypes.h>
63 #include <stdlib.h>
64
/*
 * Capacity of a message iterator's message array, which is also the
 * maximum number of messages requested from a user's "next" method in
 * a single call.
 *
 * TODO: Use graph's state (number of active iterators, etc.) and
 * possibly system specifications to make a better guess than this.
 */
#define MSG_BATCH_SIZE	15
70
71 struct stream_state {
72 const struct bt_stream *stream; /* owned by this */
73 const struct bt_packet *cur_packet; /* owned by this */
74 uint64_t expected_msg_seq_num;
75 bt_bool is_ended;
76 };
77
78 BT_ASSERT_PRE_FUNC
79 static
80 void destroy_stream_state(struct stream_state *stream_state)
81 {
82 if (!stream_state) {
83 return;
84 }
85
86 BT_LOGV("Destroying stream state: stream-state-addr=%p", stream_state);
87 BT_LOGV_STR("Putting stream state's current packet.");
88 BT_OBJECT_PUT_REF_AND_RESET(stream_state->cur_packet);
89 BT_LOGV_STR("Putting stream state's stream.");
90 BT_OBJECT_PUT_REF_AND_RESET(stream_state->stream);
91 g_free(stream_state);
92 }
93
94 BT_ASSERT_PRE_FUNC
95 static
96 struct stream_state *create_stream_state(const struct bt_stream *stream)
97 {
98 struct stream_state *stream_state = g_new0(struct stream_state, 1);
99
100 if (!stream_state) {
101 BT_LOGE_STR("Failed to allocate one stream state.");
102 goto end;
103 }
104
105 /*
106 * We keep a reference to the stream until we know it's ended.
107 */
108 stream_state->stream = stream;
109 bt_object_get_no_null_check(stream_state->stream);
110 BT_LIB_LOGV("Created stream state: %![stream-]+s, "
111 "stream-state-addr=%p",
112 stream, stream_state);
113
114 end:
115 return stream_state;
116 }
117
118 static inline
119 void _set_self_comp_port_input_msg_iterator_state(
120 struct bt_self_component_port_input_message_iterator *iterator,
121 enum bt_self_component_port_input_message_iterator_state state)
122 {
123 BT_ASSERT(iterator);
124 BT_LIB_LOGD("Updating message iterator's state: "
125 "new-state=%s",
126 bt_self_component_port_input_message_iterator_state_string(state));
127 iterator->state = state;
128 }
129
130 #ifdef BT_DEV_MODE
131 # define set_self_comp_port_input_msg_iterator_state _set_self_comp_port_input_msg_iterator_state
132 #else
133 # define set_self_comp_port_input_msg_iterator_state(_a, _b)
134 #endif
135
136 static
137 void destroy_base_message_iterator(struct bt_object *obj)
138 {
139 struct bt_message_iterator *iterator = (void *) obj;
140
141 BT_ASSERT(iterator);
142
143 if (iterator->msgs) {
144 g_ptr_array_free(iterator->msgs, TRUE);
145 iterator->msgs = NULL;
146 }
147
148 g_free(iterator);
149 }
150
151 static
152 void bt_self_component_port_input_message_iterator_destroy(struct bt_object *obj)
153 {
154 struct bt_self_component_port_input_message_iterator *iterator;
155
156 BT_ASSERT(obj);
157
158 /*
159 * The message iterator's reference count is 0 if we're
160 * here. Increment it to avoid a double-destroy (possibly
161 * infinitely recursive). This could happen for example if the
162 * message iterator's finalization function does
163 * bt_object_get_ref() (or anything that causes
164 * bt_object_get_ref() to be called) on itself (ref. count goes
165 * from 0 to 1), and then bt_object_put_ref(): the reference
166 * count would go from 1 to 0 again and this function would be
167 * called again.
168 */
169 obj->ref_count++;
170 iterator = (void *) obj;
171 BT_LIB_LOGD("Destroying self component input port message iterator object: "
172 "%!+i", iterator);
173 bt_self_component_port_input_message_iterator_try_finalize(iterator);
174
175 if (iterator->stream_states) {
176 /*
177 * Remove our destroy listener from each stream which
178 * has a state in this iterator. Otherwise the destroy
179 * listener would be called with an invalid/other
180 * message iterator object.
181 */
182 g_hash_table_destroy(iterator->stream_states);
183 iterator->stream_states = NULL;
184 }
185
186 if (iterator->connection) {
187 /*
188 * Remove ourself from the originating connection so
189 * that it does not try to finalize a dangling pointer
190 * later.
191 */
192 bt_connection_remove_iterator(iterator->connection, iterator);
193 iterator->connection = NULL;
194 }
195
196 destroy_base_message_iterator(obj);
197 }
198
199 BT_HIDDEN
200 void bt_self_component_port_input_message_iterator_try_finalize(
201 struct bt_self_component_port_input_message_iterator *iterator)
202 {
203 typedef void (*method_t)(void *);
204
205 struct bt_component_class *comp_class = NULL;
206 method_t method = NULL;
207
208 BT_ASSERT(iterator);
209
210 switch (iterator->state) {
211 case BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_NON_INITIALIZED:
212 /* Skip user finalization if user initialization failed */
213 BT_LIB_LOGD("Not finalizing non-initialized message iterator: "
214 "%!+i", iterator);
215 goto end;
216 case BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_FINALIZED:
217 /* Already finalized */
218 BT_LIB_LOGD("Not finalizing message iterator: already finalized: "
219 "%!+i", iterator);
220 goto end;
221 case BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_FINALIZING:
222 /* Already finalized */
223 BT_LIB_LOGF("Message iterator is already being finalized: "
224 "%!+i", iterator);
225 abort();
226 default:
227 break;
228 }
229
230 BT_LIB_LOGD("Finalizing message iterator: %!+i", iterator);
231 set_self_comp_port_input_msg_iterator_state(iterator,
232 BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_FINALIZING);
233 BT_ASSERT(iterator->upstream_component);
234 comp_class = iterator->upstream_component->class;
235
236 /* Call user-defined destroy method */
237 switch (comp_class->type) {
238 case BT_COMPONENT_CLASS_TYPE_SOURCE:
239 {
240 struct bt_component_class_source *src_comp_cls =
241 (void *) comp_class;
242
243 method = (method_t) src_comp_cls->methods.msg_iter_finalize;
244 break;
245 }
246 case BT_COMPONENT_CLASS_TYPE_FILTER:
247 {
248 struct bt_component_class_filter *flt_comp_cls =
249 (void *) comp_class;
250
251 method = (method_t) flt_comp_cls->methods.msg_iter_finalize;
252 break;
253 }
254 default:
255 /* Unreachable */
256 abort();
257 }
258
259 if (method) {
260 BT_LIB_LOGD("Calling user's finalization method: %!+i",
261 iterator);
262 method(iterator);
263 }
264
265 iterator->upstream_component = NULL;
266 iterator->upstream_port = NULL;
267 set_self_comp_port_input_msg_iterator_state(iterator,
268 BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_FINALIZED);
269 BT_LIB_LOGD("Finalized message iterator: %!+i", iterator);
270
271 end:
272 return;
273 }
274
275 BT_HIDDEN
276 void bt_self_component_port_input_message_iterator_set_connection(
277 struct bt_self_component_port_input_message_iterator *iterator,
278 struct bt_connection *connection)
279 {
280 BT_ASSERT(iterator);
281 iterator->connection = connection;
282 BT_LIB_LOGV("Set message iterator's connection: "
283 "%![iter-]+i, %![conn-]+x", iterator, connection);
284 }
285
286 static
287 int init_message_iterator(struct bt_message_iterator *iterator,
288 enum bt_message_iterator_type type,
289 bt_object_release_func destroy)
290 {
291 int ret = 0;
292
293 bt_object_init_shared(&iterator->base, destroy);
294 iterator->type = type;
295 iterator->msgs = g_ptr_array_new();
296 if (!iterator->msgs) {
297 BT_LOGE_STR("Failed to allocate a GPtrArray.");
298 ret = -1;
299 goto end;
300 }
301
302 g_ptr_array_set_size(iterator->msgs, MSG_BATCH_SIZE);
303
304 end:
305 return ret;
306 }
307
308 static
309 struct bt_self_component_port_input_message_iterator *
310 bt_self_component_port_input_message_iterator_create_initial(
311 struct bt_component *upstream_comp,
312 struct bt_port *upstream_port)
313 {
314 int ret;
315 struct bt_self_component_port_input_message_iterator *iterator = NULL;
316
317 BT_ASSERT(upstream_comp);
318 BT_ASSERT(upstream_port);
319 BT_ASSERT(bt_port_is_connected(upstream_port));
320 BT_LIB_LOGD("Creating initial message iterator on self component input port: "
321 "%![up-comp-]+c, %![up-port-]+p", upstream_comp, upstream_port);
322 BT_ASSERT(bt_component_get_class_type(upstream_comp) ==
323 BT_COMPONENT_CLASS_TYPE_SOURCE ||
324 bt_component_get_class_type(upstream_comp) ==
325 BT_COMPONENT_CLASS_TYPE_FILTER);
326 iterator = g_new0(
327 struct bt_self_component_port_input_message_iterator, 1);
328 if (!iterator) {
329 BT_LOGE_STR("Failed to allocate one self component input port "
330 "message iterator.");
331 goto end;
332 }
333
334 ret = init_message_iterator((void *) iterator,
335 BT_MESSAGE_ITERATOR_TYPE_SELF_COMPONENT_PORT_INPUT,
336 bt_self_component_port_input_message_iterator_destroy);
337 if (ret) {
338 /* init_message_iterator() logs errors */
339 BT_OBJECT_PUT_REF_AND_RESET(iterator);
340 goto end;
341 }
342
343 iterator->stream_states = g_hash_table_new_full(g_direct_hash,
344 g_direct_equal, NULL, (GDestroyNotify) destroy_stream_state);
345 if (!iterator->stream_states) {
346 BT_LOGE_STR("Failed to allocate a GHashTable.");
347 BT_OBJECT_PUT_REF_AND_RESET(iterator);
348 goto end;
349 }
350
351 iterator->upstream_component = upstream_comp;
352 iterator->upstream_port = upstream_port;
353 iterator->connection = iterator->upstream_port->connection;
354 iterator->graph = bt_component_borrow_graph(upstream_comp);
355 set_self_comp_port_input_msg_iterator_state(iterator,
356 BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_NON_INITIALIZED);
357 BT_LIB_LOGD("Created initial message iterator on self component input port: "
358 "%![up-port-]+p, %![up-comp-]+c, %![iter-]+i",
359 upstream_port, upstream_comp, iterator);
360
361 end:
362 return iterator;
363 }
364
365 struct bt_self_component_port_input_message_iterator *
366 bt_self_component_port_input_message_iterator_create(
367 struct bt_self_component_port_input *self_port)
368 {
369 typedef enum bt_self_message_iterator_status (*init_method_t)(
370 void *, void *, void *);
371
372 init_method_t init_method = NULL;
373 struct bt_self_component_port_input_message_iterator *iterator =
374 NULL;
375 struct bt_port *port = (void *) self_port;
376 struct bt_port *upstream_port;
377 struct bt_component *comp;
378 struct bt_component *upstream_comp;
379 struct bt_component_class *upstream_comp_cls;
380
381 BT_ASSERT_PRE_NON_NULL(port, "Port");
382 comp = bt_port_borrow_component_inline(port);
383 BT_ASSERT_PRE(bt_port_is_connected(port),
384 "Port is not connected: %![port-]+p", port);
385 BT_ASSERT_PRE(comp, "Port is not part of a component: %![port-]+p",
386 port);
387 BT_ASSERT_PRE(!bt_component_graph_is_canceled(comp),
388 "Port's component's graph is canceled: "
389 "%![port-]+p, %![comp-]+c", port, comp);
390 BT_ASSERT(port->connection);
391 upstream_port = port->connection->upstream_port;
392 BT_ASSERT(upstream_port);
393 upstream_comp = bt_port_borrow_component_inline(upstream_port);
394 BT_ASSERT(upstream_comp);
395 upstream_comp_cls = upstream_comp->class;
396 BT_ASSERT(upstream_comp->class->type ==
397 BT_COMPONENT_CLASS_TYPE_SOURCE ||
398 upstream_comp->class->type ==
399 BT_COMPONENT_CLASS_TYPE_FILTER);
400 iterator = bt_self_component_port_input_message_iterator_create_initial(
401 upstream_comp, upstream_port);
402 if (!iterator) {
403 BT_LOGW_STR("Cannot create self component input port "
404 "message iterator.");
405 goto end;
406 }
407
408 switch (upstream_comp_cls->type) {
409 case BT_COMPONENT_CLASS_TYPE_SOURCE:
410 {
411 struct bt_component_class_source *src_comp_cls =
412 (void *) upstream_comp_cls;
413
414 init_method =
415 (init_method_t) src_comp_cls->methods.msg_iter_init;
416 break;
417 }
418 case BT_COMPONENT_CLASS_TYPE_FILTER:
419 {
420 struct bt_component_class_filter *flt_comp_cls =
421 (void *) upstream_comp_cls;
422
423 init_method =
424 (init_method_t) flt_comp_cls->methods.msg_iter_init;
425 break;
426 }
427 default:
428 /* Unreachable */
429 abort();
430 }
431
432 if (init_method) {
433 int iter_status;
434
435 BT_LIB_LOGD("Calling user's initialization method: %!+i", iterator);
436 iter_status = init_method(iterator, upstream_comp,
437 upstream_port);
438 BT_LOGD("User method returned: status=%s",
439 bt_message_iterator_status_string(iter_status));
440 if (iter_status != BT_MESSAGE_ITERATOR_STATUS_OK) {
441 BT_LOGW_STR("Initialization method failed.");
442 BT_OBJECT_PUT_REF_AND_RESET(iterator);
443 goto end;
444 }
445 }
446
447 set_self_comp_port_input_msg_iterator_state(iterator,
448 BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_ACTIVE);
449 g_ptr_array_add(port->connection->iterators, iterator);
450 BT_LIB_LOGD("Created message iterator on self component input port: "
451 "%![up-port-]+p, %![up-comp-]+c, %![iter-]+i",
452 upstream_port, upstream_comp, iterator);
453
454 end:
455 return iterator;
456 }
457
458 void *bt_self_message_iterator_get_data(
459 const struct bt_self_message_iterator *self_iterator)
460 {
461 struct bt_self_component_port_input_message_iterator *iterator =
462 (void *) self_iterator;
463
464 BT_ASSERT_PRE_NON_NULL(iterator, "Message iterator");
465 return iterator->user_data;
466 }
467
468 void bt_self_message_iterator_set_data(
469 struct bt_self_message_iterator *self_iterator, void *data)
470 {
471 struct bt_self_component_port_input_message_iterator *iterator =
472 (void *) self_iterator;
473
474 BT_ASSERT_PRE_NON_NULL(iterator, "Message iterator");
475 iterator->user_data = data;
476 BT_LIB_LOGV("Set message iterator's user data: "
477 "%!+i, user-data-addr=%p", iterator, data);
478 }
479
480 BT_ASSERT_PRE_FUNC
481 static inline
482 void bt_message_borrow_packet_stream(const struct bt_message *msg,
483 const struct bt_stream **stream,
484 const struct bt_packet **packet)
485 {
486 BT_ASSERT(msg);
487
488 switch (msg->type) {
489 case BT_MESSAGE_TYPE_EVENT:
490 *packet = bt_event_borrow_packet_const(
491 bt_message_event_borrow_event_const(msg));
492 *stream = bt_packet_borrow_stream_const(*packet);
493 break;
494 case BT_MESSAGE_TYPE_STREAM_BEGINNING:
495 *stream = bt_message_stream_beginning_borrow_stream_const(msg);
496 break;
497 case BT_MESSAGE_TYPE_STREAM_END:
498 *stream = bt_message_stream_end_borrow_stream_const(msg);
499 break;
500 case BT_MESSAGE_TYPE_PACKET_BEGINNING:
501 *packet = bt_message_packet_beginning_borrow_packet_const(msg);
502 *stream = bt_packet_borrow_stream_const(*packet);
503 break;
504 case BT_MESSAGE_TYPE_PACKET_END:
505 *packet = bt_message_packet_end_borrow_packet_const(msg);
506 *stream = bt_packet_borrow_stream_const(*packet);
507 break;
508 default:
509 break;
510 }
511 }
512
513 BT_ASSERT_PRE_FUNC
514 static inline
515 bool validate_message(
516 struct bt_self_component_port_input_message_iterator *iterator,
517 const struct bt_message *c_msg)
518 {
519 bool is_valid = true;
520 struct stream_state *stream_state;
521 const struct bt_stream *stream = NULL;
522 const struct bt_packet *packet = NULL;
523 struct bt_message *msg = (void *) c_msg;
524
525 BT_ASSERT(msg);
526 bt_message_borrow_packet_stream(c_msg, &stream, &packet);
527
528 if (!stream) {
529 /* we don't care about messages not attached to streams */
530 goto end;
531 }
532
533 stream_state = g_hash_table_lookup(iterator->stream_states, stream);
534 if (!stream_state) {
535 /*
536 * No stream state for this stream: this message
537 * MUST be a BT_MESSAGE_TYPE_STREAM_BEGINNING message
538 * and its sequence number must be 0.
539 */
540 if (c_msg->type != BT_MESSAGE_TYPE_STREAM_BEGINNING) {
541 BT_ASSERT_PRE_MSG("Unexpected message: missing a "
542 "BT_MESSAGE_TYPE_STREAM_BEGINNING "
543 "message prior to this message: "
544 "%![stream-]+s", stream);
545 is_valid = false;
546 goto end;
547 }
548
549 if (c_msg->seq_num == -1ULL) {
550 msg->seq_num = 0;
551 }
552
553 if (c_msg->seq_num != 0) {
554 BT_ASSERT_PRE_MSG("Unexpected message sequence "
555 "number for this message iterator: "
556 "this is the first message for this "
557 "stream, expecting sequence number 0: "
558 "seq-num=%" PRIu64 ", %![stream-]+s",
559 c_msg->seq_num, stream);
560 is_valid = false;
561 goto end;
562 }
563
564 stream_state = create_stream_state(stream);
565 if (!stream_state) {
566 abort();
567 }
568
569 g_hash_table_insert(iterator->stream_states,
570 (void *) stream, stream_state);
571 stream_state->expected_msg_seq_num++;
572 goto end;
573 }
574
575 if (stream_state->is_ended) {
576 /*
577 * There's a new message which has a reference to a
578 * stream which, from this iterator's point of view, is
579 * ended ("end of stream" message was returned).
580 * This is bad: the API guarantees that it can never
581 * happen.
582 */
583 BT_ASSERT_PRE_MSG("Stream is already ended: %![stream-]+s",
584 stream);
585 is_valid = false;
586 goto end;
587 }
588
589 if (c_msg->seq_num == -1ULL) {
590 msg->seq_num = stream_state->expected_msg_seq_num;
591 }
592
593 if (c_msg->seq_num != -1ULL &&
594 c_msg->seq_num != stream_state->expected_msg_seq_num) {
595 BT_ASSERT_PRE_MSG("Unexpected message sequence number: "
596 "seq-num=%" PRIu64 ", "
597 "expected-seq-num=%" PRIu64 ", %![stream-]+s",
598 c_msg->seq_num, stream_state->expected_msg_seq_num,
599 stream);
600 is_valid = false;
601 goto end;
602 }
603
604 switch (c_msg->type) {
605 case BT_MESSAGE_TYPE_STREAM_BEGINNING:
606 BT_ASSERT_PRE_MSG("Unexpected BT_MESSAGE_TYPE_STREAM_BEGINNING "
607 "message at this point: msg-seq-num=%" PRIu64 ", "
608 "%![stream-]+s", c_msg->seq_num, stream);
609 is_valid = false;
610 goto end;
611 case BT_MESSAGE_TYPE_STREAM_END:
612 if (stream_state->cur_packet) {
613 BT_ASSERT_PRE_MSG("Unexpected BT_MESSAGE_TYPE_STREAM_END "
614 "message: missing a "
615 "BT_MESSAGE_TYPE_PACKET_END message "
616 "prior to this message: "
617 "msg-seq-num=%" PRIu64 ", "
618 "%![stream-]+s", c_msg->seq_num, stream);
619 is_valid = false;
620 goto end;
621 }
622 stream_state->expected_msg_seq_num++;
623 stream_state->is_ended = true;
624 goto end;
625 case BT_MESSAGE_TYPE_PACKET_BEGINNING:
626 if (stream_state->cur_packet) {
627 BT_ASSERT_PRE_MSG("Unexpected BT_MESSAGE_TYPE_PACKET_BEGINNING "
628 "message at this point: missing a "
629 "BT_MESSAGE_TYPE_PACKET_END message "
630 "prior to this message: "
631 "msg-seq-num=%" PRIu64 ", %![stream-]+s, "
632 "%![packet-]+a", c_msg->seq_num, stream,
633 packet);
634 is_valid = false;
635 goto end;
636 }
637 stream_state->expected_msg_seq_num++;
638 stream_state->cur_packet = packet;
639 bt_object_get_no_null_check(stream_state->cur_packet);
640 goto end;
641 case BT_MESSAGE_TYPE_PACKET_END:
642 if (!stream_state->cur_packet) {
643 BT_ASSERT_PRE_MSG("Unexpected BT_MESSAGE_TYPE_PACKET_END "
644 "message at this point: missing a "
645 "BT_MESSAGE_TYPE_PACKET_BEGINNING message "
646 "prior to this message: "
647 "msg-seq-num=%" PRIu64 ", %![stream-]+s, "
648 "%![packet-]+a", c_msg->seq_num, stream,
649 packet);
650 is_valid = false;
651 goto end;
652 }
653 stream_state->expected_msg_seq_num++;
654 BT_OBJECT_PUT_REF_AND_RESET(stream_state->cur_packet);
655 goto end;
656 case BT_MESSAGE_TYPE_EVENT:
657 if (packet != stream_state->cur_packet) {
658 BT_ASSERT_PRE_MSG("Unexpected packet for "
659 "BT_MESSAGE_TYPE_EVENT message: "
660 "msg-seq-num=%" PRIu64 ", %![stream-]+s, "
661 "%![msg-packet-]+a, %![expected-packet-]+a",
662 c_msg->seq_num, stream,
663 stream_state->cur_packet, packet);
664 is_valid = false;
665 goto end;
666 }
667 stream_state->expected_msg_seq_num++;
668 goto end;
669 default:
670 break;
671 }
672
673 end:
674 return is_valid;
675 }
676
677 BT_ASSERT_PRE_FUNC
678 static inline
679 bool validate_messages(
680 struct bt_self_component_port_input_message_iterator *iterator,
681 uint64_t count)
682 {
683 bool ret = true;
684 bt_message_array_const msgs =
685 (void *) iterator->base.msgs->pdata;
686 uint64_t i;
687
688 for (i = 0; i < count; i++) {
689 ret = validate_message(iterator, msgs[i]);
690 if (!ret) {
691 break;
692 }
693 }
694
695 return ret;
696 }
697
698 BT_ASSERT_PRE_FUNC
699 static inline bool self_comp_port_input_msg_iter_can_end(
700 struct bt_self_component_port_input_message_iterator *iterator)
701 {
702 GHashTableIter iter;
703 gpointer stream_key, state_value;
704 bool ret = true;
705
706 /*
707 * Verify that this iterator received a
708 * BT_MESSAGE_TYPE_STREAM_END message for each stream
709 * which has a state.
710 */
711
712 g_hash_table_iter_init(&iter, iterator->stream_states);
713
714 while (g_hash_table_iter_next(&iter, &stream_key, &state_value)) {
715 struct stream_state *stream_state = (void *) state_value;
716
717 BT_ASSERT(stream_state);
718 BT_ASSERT(stream_key);
719
720 if (!stream_state->is_ended) {
721 BT_ASSERT_PRE_MSG("Ending message iterator, "
722 "but stream is not ended: "
723 "%![stream-]s", stream_key);
724 ret = false;
725 goto end;
726 }
727 }
728
729 end:
730 return ret;
731 }
732
733 enum bt_message_iterator_status
734 bt_self_component_port_input_message_iterator_next(
735 struct bt_self_component_port_input_message_iterator *iterator,
736 bt_message_array_const *msgs, uint64_t *user_count)
737 {
738 typedef enum bt_self_message_iterator_status (*method_t)(
739 void *, bt_message_array_const, uint64_t, uint64_t *);
740
741 method_t method = NULL;
742 struct bt_component_class *comp_cls;
743 int status = BT_MESSAGE_ITERATOR_STATUS_OK;
744
745 BT_ASSERT_PRE_NON_NULL(iterator, "Message iterator");
746 BT_ASSERT_PRE_NON_NULL(msgs, "Message array (output)");
747 BT_ASSERT_PRE_NON_NULL(user_count, "Message count (output)");
748 BT_ASSERT_PRE(iterator->state ==
749 BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_ACTIVE,
750 "Message iterator's \"next\" called, but "
751 "iterator is in the wrong state: %!+i", iterator);
752 BT_ASSERT(iterator->upstream_component);
753 BT_ASSERT(iterator->upstream_component->class);
754 BT_LIB_LOGD("Getting next self component input port "
755 "message iterator's messages: %!+i", iterator);
756 comp_cls = iterator->upstream_component->class;
757
758 /* Pick the appropriate "next" method */
759 switch (comp_cls->type) {
760 case BT_COMPONENT_CLASS_TYPE_SOURCE:
761 {
762 struct bt_component_class_source *src_comp_cls =
763 (void *) comp_cls;
764
765 method = (method_t) src_comp_cls->methods.msg_iter_next;
766 break;
767 }
768 case BT_COMPONENT_CLASS_TYPE_FILTER:
769 {
770 struct bt_component_class_filter *flt_comp_cls =
771 (void *) comp_cls;
772
773 method = (method_t) flt_comp_cls->methods.msg_iter_next;
774 break;
775 }
776 default:
777 abort();
778 }
779
780 /*
781 * Call the user's "next" method to get the next messages
782 * and status.
783 */
784 BT_ASSERT(method);
785 BT_LOGD_STR("Calling user's \"next\" method.");
786 status = method(iterator, (void *) iterator->base.msgs->pdata,
787 MSG_BATCH_SIZE, user_count);
788 BT_LOGD("User method returned: status=%s",
789 bt_message_iterator_status_string(status));
790 if (status < 0) {
791 BT_LOGW_STR("User method failed.");
792 goto end;
793 }
794
795 #ifdef BT_DEV_MODE
796 /*
797 * There is no way that this iterator could have been finalized
798 * during its "next" method, as the only way to do this is to
799 * put the last iterator's reference, and this can only be done
800 * by its downstream owner.
801 */
802 BT_ASSERT(iterator->state ==
803 BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_ACTIVE);
804 #endif
805
806 switch (status) {
807 case BT_MESSAGE_ITERATOR_STATUS_OK:
808 BT_ASSERT_PRE(validate_messages(iterator, *user_count),
809 "Messages are invalid at this point: "
810 "%![msg-iter-]+i, count=%" PRIu64,
811 iterator, *user_count);
812 *msgs = (void *) iterator->base.msgs->pdata;
813 break;
814 case BT_MESSAGE_ITERATOR_STATUS_AGAIN:
815 goto end;
816 case BT_MESSAGE_ITERATOR_STATUS_END:
817 BT_ASSERT_PRE(self_comp_port_input_msg_iter_can_end(iterator),
818 "Message iterator cannot end at this point: "
819 "%!+i", iterator);
820 set_self_comp_port_input_msg_iterator_state(iterator,
821 BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_ENDED);
822 goto end;
823 default:
824 /* Unknown non-error status */
825 abort();
826 }
827
828 end:
829 return status;
830 }
831
832 enum bt_message_iterator_status bt_port_output_message_iterator_next(
833 struct bt_port_output_message_iterator *iterator,
834 bt_message_array_const *msgs_to_user,
835 uint64_t *count_to_user)
836 {
837 enum bt_message_iterator_status status;
838 enum bt_graph_status graph_status;
839
840 BT_ASSERT_PRE_NON_NULL(iterator, "Message iterator");
841 BT_ASSERT_PRE_NON_NULL(msgs_to_user, "Message array (output)");
842 BT_ASSERT_PRE_NON_NULL(count_to_user, "Message count (output)");
843 BT_LIB_LOGD("Getting next output port message iterator's messages: "
844 "%!+i", iterator);
845
846 graph_status = bt_graph_consume_sink_no_check(iterator->graph,
847 iterator->colander);
848 switch (graph_status) {
849 case BT_GRAPH_STATUS_CANCELED:
850 case BT_GRAPH_STATUS_AGAIN:
851 case BT_GRAPH_STATUS_END:
852 case BT_GRAPH_STATUS_NOMEM:
853 status = (int) graph_status;
854 break;
855 case BT_GRAPH_STATUS_OK:
856 status = BT_MESSAGE_ITERATOR_STATUS_OK;
857
858 /*
859 * On success, the colander sink moves the messages
860 * to this iterator's array and sets this iterator's
861 * message count: move them to the user.
862 */
863 *msgs_to_user = (void *) iterator->base.msgs->pdata;
864 *count_to_user = iterator->count;
865 break;
866 default:
867 /* Other errors */
868 status = BT_MESSAGE_ITERATOR_STATUS_ERROR;
869 }
870
871 return status;
872 }
873
874 struct bt_component *bt_self_component_port_input_message_iterator_borrow_component(
875 struct bt_self_component_port_input_message_iterator *iterator)
876 {
877 BT_ASSERT_PRE_NON_NULL(iterator, "Message iterator");
878 return iterator->upstream_component;
879 }
880
881 struct bt_self_component *bt_self_message_iterator_borrow_component(
882 struct bt_self_message_iterator *self_iterator)
883 {
884 struct bt_self_component_port_input_message_iterator *iterator =
885 (void *) self_iterator;
886
887 BT_ASSERT_PRE_NON_NULL(iterator, "Message iterator");
888 return (void *) iterator->upstream_component;
889 }
890
891 struct bt_self_port_output *bt_self_message_iterator_borrow_port(
892 struct bt_self_message_iterator *self_iterator)
893 {
894 struct bt_self_component_port_input_message_iterator *iterator =
895 (void *) self_iterator;
896
897 BT_ASSERT_PRE_NON_NULL(iterator, "Message iterator");
898 return (void *) iterator->upstream_port;
899 }
900
901 static
902 void bt_port_output_message_iterator_destroy(struct bt_object *obj)
903 {
904 struct bt_port_output_message_iterator *iterator = (void *) obj;
905
906 BT_LIB_LOGD("Destroying output port message iterator object: %!+i",
907 iterator);
908 BT_LOGD_STR("Putting graph.");
909 BT_OBJECT_PUT_REF_AND_RESET(iterator->graph);
910 BT_LOGD_STR("Putting colander sink component.");
911 BT_OBJECT_PUT_REF_AND_RESET(iterator->colander);
912 destroy_base_message_iterator(obj);
913 }
914
915 struct bt_port_output_message_iterator *
916 bt_port_output_message_iterator_create(
917 struct bt_graph *graph,
918 const struct bt_port_output *output_port)
919 {
920 struct bt_port_output_message_iterator *iterator = NULL;
921 struct bt_component_class_sink *colander_comp_cls = NULL;
922 struct bt_component *output_port_comp = NULL;
923 struct bt_component_sink *colander_comp;
924 enum bt_graph_status graph_status;
925 struct bt_port_input *colander_in_port = NULL;
926 struct bt_component_class_sink_colander_data colander_data;
927 int ret;
928
929 BT_ASSERT_PRE_NON_NULL(graph, "Graph");
930 BT_ASSERT_PRE_NON_NULL(output_port, "Output port");
931 output_port_comp = bt_port_borrow_component_inline(
932 (const void *) output_port);
933 BT_ASSERT_PRE(output_port_comp,
934 "Output port has no component: %!+p", output_port);
935 BT_ASSERT_PRE(bt_component_borrow_graph(output_port_comp) ==
936 (void *) graph,
937 "Output port is not part of graph: %![graph-]+g, %![port-]+p",
938 graph, output_port);
939
940 /* Create message iterator */
941 BT_LIB_LOGD("Creating message iterator on output port: "
942 "%![port-]+p, %![comp-]+c", output_port, output_port_comp);
943 iterator = g_new0(struct bt_port_output_message_iterator, 1);
944 if (!iterator) {
945 BT_LOGE_STR("Failed to allocate one output port message iterator.");
946 goto error;
947 }
948
949 ret = init_message_iterator((void *) iterator,
950 BT_MESSAGE_ITERATOR_TYPE_PORT_OUTPUT,
951 bt_port_output_message_iterator_destroy);
952 if (ret) {
953 /* init_message_iterator() logs errors */
954 BT_OBJECT_PUT_REF_AND_RESET(iterator);
955 goto end;
956 }
957
958 /* Create colander component */
959 colander_comp_cls = bt_component_class_sink_colander_get();
960 if (!colander_comp_cls) {
961 BT_LOGW("Cannot get colander sink component class.");
962 goto error;
963 }
964
965 iterator->graph = graph;
966 bt_object_get_no_null_check(iterator->graph);
967 colander_data.msgs = (void *) iterator->base.msgs->pdata;
968 colander_data.count_addr = &iterator->count;
969
970 /* Hope that nobody uses this very unique name */
971 graph_status =
972 bt_graph_add_sink_component_with_init_method_data(
973 (void *) graph, colander_comp_cls,
974 "colander-36ac3409-b1a8-4d60-ab1f-4fdf341a8fb1",
975 NULL, &colander_data, (void *) &iterator->colander);
976 if (graph_status != BT_GRAPH_STATUS_OK) {
977 BT_LIB_LOGW("Cannot add colander sink component to graph: "
978 "%1[graph-]+g, status=%s", graph,
979 bt_graph_status_string(graph_status));
980 goto error;
981 }
982
983 /*
984 * Connect provided output port to the colander component's
985 * input port.
986 */
987 colander_in_port =
988 (void *) bt_component_sink_borrow_input_port_by_index_const(
989 (void *) iterator->colander, 0);
990 BT_ASSERT(colander_in_port);
991 graph_status = bt_graph_connect_ports(graph,
992 output_port, colander_in_port, NULL);
993 if (graph_status != BT_GRAPH_STATUS_OK) {
994 BT_LIB_LOGW("Cannot add colander sink component to graph: "
995 "%![graph-]+g, %![comp-]+c, status=%s", graph,
996 iterator->colander,
997 bt_graph_status_string(graph_status));
998 goto error;
999 }
1000
1001 /*
1002 * At this point everything went fine. Make the graph
1003 * nonconsumable forever so that only this message iterator
1004 * can consume (thanks to bt_graph_consume_sink_no_check()).
1005 * This avoids leaking the message created by the colander
1006 * sink and moved to the message iterator's message
1007 * member.
1008 */
1009 bt_graph_set_can_consume(iterator->graph, false);
1010 goto end;
1011
1012 error:
1013 if (iterator && iterator->graph && iterator->colander) {
1014 int ret;
1015
1016 /* Remove created colander component from graph if any */
1017 colander_comp = iterator->colander;
1018 BT_OBJECT_PUT_REF_AND_RESET(iterator->colander);
1019
1020 /*
1021 * At this point the colander component's reference
1022 * count is 0 because iterator->colander was the only
1023 * owner. We also know that it is not connected because
1024 * this is the last operation before this function
1025 * succeeds.
1026 *
1027 * Since we honor the preconditions here,
1028 * bt_graph_remove_unconnected_component() always
1029 * succeeds.
1030 */
1031 ret = bt_graph_remove_unconnected_component(iterator->graph,
1032 (void *) colander_comp);
1033 BT_ASSERT(ret == 0);
1034 }
1035
1036 BT_OBJECT_PUT_REF_AND_RESET(iterator);
1037
1038 end:
1039 bt_object_put_ref(colander_comp_cls);
1040 return (void *) iterator;
1041 }
1042
/* Gets a reference on the output port message iterator `iterator`. */
void bt_port_output_message_iterator_get_ref(
		const struct bt_port_output_message_iterator *iterator)
{
	bt_object_get_ref(iterator);
}
1048
/* Puts a reference on the output port message iterator `iterator`. */
void bt_port_output_message_iterator_put_ref(
		const struct bt_port_output_message_iterator *iterator)
{
	bt_object_put_ref(iterator);
}
1054
/*
 * Gets a reference on the self component input port message iterator
 * `iterator`.
 */
void bt_self_component_port_input_message_iterator_get_ref(
		const struct bt_self_component_port_input_message_iterator *iterator)
{
	bt_object_get_ref(iterator);
}
1060
/*
 * Puts a reference on the self component input port message iterator
 * `iterator`.
 */
void bt_self_component_port_input_message_iterator_put_ref(
		const struct bt_self_component_port_input_message_iterator *iterator)
{
	bt_object_put_ref(iterator);
}
This page took 0.049892 seconds and 3 git commands to generate.