From: Philippe Proulx Date: Sun, 4 Aug 2019 14:04:03 +0000 (-0400) Subject: lib: remove output port message iterator X-Git-Url: http://git.efficios.com/?p=babeltrace.git;a=commitdiff_plain;h=6c373cc905e907ecbad698fee38db1d47a981b14 lib: remove output port message iterator The output port message iterator concept was not a good idea in the beginning: it breaks the graph model, making it possible to iterate a disconnected output port (which is weird from the component's perspective), blocking the graph (user cannot call bt_graph_run()), and forcing the graph not to contain any sink component. This looks and feels like a hack. For the C part, a subsequent patch should implement an easy way to add a simple sink component to a graph based on a simple callback and custom user data instead of going through the inconvenience of creating a sink component class, setting the methods manually, creating an input port message iterator once the graph is configured, etc. For the Python part, we'll focus on `TraceCollectionMessageIterator` to replace `_OutputPortMessageIterator`. `TraceCollectionMessageIterator` should cover most of the use cases and is easier to use: you don't need to set up your graph, add your own `flt.utils.muxer`, etc. For more advanced use cases in Python, it's always possible to create a "proxy sink component", just like `TraceCollectionMessageIterator` does internally, to get full control on the input port message iterator. To adapt the current tests, `TestOutputPortMessageIterator` in `utils.py` can be used like an output port message iterator. Such an iterator cannot seek however, so `test_message_iterator.py` needed special treatments to make the eventual input port message iterator seek. Signed-off-by: Philippe Proulx Change-Id: I92d432fb33d35ae3c0262b723cdfeae82c6633c9 Reviewed-on: https://review.lttng.org/c/babeltrace/+/1821 Tested-by: jenkins --- diff --git a/include/Makefile.am b/include/Makefile.am index bddc8a5d..f2ff631b 100644 --- a/include/Makefile.am +++ b/include/Makefile.am @@ -111,7 +111,6 @@ babeltrace2graphinclude_HEADERS = \ babeltrace2/graph/port-const.h \ babeltrace2/graph/port-input-const.h \ babeltrace2/graph/port-output-const.h \ - babeltrace2/graph/port-output-message-iterator.h \ babeltrace2/graph/private-query-executor.h \ babeltrace2/graph/query-executor-const.h \ babeltrace2/graph/query-executor.h \ diff --git a/include/babeltrace2/babeltrace.h b/include/babeltrace2/babeltrace.h index edd45d76..92599faa 100644 --- a/include/babeltrace2/babeltrace.h +++ b/include/babeltrace2/babeltrace.h @@ -111,7 +111,6 @@ /* Message iterator API */ #include -#include #include #include diff --git a/include/babeltrace2/graph/port-output-message-iterator.h b/include/babeltrace2/graph/port-output-message-iterator.h deleted file mode 100644 index 49d564a7..00000000 --- a/include/babeltrace2/graph/port-output-message-iterator.h +++ /dev/null @@ -1,96 +0,0 @@ -#ifndef BABELTRACE2_GRAPH_PORT_OUTPUT_MESSAGE_ITERATOR_H -#define BABELTRACE2_GRAPH_PORT_OUTPUT_MESSAGE_ITERATOR_H - -/* - * Copyright (c) 2010-2019 EfficiOS Inc. and Linux Foundation - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in - * all copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -#ifndef __BT_IN_BABELTRACE_H -# error "Please include instead." -#endif - -#include - -#include -#include - -#ifdef __cplusplus -extern "C" { -#endif - -static inline -bt_message_iterator * -bt_port_output_message_iterator_as_message_iterator( - bt_port_output_message_iterator *iterator) -{ - return __BT_UPCAST(bt_message_iterator, iterator); -} - -extern bt_port_output_message_iterator * -bt_port_output_message_iterator_create( - bt_graph *graph, - const bt_port_output *output_port); - -extern bt_message_iterator_next_status -bt_port_output_message_iterator_next( - bt_port_output_message_iterator *iterator, - bt_message_array_const *msgs, uint64_t *count); - -extern bt_bool bt_port_output_message_iterator_can_seek_ns_from_origin( - bt_port_output_message_iterator *iterator, - int64_t ns_from_origin); - -extern bt_bool bt_port_output_message_iterator_can_seek_beginning( - bt_port_output_message_iterator *iterator); - -extern bt_message_iterator_seek_ns_from_origin_status -bt_port_output_message_iterator_seek_ns_from_origin( - bt_port_output_message_iterator *iterator, - int64_t ns_from_origin); - -extern bt_message_iterator_seek_beginning_status -bt_port_output_message_iterator_seek_beginning( - bt_port_output_message_iterator *iterator); - -extern void bt_port_output_message_iterator_get_ref( - const bt_port_output_message_iterator *port_output_message_iterator); - -extern void bt_port_output_message_iterator_put_ref( - const bt_port_output_message_iterator *port_output_message_iterator); - -#define BT_PORT_OUTPUT_MESSAGE_ITERATOR_PUT_REF_AND_RESET(_var) \ - do { \ - bt_port_output_message_iterator_put_ref(_var); \ - (_var) = NULL; \ - } while (0) - -#define BT_PORT_OUTPUT_MESSAGE_ITERATOR_MOVE_REF(_var_dst, _var_src) \ - do { \ - bt_port_output_message_iterator_put_ref(_var_dst); \ - (_var_dst) = (_var_src); \ - (_var_src) = NULL; \ - } while (0) - -#ifdef __cplusplus -} -#endif - -#endif /* BABELTRACE2_GRAPH_PORT_OUTPUT_MESSAGE_ITERATOR_H */ diff --git a/src/bindings/python/bt2/bt2/graph.py b/src/bindings/python/bt2/bt2/graph.py index 6c75d52b..3812788f 100644 --- a/src/bindings/python/bt2/bt2/graph.py +++ b/src/bindings/python/bt2/bt2/graph.py @@ -24,7 +24,6 @@ from bt2 import native_bt, object, utils from bt2 import interrupter as bt2_interrupter from bt2 import connection as bt2_connection from bt2 import component as bt2_component -from bt2 import message_iterator as bt2_message_iterator import functools from bt2 import port as bt2_port from bt2 import logging as bt2_logging @@ -196,14 +195,3 @@ class Graph(object._SharedObject): def interrupt(self): native_bt.graph_interrupt(self._ptr) - - def create_output_port_message_iterator(self, output_port): - utils._check_type(output_port, bt2_port._OutputPort) - msg_iter_ptr = native_bt.port_output_message_iterator_create( - self._ptr, output_port._ptr - ) - - if msg_iter_ptr is None: - raise bt2._MemoryError('cannot create output port message iterator') - - return bt2_message_iterator._OutputPortMessageIterator(msg_iter_ptr) diff --git a/src/bindings/python/bt2/bt2/message_iterator.py b/src/bindings/python/bt2/bt2/message_iterator.py index c6c23dfd..1a48b811 100644 --- a/src/bindings/python/bt2/bt2/message_iterator.py +++ b/src/bindings/python/bt2/bt2/message_iterator.py @@ -88,20 +88,6 @@ class _UserComponentInputPortMessageIterator(_GenericMessageIterator): ) -# This is created when the user wants to iterate on a component's output port, -# from outside the graph. -class _OutputPortMessageIterator(_GenericMessageIterator): - _get_msg_range = staticmethod(native_bt.bt2_port_output_get_msg_range) - _get_ref = staticmethod(native_bt.port_output_message_iterator_get_ref) - _put_ref = staticmethod(native_bt.port_output_message_iterator_put_ref) - _can_seek_beginning = staticmethod( - native_bt.port_output_message_iterator_can_seek_beginning - ) - _seek_beginning = staticmethod( - native_bt.port_output_message_iterator_seek_beginning - ) - - # This is extended by the user to implement component classes in Python. It # is created for a given output port when an input port message iterator is # created on the input port on the other side of the connection. It is also diff --git a/src/bindings/python/bt2/bt2/native_bt_message_iterator.i b/src/bindings/python/bt2/bt2/native_bt_message_iterator.i index 7ebd7836..6336a214 100644 --- a/src/bindings/python/bt2/bt2/native_bt_message_iterator.i +++ b/src/bindings/python/bt2/bt2/native_bt_message_iterator.i @@ -23,7 +23,6 @@ */ %include -%include %include %include @@ -36,5 +35,3 @@ PyObject *bt_bt2_get_user_component_from_user_msg_iter( bt_self_message_iterator *self_message_iterator); PyObject *bt_bt2_self_component_port_input_get_msg_range( bt_self_component_port_input_message_iterator *iter); -PyObject *bt_bt2_port_output_get_msg_range( - bt_port_output_message_iterator *iter); diff --git a/src/bindings/python/bt2/bt2/native_bt_message_iterator.i.h b/src/bindings/python/bt2/bt2/native_bt_message_iterator.i.h index 2ce15de3..f687dc9a 100644 --- a/src/bindings/python/bt2/bt2/native_bt_message_iterator.i.h +++ b/src/bindings/python/bt2/bt2/native_bt_message_iterator.i.h @@ -94,15 +94,3 @@ static PyObject *bt_bt2_self_component_port_input_get_msg_range( &messages, &message_count); return get_msg_range_common(status, messages, message_count); } - -static PyObject *bt_bt2_port_output_get_msg_range( - bt_port_output_message_iterator *iter) -{ - bt_message_array_const messages; - uint64_t message_count = 0; - bt_message_iterator_next_status status; - - status = bt_port_output_message_iterator_next(iter, &messages, - &message_count); - return get_msg_range_common(status, messages, message_count); -} diff --git a/src/bindings/python/bt2/bt2/port.py b/src/bindings/python/bt2/bt2/port.py index bf11aa94..418f215b 100644 --- a/src/bindings/python/bt2/bt2/port.py +++ b/src/bindings/python/bt2/bt2/port.py @@ -23,7 +23,6 @@ from bt2 import native_bt, object from bt2 import component as bt2_component from bt2 import connection as bt2_connection -from bt2 import message_iterator as bt2_message_iterator from bt2 import message as bt2_message import bt2 diff --git a/src/lib/error.c b/src/lib/error.c index f54b65f3..3ae2db98 100644 --- a/src/lib/error.c +++ b/src/lib/error.c @@ -378,8 +378,6 @@ create_error_cause_message_iterator_actor(struct bt_message_iterator *iter, * message iterator, which is a self component port input * message iterator. */ - BT_ASSERT(iter->type == - BT_MESSAGE_ITERATOR_TYPE_SELF_COMPONENT_PORT_INPUT); input_port_iter = (void *) iter; cause = g_new0(struct bt_error_cause_message_iterator_actor, 1); if (!cause) { diff --git a/src/lib/graph/iterator.c b/src/lib/graph/iterator.c index b5033083..62e1f91e 100644 --- a/src/lib/graph/iterator.c +++ b/src/lib/graph/iterator.c @@ -43,7 +43,6 @@ #include #include #include -#include #include #include #include @@ -103,21 +102,6 @@ void set_self_comp_port_input_msg_iterator_state( iterator->state = state; } -static -void destroy_base_message_iterator(struct bt_object *obj) -{ - struct bt_message_iterator *iterator = (void *) obj; - - BT_ASSERT(iterator); - - if (iterator->msgs) { - g_ptr_array_free(iterator->msgs, TRUE); - iterator->msgs = NULL; - } - - g_free(iterator); -} - static void bt_self_component_port_input_message_iterator_destroy(struct bt_object *obj) { @@ -172,7 +156,12 @@ void bt_self_component_port_input_message_iterator_destroy(struct bt_object *obj iterator->upstream_msg_iters = NULL; } - destroy_base_message_iterator(obj); + if (iterator->msgs) { + g_ptr_array_free(iterator->msgs, TRUE); + iterator->msgs = NULL; + } + + g_free(iterator); } BT_HIDDEN @@ -284,28 +273,6 @@ void bt_self_component_port_input_message_iterator_set_connection( "%![iter-]+i, %![conn-]+x", iterator, connection); } -static -int init_message_iterator(struct bt_message_iterator *iterator, - enum bt_message_iterator_type type, - bt_object_release_func destroy) -{ - int ret = 0; - - bt_object_init_shared(&iterator->base, destroy); - iterator->type = type; - iterator->msgs = g_ptr_array_new(); - if (!iterator->msgs) { - BT_LIB_LOGE_APPEND_CAUSE("Failed to allocate a GPtrArray."); - ret = -1; - goto end; - } - - g_ptr_array_set_size(iterator->msgs, MSG_BATCH_SIZE); - -end: - return ret; -} - static bt_bool can_seek_ns_from_origin_true( struct bt_self_component_port_input_message_iterator *iterator, @@ -330,7 +297,6 @@ create_self_component_input_port_message_iterator( typedef enum bt_component_class_message_iterator_init_method_status (*init_method_t)( void *, void *, void *); - int ret; init_method_t init_method = NULL; struct bt_self_component_port_input_message_iterator *iterator = NULL; @@ -376,14 +342,15 @@ create_self_component_input_port_message_iterator( goto error; } - ret = init_message_iterator((void *) iterator, - BT_MESSAGE_ITERATOR_TYPE_SELF_COMPONENT_PORT_INPUT, + bt_object_init_shared(&iterator->base, bt_self_component_port_input_message_iterator_destroy); - if (ret) { - /* init_message_iterator() logs errors */ + iterator->msgs = g_ptr_array_new(); + if (!iterator->msgs) { + BT_LIB_LOGE_APPEND_CAUSE("Failed to allocate a GPtrArray."); goto error; } + g_ptr_array_set_size(iterator->msgs, MSG_BATCH_SIZE); iterator->last_ns_from_origin = INT64_MIN; iterator->auto_seek.msgs = g_queue_new(); if (!iterator->auto_seek.msgs) { @@ -900,7 +867,7 @@ bt_self_component_port_input_message_iterator_next( */ *user_count = 0; status = (int) call_iterator_next_method(iterator, - (void *) iterator->base.msgs->pdata, MSG_BATCH_SIZE, + (void *) iterator->msgs->pdata, MSG_BATCH_SIZE, user_count); BT_LOGD("User method returned: status=%s, msg-count=%" PRIu64, bt_common_func_status_string(status), *user_count); @@ -930,7 +897,7 @@ bt_self_component_port_input_message_iterator_next( "Invalid returned message count: greater than " "batch size: count=%" PRIu64 ", batch-size=%u", *user_count, MSG_BATCH_SIZE); - *msgs = (void *) iterator->base.msgs->pdata; + *msgs = (void *) iterator->msgs->pdata; break; case BT_FUNC_STATUS_AGAIN: goto end; @@ -947,46 +914,6 @@ end: return status; } -enum bt_message_iterator_next_status bt_port_output_message_iterator_next( - struct bt_port_output_message_iterator *iterator, - bt_message_array_const *msgs_to_user, - uint64_t *count_to_user) -{ - enum bt_message_iterator_next_status status; - int graph_status; - - BT_ASSERT_PRE_DEV_NON_NULL(iterator, "Message iterator"); - BT_ASSERT_PRE_DEV_NON_NULL(msgs_to_user, "Message array (output)"); - BT_ASSERT_PRE_DEV_NON_NULL(count_to_user, "Message count (output)"); - BT_LIB_LOGD("Getting next output port message iterator's messages: " - "%!+i", iterator); - graph_status = bt_graph_consume_sink_no_check(iterator->graph, - iterator->colander); - switch (graph_status) { - case BT_FUNC_STATUS_AGAIN: - case BT_FUNC_STATUS_END: - case BT_FUNC_STATUS_MEMORY_ERROR: - status = (int) graph_status; - break; - case BT_FUNC_STATUS_OK: - status = BT_FUNC_STATUS_OK; - - /* - * On success, the colander sink moves the messages - * to this iterator's array and sets this iterator's - * message count: move them to the user. - */ - *msgs_to_user = (void *) iterator->base.msgs->pdata; - *count_to_user = iterator->count; - break; - default: - /* Other errors */ - status = BT_FUNC_STATUS_ERROR; - } - - return status; -} - struct bt_component * bt_self_component_port_input_message_iterator_borrow_component( struct bt_self_component_port_input_message_iterator *iterator) @@ -1023,172 +950,6 @@ struct bt_self_port_output *bt_self_message_iterator_borrow_port( return (void *) iterator->upstream_port; } -static -void bt_port_output_message_iterator_destroy(struct bt_object *obj) -{ - struct bt_port_output_message_iterator *iterator = (void *) obj; - - BT_LIB_LOGI("Destroying output port message iterator object: %!+i", - iterator); - BT_LOGD_STR("Putting graph."); - BT_OBJECT_PUT_REF_AND_RESET(iterator->graph); - BT_LOGD_STR("Putting colander sink component."); - BT_OBJECT_PUT_REF_AND_RESET(iterator->colander); - destroy_base_message_iterator(obj); -} - -struct bt_port_output_message_iterator * -bt_port_output_message_iterator_create(struct bt_graph *graph, - const struct bt_port_output *output_port) -{ - struct bt_port_output_message_iterator *iterator = NULL; - struct bt_component_class_sink *colander_comp_cls = NULL; - struct bt_component *output_port_comp = NULL; - struct bt_component_sink *colander_comp; - int graph_status; - struct bt_port_input *colander_in_port = NULL; - struct bt_component_class_sink_colander_data colander_data; - int ret; - - BT_ASSERT_PRE_NON_NULL(graph, "Graph"); - BT_ASSERT_PRE_NON_NULL(output_port, "Output port"); - output_port_comp = bt_port_borrow_component_inline( - (const void *) output_port); - BT_ASSERT_PRE(output_port_comp, - "Output port has no component: %!+p", output_port); - BT_ASSERT_PRE(bt_component_borrow_graph(output_port_comp) == - (void *) graph, - "Output port is not part of graph: %![graph-]+g, %![port-]+p", - graph, output_port); - BT_ASSERT_PRE(!graph->has_sink, - "Graph already has a sink component: %![graph-]+g"); - - /* Create message iterator */ - BT_LIB_LOGI("Creating message iterator on output port: " - "%![port-]+p, %![comp-]+c", output_port, output_port_comp); - iterator = g_new0(struct bt_port_output_message_iterator, 1); - if (!iterator) { - BT_LIB_LOGE_APPEND_CAUSE( - "Failed to allocate one output port message iterator."); - goto error; - } - - ret = init_message_iterator((void *) iterator, - BT_MESSAGE_ITERATOR_TYPE_PORT_OUTPUT, - bt_port_output_message_iterator_destroy); - if (ret) { - /* init_message_iterator() logs errors */ - BT_OBJECT_PUT_REF_AND_RESET(iterator); - goto end; - } - - /* Create colander component */ - colander_comp_cls = bt_component_class_sink_colander_get(); - if (!colander_comp_cls) { - /* bt_component_class_sink_colander_get() logs errors */ - BT_LIB_LOGE_APPEND_CAUSE( - "Cannot get colander sink component class."); - goto error; - } - - iterator->graph = graph; - bt_object_get_no_null_check(iterator->graph); - colander_data.msgs = (void *) iterator->base.msgs->pdata; - colander_data.count_addr = &iterator->count; - - /* - * Hope that nobody uses this very unique name. - * - * We pass `BT_LOGGING_LEVEL_NONE` but the colander component - * class module does not use this level anyway since it belongs - * to the library. - */ - graph_status = bt_graph_add_sink_component_with_init_method_data( - (void *) graph, colander_comp_cls, - "colander-36ac3409-b1a8-4d60-ab1f-4fdf341a8fb1", - NULL, &colander_data, BT_LOGGING_LEVEL_NONE, - (void *) &iterator->colander); - if (graph_status != BT_FUNC_STATUS_OK) { - BT_LIB_LOGE_APPEND_CAUSE( - "Cannot add colander sink component to graph: " - "%![graph-]+g, status=%s", graph, - bt_common_func_status_string(graph_status)); - goto error; - } - - /* - * Connect provided output port to the colander component's - * input port. - */ - colander_in_port = - (void *) bt_component_sink_borrow_input_port_by_index_const( - (void *) iterator->colander, 0); - BT_ASSERT(colander_in_port); - graph_status = bt_graph_connect_ports(graph, - output_port, colander_in_port, NULL); - if (graph_status != BT_FUNC_STATUS_OK) { - BT_LIB_LOGW_APPEND_CAUSE( - "Cannot connect colander sink's port: " - "%![graph-]+g, %![comp-]+c, status=%s", graph, - iterator->colander, - bt_common_func_status_string(graph_status)); - goto error; - } - - /* - * At this point everything went fine. Make the graph - * nonconsumable forever so that only this message iterator - * can consume (thanks to bt_graph_consume_sink_no_check()). - * This avoids leaking the message created by the colander - * sink and moved to the message iterator's message - * member. - */ - bt_graph_set_can_consume(iterator->graph, false); - - /* Also set the graph as being configured. */ - graph_status = bt_graph_configure(graph); - if (graph_status != BT_FUNC_STATUS_OK) { - BT_LIB_LOGW_APPEND_CAUSE( - "Cannot configure graph after having " - "added and connected colander sink: " - "%![graph-]+g, %![comp-]+c, status=%s", graph, - iterator->colander, - bt_common_func_status_string(graph_status)); - goto error; - } - goto end; - -error: - if (iterator && iterator->graph && iterator->colander) { - int ret; - - /* Remove created colander component from graph if any */ - colander_comp = iterator->colander; - BT_OBJECT_PUT_REF_AND_RESET(iterator->colander); - - /* - * At this point the colander component's reference - * count is 0 because iterator->colander was the only - * owner. We also know that it is not connected because - * this is the last operation before this function - * succeeds. - * - * Since we honor the preconditions here, - * bt_graph_remove_unconnected_component() always - * succeeds. - */ - ret = bt_graph_remove_unconnected_component(iterator->graph, - (void *) colander_comp); - BT_ASSERT(ret == 0); - } - - BT_OBJECT_PUT_REF_AND_RESET(iterator); - -end: - bt_object_put_ref(colander_comp_cls); - return (void *) iterator; -} - bt_bool bt_self_component_port_input_message_iterator_can_seek_ns_from_origin( struct bt_self_component_port_input_message_iterator *iterator, int64_t ns_from_origin) @@ -2063,60 +1824,6 @@ end: return status; } -static inline -bt_self_component_port_input_message_iterator * -borrow_output_port_message_iterator_upstream_iterator( - struct bt_port_output_message_iterator *iterator) -{ - struct bt_component_class_sink_colander_priv_data *colander_data; - - BT_ASSERT(iterator); - colander_data = (void *) iterator->colander->parent.user_data; - BT_ASSERT(colander_data); - BT_ASSERT(colander_data->msg_iter); - return colander_data->msg_iter; -} - -bt_bool bt_port_output_message_iterator_can_seek_ns_from_origin( - struct bt_port_output_message_iterator *iterator, - int64_t ns_from_origin) -{ - BT_ASSERT_PRE_NON_NULL(iterator, "Message iterator"); - return bt_self_component_port_input_message_iterator_can_seek_ns_from_origin( - borrow_output_port_message_iterator_upstream_iterator( - iterator), ns_from_origin); -} - -bt_bool bt_port_output_message_iterator_can_seek_beginning( - struct bt_port_output_message_iterator *iterator) -{ - BT_ASSERT_PRE_NON_NULL(iterator, "Message iterator"); - return bt_self_component_port_input_message_iterator_can_seek_beginning( - borrow_output_port_message_iterator_upstream_iterator( - iterator)); -} - -enum bt_message_iterator_seek_ns_from_origin_status -bt_port_output_message_iterator_seek_ns_from_origin( - struct bt_port_output_message_iterator *iterator, - int64_t ns_from_origin) -{ - BT_ASSERT_PRE_NON_NULL(iterator, "Message iterator"); - return bt_self_component_port_input_message_iterator_seek_ns_from_origin( - borrow_output_port_message_iterator_upstream_iterator(iterator), - ns_from_origin); -} - -enum bt_message_iterator_seek_beginning_status -bt_port_output_message_iterator_seek_beginning( - struct bt_port_output_message_iterator *iterator) -{ - BT_ASSERT_PRE_NON_NULL(iterator, "Message iterator"); - return bt_self_component_port_input_message_iterator_seek_beginning( - borrow_output_port_message_iterator_upstream_iterator( - iterator)); -} - bt_bool bt_self_message_iterator_is_interrupted( const struct bt_self_message_iterator *self_msg_iter) { @@ -2127,18 +1834,6 @@ bt_bool bt_self_message_iterator_is_interrupted( return (bt_bool) bt_graph_is_interrupted(iterator->graph); } -void bt_port_output_message_iterator_get_ref( - const struct bt_port_output_message_iterator *iterator) -{ - bt_object_get_ref(iterator); -} - -void bt_port_output_message_iterator_put_ref( - const struct bt_port_output_message_iterator *iterator) -{ - bt_object_put_ref(iterator); -} - void bt_self_component_port_input_message_iterator_get_ref( const struct bt_self_component_port_input_message_iterator *iterator) { diff --git a/src/lib/graph/message/iterator.h b/src/lib/graph/message/iterator.h index f48872ec..82689f61 100644 --- a/src/lib/graph/message/iterator.h +++ b/src/lib/graph/message/iterator.h @@ -36,11 +36,6 @@ struct bt_port; struct bt_graph; -enum bt_message_iterator_type { - BT_MESSAGE_ITERATOR_TYPE_SELF_COMPONENT_PORT_INPUT, - BT_MESSAGE_ITERATOR_TYPE_PORT_OUTPUT, -}; - enum bt_self_component_port_input_message_iterator_state { /* Iterator is not initialized */ BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_NON_INITIALIZED, @@ -70,12 +65,6 @@ enum bt_self_component_port_input_message_iterator_state { BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_LAST_SEEKING_RETURNED_ERROR, }; -struct bt_message_iterator { - struct bt_object base; - enum bt_message_iterator_type type; - GPtrArray *msgs; -}; - typedef enum bt_component_class_message_iterator_next_method_status (*bt_self_component_port_input_message_iterator_next_method)( void *, bt_message_array_const, uint64_t, uint64_t *); @@ -97,7 +86,8 @@ typedef bt_bool void *); struct bt_self_component_port_input_message_iterator { - struct bt_message_iterator base; + struct bt_object base; + GPtrArray *msgs; struct bt_component *upstream_component; /* Weak */ struct bt_port *upstream_port; /* Weak */ struct bt_connection *connection; /* Weak */ @@ -199,18 +189,6 @@ struct bt_self_component_port_input_message_iterator { void *user_data; }; -struct bt_port_output_message_iterator { - struct bt_message_iterator base; - struct bt_graph *graph; /* Owned by this */ - struct bt_component_sink *colander; /* Owned by this */ - - /* - * Only used temporarily as a bridge between a colander sink and - * the user. - */ - uint64_t count; -}; - BT_HIDDEN void bt_self_component_port_input_message_iterator_try_finalize( struct bt_self_component_port_input_message_iterator *iterator); diff --git a/src/lib/lib-logging.c b/src/lib/lib-logging.c index e9afb806..ce261a1a 100644 --- a/src/lib/lib-logging.c +++ b/src/lib/lib-logging.c @@ -1153,66 +1153,34 @@ static inline void format_message_iterator(char **buf_ch, bool extended, const char *prefix, const struct bt_message_iterator *iterator) { - const char *type; char tmp_prefix[TMP_PREFIX_LEN]; + const struct bt_self_component_port_input_message_iterator * + port_in_iter = (const void *) iterator; - if (iterator->type == BT_MESSAGE_ITERATOR_TYPE_SELF_COMPONENT_PORT_INPUT) { - type = "BT_MESSAGE_ITERATOR_TYPE_SELF_COMPONENT_PORT_INPUT"; - } else if (iterator->type == BT_MESSAGE_ITERATOR_TYPE_PORT_OUTPUT) { - type = "BT_MESSAGE_ITERATOR_TYPE_PORT_OUTPUT"; - } else { - type = "(unknown)"; + if (port_in_iter->upstream_component) { + SET_TMP_PREFIX("upstream-comp-"); + format_component(buf_ch, false, tmp_prefix, + port_in_iter->upstream_component); } - BUF_APPEND(", %stype=%s", PRFIELD(type)); - - switch (iterator->type) { - case BT_MESSAGE_ITERATOR_TYPE_SELF_COMPONENT_PORT_INPUT: - { - const struct bt_self_component_port_input_message_iterator * - port_in_iter = (const void *) iterator; - - if (port_in_iter->upstream_component) { - SET_TMP_PREFIX("upstream-comp-"); - format_component(buf_ch, false, tmp_prefix, - port_in_iter->upstream_component); - } - - if (port_in_iter->upstream_port) { - SET_TMP_PREFIX("upstream-port-"); - format_port(buf_ch, false, tmp_prefix, - port_in_iter->upstream_port); - } - - if (port_in_iter->connection) { - SET_TMP_PREFIX("upstream-conn-"); - format_connection(buf_ch, false, tmp_prefix, - port_in_iter->connection); - } - break; + if (!extended) { + goto end; } - case BT_MESSAGE_ITERATOR_TYPE_PORT_OUTPUT: - { - const struct bt_port_output_message_iterator *port_out_iter = - (const void *) iterator; - if (port_out_iter->graph) { - SET_TMP_PREFIX("graph-"); - format_graph(buf_ch, false, tmp_prefix, - port_out_iter->graph); - } - - if (port_out_iter->colander) { - SET_TMP_PREFIX("colander-comp-"); - format_component(buf_ch, false, tmp_prefix, - (void *) port_out_iter->colander); - } - - break; + if (port_in_iter->upstream_port) { + SET_TMP_PREFIX("upstream-port-"); + format_port(buf_ch, false, tmp_prefix, + port_in_iter->upstream_port); } - default: - break; + + if (port_in_iter->connection) { + SET_TMP_PREFIX("upstream-conn-"); + format_connection(buf_ch, false, tmp_prefix, + port_in_iter->connection); } + +end: + return; } static inline void format_plugin(char **buf_ch, bool extended, diff --git a/tests/bindings/python/bt2/test_clock_class.py b/tests/bindings/python/bt2/test_clock_class.py index 48ae9602..3805c0d5 100644 --- a/tests/bindings/python/bt2/test_clock_class.py +++ b/tests/bindings/python/bt2/test_clock_class.py @@ -20,7 +20,7 @@ import unittest import uuid import copy import bt2 -from utils import run_in_component_init +from utils import run_in_component_init, TestOutputPortMessageIterator class ClockClassOffsetTestCase(unittest.TestCase): @@ -244,8 +244,8 @@ class ClockSnapshotTestCase(unittest.TestCase): self._graph = bt2.Graph() self._src_comp = self._graph.add_component(MySrc, 'my_source') - self._msg_iter = self._graph.create_output_port_message_iterator( - self._src_comp.output_ports['out'] + self._msg_iter = TestOutputPortMessageIterator( + self._graph, self._src_comp.output_ports['out'] ) for i, msg in enumerate(self._msg_iter): diff --git a/tests/bindings/python/bt2/test_event.py b/tests/bindings/python/bt2/test_event.py index ea65b590..b146f261 100644 --- a/tests/bindings/python/bt2/test_event.py +++ b/tests/bindings/python/bt2/test_event.py @@ -19,6 +19,7 @@ from collections import OrderedDict import unittest import bt2 +from utils import TestOutputPortMessageIterator class EventTestCase(unittest.TestCase): @@ -153,8 +154,8 @@ class EventTestCase(unittest.TestCase): test_obj = self self._graph = bt2.Graph() self._src_comp = self._graph.add_component(MySrc, 'my_source') - self._msg_iter = self._graph.create_output_port_message_iterator( - self._src_comp.output_ports['out'] + self._msg_iter = TestOutputPortMessageIterator( + self._graph, self._src_comp.output_ports['out'] ) for msg in self._msg_iter: diff --git a/tests/bindings/python/bt2/test_message.py b/tests/bindings/python/bt2/test_message.py index 78e8dec0..ef8e6b80 100644 --- a/tests/bindings/python/bt2/test_message.py +++ b/tests/bindings/python/bt2/test_message.py @@ -19,6 +19,7 @@ import collections import unittest import bt2 +from utils import TestOutputPortMessageIterator class AllMessagesTestCase(unittest.TestCase): @@ -148,8 +149,8 @@ class AllMessagesTestCase(unittest.TestCase): def test_all_msg_with_cc(self): params = {'with_cc': True} self._src_comp = self._graph.add_component(self._src, 'my_source', params) - self._msg_iter = self._graph.create_output_port_message_iterator( - self._src_comp.output_ports['out'] + self._msg_iter = TestOutputPortMessageIterator( + self._graph, self._src_comp.output_ports['out'] ) for i, msg in enumerate(self._msg_iter): @@ -204,8 +205,8 @@ class AllMessagesTestCase(unittest.TestCase): def test_all_msg_without_cc(self): params = {'with_cc': False} self._src_comp = self._graph.add_component(self._src, 'my_source', params) - self._msg_iter = self._graph.create_output_port_message_iterator( - self._src_comp.output_ports['out'] + self._msg_iter = TestOutputPortMessageIterator( + self._graph, self._src_comp.output_ports['out'] ) for i, msg in enumerate(self._msg_iter): @@ -273,8 +274,8 @@ class AllMessagesTestCase(unittest.TestCase): params = {'with_cc': True, 'with_stream_msgs_clock_snapshots': True} self._src_comp = self._graph.add_component(self._src, 'my_source', params) - self._msg_iter = self._graph.create_output_port_message_iterator( - self._src_comp.output_ports['out'] + self._msg_iter = TestOutputPortMessageIterator( + self._graph, self._src_comp.output_ports['out'] ) msgs = list(self._msg_iter) diff --git a/tests/bindings/python/bt2/test_message_iterator.py b/tests/bindings/python/bt2/test_message_iterator.py index 43af1e1f..3cd27e5b 100644 --- a/tests/bindings/python/bt2/test_message_iterator.py +++ b/tests/bindings/python/bt2/test_message_iterator.py @@ -21,6 +21,7 @@ import collections import unittest import copy import bt2 +from utils import TestOutputPortMessageIterator class UserMessageIteratorTestCase(unittest.TestCase): @@ -196,7 +197,7 @@ class UserMessageIteratorTestCase(unittest.TestCase): graph = bt2.Graph() src = graph.add_component(MySource, 'src') - it = graph.create_output_port_message_iterator(src.output_ports['out']) + it = TestOutputPortMessageIterator(graph, src.output_ports['out']) # Skip beginning messages. msg = next(it) @@ -212,7 +213,7 @@ class UserMessageIteratorTestCase(unittest.TestCase): self.assertEqual(msg_ev1.addr, msg_ev2.addr) @staticmethod - def _setup_seek_beginning_test(): + def _setup_seek_beginning_test(sink_cls): # Use a source, a filter and an output port iterator. This allows us # to test calling `seek_beginning` on both a _OutputPortMessageIterator # and a _UserComponentInputPortMessageIterator, on top of checking that @@ -279,25 +280,42 @@ class UserMessageIteratorTestCase(unittest.TestCase): graph = bt2.Graph() src = graph.add_component(MySource, 'src') flt = graph.add_component(MyFilter, 'flt') + sink = graph.add_component(sink_cls, 'sink') graph.connect_ports(src.output_ports['out'], flt.input_ports['in']) - it = graph.create_output_port_message_iterator(flt.output_ports['out']) - - return it, MySourceIter + graph.connect_ports(flt.output_ports['out'], sink.input_ports['in']) + return MySourceIter, graph def test_can_seek_beginning(self): - it, MySourceIter = self._setup_seek_beginning_test() + class MySink(bt2._UserSinkComponent): + def __init__(self, params, obj): + self._add_input_port('in') + + def _user_graph_is_configured(self): + self._msg_iter = self._create_input_port_message_iterator( + self._input_ports['in'] + ) + + def _user_consume(self): + nonlocal can_seek_beginning + can_seek_beginning = self._msg_iter.can_seek_beginning + + MySourceIter, graph = self._setup_seek_beginning_test(MySink) def _user_can_seek_beginning(self): - nonlocal can_seek_beginning - return can_seek_beginning + nonlocal input_port_iter_can_seek_beginning + return input_port_iter_can_seek_beginning MySourceIter._user_can_seek_beginning = property(_user_can_seek_beginning) - can_seek_beginning = True - self.assertTrue(it.can_seek_beginning) + input_port_iter_can_seek_beginning = True + can_seek_beginning = None + graph.run_once() + self.assertTrue(can_seek_beginning) - can_seek_beginning = False - self.assertFalse(it.can_seek_beginning) + input_port_iter_can_seek_beginning = False + can_seek_beginning = None + graph.run_once() + self.assertFalse(can_seek_beginning) # Once can_seek_beginning returns an error, verify that it raises when # _can_seek_beginning has/returns the wrong type. @@ -306,42 +324,62 @@ class UserMessageIteratorTestCase(unittest.TestCase): # a _seek_beginning method to know whether the iterator can seek to # beginning or not. del MySourceIter._user_can_seek_beginning - self.assertTrue(it.can_seek_beginning) + can_seek_beginning = None + graph.run_once() + self.assertTrue(can_seek_beginning) del MySourceIter._user_seek_beginning - self.assertFalse(it.can_seek_beginning) + can_seek_beginning = None + graph.run_once() + self.assertFalse(can_seek_beginning) def test_seek_beginning(self): - it, MySourceIter = self._setup_seek_beginning_test() + class MySink(bt2._UserSinkComponent): + def __init__(self, params, obj): + self._add_input_port('in') - msg = next(it) + def _user_graph_is_configured(self): + self._msg_iter = self._create_input_port_message_iterator( + self._input_ports['in'] + ) + + def _user_consume(self): + nonlocal do_seek_beginning + nonlocal msg + + if do_seek_beginning: + self._msg_iter.seek_beginning() + return + + msg = next(self._msg_iter) + + do_seek_beginning = False + msg = None + MySourceIter, graph = self._setup_seek_beginning_test(MySink) + graph.run_once() self.assertIsInstance(msg, bt2._StreamBeginningMessage) - msg = next(it) + graph.run_once() self.assertIsInstance(msg, bt2._PacketBeginningMessage) + do_seek_beginning = True + graph.run_once() + do_seek_beginning = False + graph.run_once() + self.assertIsInstance(msg, bt2._StreamBeginningMessage) - it.seek_beginning() + def test_seek_beginning_user_error(self): + class MySink(bt2._UserSinkComponent): + def __init__(self, params, obj): + self._add_input_port('in') - msg = next(it) - self.assertIsInstance(msg, bt2._StreamBeginningMessage) + def _user_graph_is_configured(self): + self._msg_iter = self._create_input_port_message_iterator( + self._input_ports['in'] + ) - # Verify that we can seek beginning after having reached the end. - # - # It currently does not work to seek an output port message iterator - # once it's ended, but we should eventually make it work and uncomment - # the following snippet. - # - # try: - # while True: - # next(it) - # except bt2.Stop: - # pass - # - # it.seek_beginning() - # msg = next(it) - # self.assertIsInstance(msg, bt2._StreamBeginningMessage) + def _user_consume(self): + self._msg_iter.seek_beginning() - def test_seek_beginning_user_error(self): - it, MySourceIter = self._setup_seek_beginning_test() + MySourceIter, graph = self._setup_seek_beginning_test(MySink) def _user_seek_beginning_error(self): raise ValueError('ouch') @@ -349,7 +387,7 @@ class UserMessageIteratorTestCase(unittest.TestCase): MySourceIter._user_seek_beginning = _user_seek_beginning_error with self.assertRaises(bt2._Error): - it.seek_beginning() + graph.run_once() # Try consuming many times from an iterator that always returns TryAgain. # This verifies that we are not missing an incref of Py_None, making the @@ -365,7 +403,7 @@ class UserMessageIteratorTestCase(unittest.TestCase): graph = bt2.Graph() src = graph.add_component(MySource, 'src') - it = graph.create_output_port_message_iterator(src.output_ports['out']) + it = TestOutputPortMessageIterator(graph, src.output_ports['out']) # The initial refcount of Py_None was in the 7000, so 100000 iterations # should be enough to catch the bug even if there are small differences @@ -375,77 +413,5 @@ class UserMessageIteratorTestCase(unittest.TestCase): next(it) -class OutputPortMessageIteratorTestCase(unittest.TestCase): - def test_component(self): - class MyIter(bt2._UserMessageIterator): - def __init__(self, self_port_output): - self._at = 0 - - def __next__(self): - if self._at == 7: - raise bt2.Stop - - if self._at == 0: - msg = self._create_stream_beginning_message(test_obj._stream) - elif self._at == 1: - msg = self._create_packet_beginning_message(test_obj._packet) - elif self._at == 5: - msg = self._create_packet_end_message(test_obj._packet) - elif self._at == 6: - msg = self._create_stream_end_message(test_obj._stream) - else: - msg = self._create_event_message( - test_obj._event_class, test_obj._packet - ) - msg.event.payload_field['my_int'] = self._at * 3 - - self._at += 1 - return msg - - class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter): - def __init__(self, params, obj): - self._add_output_port('out') - - trace_class = self._create_trace_class() - stream_class = trace_class.create_stream_class(supports_packets=True) - - # Create payload field class - my_int_ft = trace_class.create_signed_integer_field_class(32) - payload_ft = trace_class.create_structure_field_class() - payload_ft += [('my_int', my_int_ft)] - - event_class = stream_class.create_event_class( - name='salut', payload_field_class=payload_ft - ) - - trace = trace_class() - stream = trace.create_stream(stream_class) - packet = stream.create_packet() - - test_obj._event_class = event_class - test_obj._stream = stream - test_obj._packet = packet - - test_obj = self - graph = bt2.Graph() - src = graph.add_component(MySource, 'src') - msg_iter = graph.create_output_port_message_iterator(src.output_ports['out']) - - for at, msg in enumerate(msg_iter): - if at == 0: - self.assertIsInstance(msg, bt2._StreamBeginningMessage) - elif at == 1: - self.assertIsInstance(msg, bt2._PacketBeginningMessage) - elif at == 5: - self.assertIsInstance(msg, bt2._PacketEndMessage) - elif at == 6: - self.assertIsInstance(msg, bt2._StreamEndMessage) - else: - self.assertIsInstance(msg, bt2._EventMessage) - self.assertEqual(msg.event.cls.name, 'salut') - field = msg.event.payload_field['my_int'] - self.assertEqual(field, at * 3) - - if __name__ == '__main__': unittest.main() diff --git a/tests/bindings/python/bt2/utils.py b/tests/bindings/python/bt2/utils.py index 1f9a6c27..7707a531 100644 --- a/tests/bindings/python/bt2/utils.py +++ b/tests/bindings/python/bt2/utils.py @@ -17,13 +17,12 @@ # import bt2 +import collections.abc # Run callable `func` in the context of a component's __init__ method. The # callable is passed the Component being instantiated. # # The value returned by the callable is returned by run_in_component_init. - - def run_in_component_init(func): class MySink(bt2._UserSinkComponent): def __init__(self, params, obj): @@ -52,10 +51,56 @@ def run_in_component_init(func): # Create an empty trace class with default values. - - def get_default_trace_class(): def f(comp_self): return comp_self._create_trace_class() return run_in_component_init(f) + + +# Proxy sink component class. +# +# This sink accepts a list of a single item as its initialization +# object. This sink creates a single input port `in`. When it consumes +# from this port, it puts the returned message in the initialization +# list as the first item. +class TestProxySink(bt2._UserSinkComponent): + def __init__(self, params, msg_list): + assert msg_list is not None + self._msg_list = msg_list + self._add_input_port('in') + + def _user_graph_is_configured(self): + self._msg_iter = self._create_input_port_message_iterator( + self._input_ports['in'] + ) + + def _user_consume(self): + assert self._msg_list[0] is None + self._msg_list[0] = next(self._msg_iter) + + +# This is a helper message iterator for tests. +# +# The constructor accepts a graph and an output port. +# +# Internally, it adds a proxy sink to the graph and connects the +# received output port to the proxy sink's input port. Its __next__() +# method then uses the proxy sink to transfer the consumed message to +# the output port message iterator's user. +# +# This message iterator cannot seek. +class TestOutputPortMessageIterator(collections.abc.Iterator): + def __init__(self, graph, output_port): + self._graph = graph + self._msg_list = [None] + sink = graph.add_component(TestProxySink, 'test-proxy-sink', obj=self._msg_list) + graph.connect_ports(output_port, sink.input_ports['in']) + + def __next__(self): + assert self._msg_list[0] is None + self._graph.run_once() + msg = self._msg_list[0] + assert msg is not None + self._msg_list[0] = None + return msg