babeltrace2/graph/port-const.h \
babeltrace2/graph/port-input-const.h \
babeltrace2/graph/port-output-const.h \
- babeltrace2/graph/port-output-message-iterator.h \
babeltrace2/graph/private-query-executor.h \
babeltrace2/graph/query-executor-const.h \
babeltrace2/graph/query-executor.h \
/* Message iterator API */
#include <babeltrace2/graph/message-iterator.h>
-#include <babeltrace2/graph/port-output-message-iterator.h>
#include <babeltrace2/graph/self-component-port-input-message-iterator.h>
#include <babeltrace2/graph/self-message-iterator.h>
+++ /dev/null
-#ifndef BABELTRACE2_GRAPH_PORT_OUTPUT_MESSAGE_ITERATOR_H
-#define BABELTRACE2_GRAPH_PORT_OUTPUT_MESSAGE_ITERATOR_H
-
-/*
- * Copyright (c) 2010-2019 EfficiOS Inc. and Linux Foundation
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in
- * all copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-#ifndef __BT_IN_BABELTRACE_H
-# error "Please include <babeltrace2/babeltrace.h> instead."
-#endif
-
-#include <stdint.h>
-
-#include <babeltrace2/graph/message-iterator.h>
-#include <babeltrace2/types.h>
-
-#ifdef __cplusplus
-extern "C" {
-#endif
-
-static inline
-bt_message_iterator *
-bt_port_output_message_iterator_as_message_iterator(
- bt_port_output_message_iterator *iterator)
-{
- return __BT_UPCAST(bt_message_iterator, iterator);
-}
-
-extern bt_port_output_message_iterator *
-bt_port_output_message_iterator_create(
- bt_graph *graph,
- const bt_port_output *output_port);
-
-extern bt_message_iterator_next_status
-bt_port_output_message_iterator_next(
- bt_port_output_message_iterator *iterator,
- bt_message_array_const *msgs, uint64_t *count);
-
-extern bt_bool bt_port_output_message_iterator_can_seek_ns_from_origin(
- bt_port_output_message_iterator *iterator,
- int64_t ns_from_origin);
-
-extern bt_bool bt_port_output_message_iterator_can_seek_beginning(
- bt_port_output_message_iterator *iterator);
-
-extern bt_message_iterator_seek_ns_from_origin_status
-bt_port_output_message_iterator_seek_ns_from_origin(
- bt_port_output_message_iterator *iterator,
- int64_t ns_from_origin);
-
-extern bt_message_iterator_seek_beginning_status
-bt_port_output_message_iterator_seek_beginning(
- bt_port_output_message_iterator *iterator);
-
-extern void bt_port_output_message_iterator_get_ref(
- const bt_port_output_message_iterator *port_output_message_iterator);
-
-extern void bt_port_output_message_iterator_put_ref(
- const bt_port_output_message_iterator *port_output_message_iterator);
-
-#define BT_PORT_OUTPUT_MESSAGE_ITERATOR_PUT_REF_AND_RESET(_var) \
- do { \
- bt_port_output_message_iterator_put_ref(_var); \
- (_var) = NULL; \
- } while (0)
-
-#define BT_PORT_OUTPUT_MESSAGE_ITERATOR_MOVE_REF(_var_dst, _var_src) \
- do { \
- bt_port_output_message_iterator_put_ref(_var_dst); \
- (_var_dst) = (_var_src); \
- (_var_src) = NULL; \
- } while (0)
-
-#ifdef __cplusplus
-}
-#endif
-
-#endif /* BABELTRACE2_GRAPH_PORT_OUTPUT_MESSAGE_ITERATOR_H */
from bt2 import interrupter as bt2_interrupter
from bt2 import connection as bt2_connection
from bt2 import component as bt2_component
-from bt2 import message_iterator as bt2_message_iterator
import functools
from bt2 import port as bt2_port
from bt2 import logging as bt2_logging
def interrupt(self):
native_bt.graph_interrupt(self._ptr)
-
- def create_output_port_message_iterator(self, output_port):
- utils._check_type(output_port, bt2_port._OutputPort)
- msg_iter_ptr = native_bt.port_output_message_iterator_create(
- self._ptr, output_port._ptr
- )
-
- if msg_iter_ptr is None:
- raise bt2._MemoryError('cannot create output port message iterator')
-
- return bt2_message_iterator._OutputPortMessageIterator(msg_iter_ptr)
)
-# This is created when the user wants to iterate on a component's output port,
-# from outside the graph.
-class _OutputPortMessageIterator(_GenericMessageIterator):
- _get_msg_range = staticmethod(native_bt.bt2_port_output_get_msg_range)
- _get_ref = staticmethod(native_bt.port_output_message_iterator_get_ref)
- _put_ref = staticmethod(native_bt.port_output_message_iterator_put_ref)
- _can_seek_beginning = staticmethod(
- native_bt.port_output_message_iterator_can_seek_beginning
- )
- _seek_beginning = staticmethod(
- native_bt.port_output_message_iterator_seek_beginning
- )
-
-
# This is extended by the user to implement component classes in Python. It
# is created for a given output port when an input port message iterator is
# created on the input port on the other side of the connection. It is also
*/
%include <babeltrace2/graph/message-iterator.h>
-%include <babeltrace2/graph/port-output-message-iterator.h>
%include <babeltrace2/graph/self-component-port-input-message-iterator.h>
%include <babeltrace2/graph/self-message-iterator.h>
bt_self_message_iterator *self_message_iterator);
PyObject *bt_bt2_self_component_port_input_get_msg_range(
bt_self_component_port_input_message_iterator *iter);
-PyObject *bt_bt2_port_output_get_msg_range(
- bt_port_output_message_iterator *iter);
&messages, &message_count);
return get_msg_range_common(status, messages, message_count);
}
-
-static PyObject *bt_bt2_port_output_get_msg_range(
- bt_port_output_message_iterator *iter)
-{
- bt_message_array_const messages;
- uint64_t message_count = 0;
- bt_message_iterator_next_status status;
-
- status = bt_port_output_message_iterator_next(iter, &messages,
- &message_count);
- return get_msg_range_common(status, messages, message_count);
-}
from bt2 import native_bt, object
from bt2 import component as bt2_component
from bt2 import connection as bt2_connection
-from bt2 import message_iterator as bt2_message_iterator
from bt2 import message as bt2_message
import bt2
* message iterator, which is a self component port input
* message iterator.
*/
- BT_ASSERT(iter->type ==
- BT_MESSAGE_ITERATOR_TYPE_SELF_COMPONENT_PORT_INPUT);
input_port_iter = (void *) iter;
cause = g_new0(struct bt_error_cause_message_iterator_actor, 1);
if (!cause) {
#include <babeltrace2/graph/message-const.h>
#include <babeltrace2/graph/message-iterator.h>
#include <babeltrace2/graph/self-component-port-input-message-iterator.h>
-#include <babeltrace2/graph/port-output-message-iterator.h>
#include <babeltrace2/graph/message-event-const.h>
#include <babeltrace2/graph/message-message-iterator-inactivity-const.h>
#include <babeltrace2/graph/message-packet-beginning.h>
iterator->state = state;
}
-static
-void destroy_base_message_iterator(struct bt_object *obj)
-{
- struct bt_message_iterator *iterator = (void *) obj;
-
- BT_ASSERT(iterator);
-
- if (iterator->msgs) {
- g_ptr_array_free(iterator->msgs, TRUE);
- iterator->msgs = NULL;
- }
-
- g_free(iterator);
-}
-
static
void bt_self_component_port_input_message_iterator_destroy(struct bt_object *obj)
{
iterator->upstream_msg_iters = NULL;
}
- destroy_base_message_iterator(obj);
+ if (iterator->msgs) {
+ g_ptr_array_free(iterator->msgs, TRUE);
+ iterator->msgs = NULL;
+ }
+
+ g_free(iterator);
}
BT_HIDDEN
"%![iter-]+i, %![conn-]+x", iterator, connection);
}
-static
-int init_message_iterator(struct bt_message_iterator *iterator,
- enum bt_message_iterator_type type,
- bt_object_release_func destroy)
-{
- int ret = 0;
-
- bt_object_init_shared(&iterator->base, destroy);
- iterator->type = type;
- iterator->msgs = g_ptr_array_new();
- if (!iterator->msgs) {
- BT_LIB_LOGE_APPEND_CAUSE("Failed to allocate a GPtrArray.");
- ret = -1;
- goto end;
- }
-
- g_ptr_array_set_size(iterator->msgs, MSG_BATCH_SIZE);
-
-end:
- return ret;
-}
-
static
bt_bool can_seek_ns_from_origin_true(
struct bt_self_component_port_input_message_iterator *iterator,
typedef enum bt_component_class_message_iterator_init_method_status (*init_method_t)(
void *, void *, void *);
- int ret;
init_method_t init_method = NULL;
struct bt_self_component_port_input_message_iterator *iterator =
NULL;
goto error;
}
- ret = init_message_iterator((void *) iterator,
- BT_MESSAGE_ITERATOR_TYPE_SELF_COMPONENT_PORT_INPUT,
+ bt_object_init_shared(&iterator->base,
bt_self_component_port_input_message_iterator_destroy);
- if (ret) {
- /* init_message_iterator() logs errors */
+ iterator->msgs = g_ptr_array_new();
+ if (!iterator->msgs) {
+ BT_LIB_LOGE_APPEND_CAUSE("Failed to allocate a GPtrArray.");
goto error;
}
+ g_ptr_array_set_size(iterator->msgs, MSG_BATCH_SIZE);
iterator->last_ns_from_origin = INT64_MIN;
iterator->auto_seek.msgs = g_queue_new();
if (!iterator->auto_seek.msgs) {
*/
*user_count = 0;
status = (int) call_iterator_next_method(iterator,
- (void *) iterator->base.msgs->pdata, MSG_BATCH_SIZE,
+ (void *) iterator->msgs->pdata, MSG_BATCH_SIZE,
user_count);
BT_LOGD("User method returned: status=%s, msg-count=%" PRIu64,
bt_common_func_status_string(status), *user_count);
"Invalid returned message count: greater than "
"batch size: count=%" PRIu64 ", batch-size=%u",
*user_count, MSG_BATCH_SIZE);
- *msgs = (void *) iterator->base.msgs->pdata;
+ *msgs = (void *) iterator->msgs->pdata;
break;
case BT_FUNC_STATUS_AGAIN:
goto end;
return status;
}
-enum bt_message_iterator_next_status bt_port_output_message_iterator_next(
- struct bt_port_output_message_iterator *iterator,
- bt_message_array_const *msgs_to_user,
- uint64_t *count_to_user)
-{
- enum bt_message_iterator_next_status status;
- int graph_status;
-
- BT_ASSERT_PRE_DEV_NON_NULL(iterator, "Message iterator");
- BT_ASSERT_PRE_DEV_NON_NULL(msgs_to_user, "Message array (output)");
- BT_ASSERT_PRE_DEV_NON_NULL(count_to_user, "Message count (output)");
- BT_LIB_LOGD("Getting next output port message iterator's messages: "
- "%!+i", iterator);
- graph_status = bt_graph_consume_sink_no_check(iterator->graph,
- iterator->colander);
- switch (graph_status) {
- case BT_FUNC_STATUS_AGAIN:
- case BT_FUNC_STATUS_END:
- case BT_FUNC_STATUS_MEMORY_ERROR:
- status = (int) graph_status;
- break;
- case BT_FUNC_STATUS_OK:
- status = BT_FUNC_STATUS_OK;
-
- /*
- * On success, the colander sink moves the messages
- * to this iterator's array and sets this iterator's
- * message count: move them to the user.
- */
- *msgs_to_user = (void *) iterator->base.msgs->pdata;
- *count_to_user = iterator->count;
- break;
- default:
- /* Other errors */
- status = BT_FUNC_STATUS_ERROR;
- }
-
- return status;
-}
-
struct bt_component *
bt_self_component_port_input_message_iterator_borrow_component(
struct bt_self_component_port_input_message_iterator *iterator)
return (void *) iterator->upstream_port;
}
-static
-void bt_port_output_message_iterator_destroy(struct bt_object *obj)
-{
- struct bt_port_output_message_iterator *iterator = (void *) obj;
-
- BT_LIB_LOGI("Destroying output port message iterator object: %!+i",
- iterator);
- BT_LOGD_STR("Putting graph.");
- BT_OBJECT_PUT_REF_AND_RESET(iterator->graph);
- BT_LOGD_STR("Putting colander sink component.");
- BT_OBJECT_PUT_REF_AND_RESET(iterator->colander);
- destroy_base_message_iterator(obj);
-}
-
-struct bt_port_output_message_iterator *
-bt_port_output_message_iterator_create(struct bt_graph *graph,
- const struct bt_port_output *output_port)
-{
- struct bt_port_output_message_iterator *iterator = NULL;
- struct bt_component_class_sink *colander_comp_cls = NULL;
- struct bt_component *output_port_comp = NULL;
- struct bt_component_sink *colander_comp;
- int graph_status;
- struct bt_port_input *colander_in_port = NULL;
- struct bt_component_class_sink_colander_data colander_data;
- int ret;
-
- BT_ASSERT_PRE_NON_NULL(graph, "Graph");
- BT_ASSERT_PRE_NON_NULL(output_port, "Output port");
- output_port_comp = bt_port_borrow_component_inline(
- (const void *) output_port);
- BT_ASSERT_PRE(output_port_comp,
- "Output port has no component: %!+p", output_port);
- BT_ASSERT_PRE(bt_component_borrow_graph(output_port_comp) ==
- (void *) graph,
- "Output port is not part of graph: %![graph-]+g, %![port-]+p",
- graph, output_port);
- BT_ASSERT_PRE(!graph->has_sink,
- "Graph already has a sink component: %![graph-]+g");
-
- /* Create message iterator */
- BT_LIB_LOGI("Creating message iterator on output port: "
- "%![port-]+p, %![comp-]+c", output_port, output_port_comp);
- iterator = g_new0(struct bt_port_output_message_iterator, 1);
- if (!iterator) {
- BT_LIB_LOGE_APPEND_CAUSE(
- "Failed to allocate one output port message iterator.");
- goto error;
- }
-
- ret = init_message_iterator((void *) iterator,
- BT_MESSAGE_ITERATOR_TYPE_PORT_OUTPUT,
- bt_port_output_message_iterator_destroy);
- if (ret) {
- /* init_message_iterator() logs errors */
- BT_OBJECT_PUT_REF_AND_RESET(iterator);
- goto end;
- }
-
- /* Create colander component */
- colander_comp_cls = bt_component_class_sink_colander_get();
- if (!colander_comp_cls) {
- /* bt_component_class_sink_colander_get() logs errors */
- BT_LIB_LOGE_APPEND_CAUSE(
- "Cannot get colander sink component class.");
- goto error;
- }
-
- iterator->graph = graph;
- bt_object_get_no_null_check(iterator->graph);
- colander_data.msgs = (void *) iterator->base.msgs->pdata;
- colander_data.count_addr = &iterator->count;
-
- /*
- * Hope that nobody uses this very unique name.
- *
- * We pass `BT_LOGGING_LEVEL_NONE` but the colander component
- * class module does not use this level anyway since it belongs
- * to the library.
- */
- graph_status = bt_graph_add_sink_component_with_init_method_data(
- (void *) graph, colander_comp_cls,
- "colander-36ac3409-b1a8-4d60-ab1f-4fdf341a8fb1",
- NULL, &colander_data, BT_LOGGING_LEVEL_NONE,
- (void *) &iterator->colander);
- if (graph_status != BT_FUNC_STATUS_OK) {
- BT_LIB_LOGE_APPEND_CAUSE(
- "Cannot add colander sink component to graph: "
- "%![graph-]+g, status=%s", graph,
- bt_common_func_status_string(graph_status));
- goto error;
- }
-
- /*
- * Connect provided output port to the colander component's
- * input port.
- */
- colander_in_port =
- (void *) bt_component_sink_borrow_input_port_by_index_const(
- (void *) iterator->colander, 0);
- BT_ASSERT(colander_in_port);
- graph_status = bt_graph_connect_ports(graph,
- output_port, colander_in_port, NULL);
- if (graph_status != BT_FUNC_STATUS_OK) {
- BT_LIB_LOGW_APPEND_CAUSE(
- "Cannot connect colander sink's port: "
- "%![graph-]+g, %![comp-]+c, status=%s", graph,
- iterator->colander,
- bt_common_func_status_string(graph_status));
- goto error;
- }
-
- /*
- * At this point everything went fine. Make the graph
- * nonconsumable forever so that only this message iterator
- * can consume (thanks to bt_graph_consume_sink_no_check()).
- * This avoids leaking the message created by the colander
- * sink and moved to the message iterator's message
- * member.
- */
- bt_graph_set_can_consume(iterator->graph, false);
-
- /* Also set the graph as being configured. */
- graph_status = bt_graph_configure(graph);
- if (graph_status != BT_FUNC_STATUS_OK) {
- BT_LIB_LOGW_APPEND_CAUSE(
- "Cannot configure graph after having "
- "added and connected colander sink: "
- "%![graph-]+g, %![comp-]+c, status=%s", graph,
- iterator->colander,
- bt_common_func_status_string(graph_status));
- goto error;
- }
- goto end;
-
-error:
- if (iterator && iterator->graph && iterator->colander) {
- int ret;
-
- /* Remove created colander component from graph if any */
- colander_comp = iterator->colander;
- BT_OBJECT_PUT_REF_AND_RESET(iterator->colander);
-
- /*
- * At this point the colander component's reference
- * count is 0 because iterator->colander was the only
- * owner. We also know that it is not connected because
- * this is the last operation before this function
- * succeeds.
- *
- * Since we honor the preconditions here,
- * bt_graph_remove_unconnected_component() always
- * succeeds.
- */
- ret = bt_graph_remove_unconnected_component(iterator->graph,
- (void *) colander_comp);
- BT_ASSERT(ret == 0);
- }
-
- BT_OBJECT_PUT_REF_AND_RESET(iterator);
-
-end:
- bt_object_put_ref(colander_comp_cls);
- return (void *) iterator;
-}
-
bt_bool bt_self_component_port_input_message_iterator_can_seek_ns_from_origin(
struct bt_self_component_port_input_message_iterator *iterator,
int64_t ns_from_origin)
return status;
}
-static inline
-bt_self_component_port_input_message_iterator *
-borrow_output_port_message_iterator_upstream_iterator(
- struct bt_port_output_message_iterator *iterator)
-{
- struct bt_component_class_sink_colander_priv_data *colander_data;
-
- BT_ASSERT(iterator);
- colander_data = (void *) iterator->colander->parent.user_data;
- BT_ASSERT(colander_data);
- BT_ASSERT(colander_data->msg_iter);
- return colander_data->msg_iter;
-}
-
-bt_bool bt_port_output_message_iterator_can_seek_ns_from_origin(
- struct bt_port_output_message_iterator *iterator,
- int64_t ns_from_origin)
-{
- BT_ASSERT_PRE_NON_NULL(iterator, "Message iterator");
- return bt_self_component_port_input_message_iterator_can_seek_ns_from_origin(
- borrow_output_port_message_iterator_upstream_iterator(
- iterator), ns_from_origin);
-}
-
-bt_bool bt_port_output_message_iterator_can_seek_beginning(
- struct bt_port_output_message_iterator *iterator)
-{
- BT_ASSERT_PRE_NON_NULL(iterator, "Message iterator");
- return bt_self_component_port_input_message_iterator_can_seek_beginning(
- borrow_output_port_message_iterator_upstream_iterator(
- iterator));
-}
-
-enum bt_message_iterator_seek_ns_from_origin_status
-bt_port_output_message_iterator_seek_ns_from_origin(
- struct bt_port_output_message_iterator *iterator,
- int64_t ns_from_origin)
-{
- BT_ASSERT_PRE_NON_NULL(iterator, "Message iterator");
- return bt_self_component_port_input_message_iterator_seek_ns_from_origin(
- borrow_output_port_message_iterator_upstream_iterator(iterator),
- ns_from_origin);
-}
-
-enum bt_message_iterator_seek_beginning_status
-bt_port_output_message_iterator_seek_beginning(
- struct bt_port_output_message_iterator *iterator)
-{
- BT_ASSERT_PRE_NON_NULL(iterator, "Message iterator");
- return bt_self_component_port_input_message_iterator_seek_beginning(
- borrow_output_port_message_iterator_upstream_iterator(
- iterator));
-}
-
bt_bool bt_self_message_iterator_is_interrupted(
const struct bt_self_message_iterator *self_msg_iter)
{
return (bt_bool) bt_graph_is_interrupted(iterator->graph);
}
-void bt_port_output_message_iterator_get_ref(
- const struct bt_port_output_message_iterator *iterator)
-{
- bt_object_get_ref(iterator);
-}
-
-void bt_port_output_message_iterator_put_ref(
- const struct bt_port_output_message_iterator *iterator)
-{
- bt_object_put_ref(iterator);
-}
-
void bt_self_component_port_input_message_iterator_get_ref(
const struct bt_self_component_port_input_message_iterator *iterator)
{
struct bt_port;
struct bt_graph;
-enum bt_message_iterator_type {
- BT_MESSAGE_ITERATOR_TYPE_SELF_COMPONENT_PORT_INPUT,
- BT_MESSAGE_ITERATOR_TYPE_PORT_OUTPUT,
-};
-
enum bt_self_component_port_input_message_iterator_state {
/* Iterator is not initialized */
BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_NON_INITIALIZED,
BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_LAST_SEEKING_RETURNED_ERROR,
};
-struct bt_message_iterator {
- struct bt_object base;
- enum bt_message_iterator_type type;
- GPtrArray *msgs;
-};
-
typedef enum bt_component_class_message_iterator_next_method_status
(*bt_self_component_port_input_message_iterator_next_method)(
void *, bt_message_array_const, uint64_t, uint64_t *);
void *);
struct bt_self_component_port_input_message_iterator {
- struct bt_message_iterator base;
+ struct bt_object base;
+ GPtrArray *msgs;
struct bt_component *upstream_component; /* Weak */
struct bt_port *upstream_port; /* Weak */
struct bt_connection *connection; /* Weak */
void *user_data;
};
-struct bt_port_output_message_iterator {
- struct bt_message_iterator base;
- struct bt_graph *graph; /* Owned by this */
- struct bt_component_sink *colander; /* Owned by this */
-
- /*
- * Only used temporarily as a bridge between a colander sink and
- * the user.
- */
- uint64_t count;
-};
-
BT_HIDDEN
void bt_self_component_port_input_message_iterator_try_finalize(
struct bt_self_component_port_input_message_iterator *iterator);
bool extended, const char *prefix,
const struct bt_message_iterator *iterator)
{
- const char *type;
char tmp_prefix[TMP_PREFIX_LEN];
+ const struct bt_self_component_port_input_message_iterator *
+ port_in_iter = (const void *) iterator;
- if (iterator->type == BT_MESSAGE_ITERATOR_TYPE_SELF_COMPONENT_PORT_INPUT) {
- type = "BT_MESSAGE_ITERATOR_TYPE_SELF_COMPONENT_PORT_INPUT";
- } else if (iterator->type == BT_MESSAGE_ITERATOR_TYPE_PORT_OUTPUT) {
- type = "BT_MESSAGE_ITERATOR_TYPE_PORT_OUTPUT";
- } else {
- type = "(unknown)";
+ if (port_in_iter->upstream_component) {
+ SET_TMP_PREFIX("upstream-comp-");
+ format_component(buf_ch, false, tmp_prefix,
+ port_in_iter->upstream_component);
}
- BUF_APPEND(", %stype=%s", PRFIELD(type));
-
- switch (iterator->type) {
- case BT_MESSAGE_ITERATOR_TYPE_SELF_COMPONENT_PORT_INPUT:
- {
- const struct bt_self_component_port_input_message_iterator *
- port_in_iter = (const void *) iterator;
-
- if (port_in_iter->upstream_component) {
- SET_TMP_PREFIX("upstream-comp-");
- format_component(buf_ch, false, tmp_prefix,
- port_in_iter->upstream_component);
- }
-
- if (port_in_iter->upstream_port) {
- SET_TMP_PREFIX("upstream-port-");
- format_port(buf_ch, false, tmp_prefix,
- port_in_iter->upstream_port);
- }
-
- if (port_in_iter->connection) {
- SET_TMP_PREFIX("upstream-conn-");
- format_connection(buf_ch, false, tmp_prefix,
- port_in_iter->connection);
- }
- break;
+ if (!extended) {
+ goto end;
}
- case BT_MESSAGE_ITERATOR_TYPE_PORT_OUTPUT:
- {
- const struct bt_port_output_message_iterator *port_out_iter =
- (const void *) iterator;
- if (port_out_iter->graph) {
- SET_TMP_PREFIX("graph-");
- format_graph(buf_ch, false, tmp_prefix,
- port_out_iter->graph);
- }
-
- if (port_out_iter->colander) {
- SET_TMP_PREFIX("colander-comp-");
- format_component(buf_ch, false, tmp_prefix,
- (void *) port_out_iter->colander);
- }
-
- break;
+ if (port_in_iter->upstream_port) {
+ SET_TMP_PREFIX("upstream-port-");
+ format_port(buf_ch, false, tmp_prefix,
+ port_in_iter->upstream_port);
}
- default:
- break;
+
+ if (port_in_iter->connection) {
+ SET_TMP_PREFIX("upstream-conn-");
+ format_connection(buf_ch, false, tmp_prefix,
+ port_in_iter->connection);
}
+
+end:
+ return;
}
static inline void format_plugin(char **buf_ch, bool extended,
import uuid
import copy
import bt2
-from utils import run_in_component_init
+from utils import run_in_component_init, TestOutputPortMessageIterator
class ClockClassOffsetTestCase(unittest.TestCase):
self._graph = bt2.Graph()
self._src_comp = self._graph.add_component(MySrc, 'my_source')
- self._msg_iter = self._graph.create_output_port_message_iterator(
- self._src_comp.output_ports['out']
+ self._msg_iter = TestOutputPortMessageIterator(
+ self._graph, self._src_comp.output_ports['out']
)
for i, msg in enumerate(self._msg_iter):
from collections import OrderedDict
import unittest
import bt2
+from utils import TestOutputPortMessageIterator
class EventTestCase(unittest.TestCase):
test_obj = self
self._graph = bt2.Graph()
self._src_comp = self._graph.add_component(MySrc, 'my_source')
- self._msg_iter = self._graph.create_output_port_message_iterator(
- self._src_comp.output_ports['out']
+ self._msg_iter = TestOutputPortMessageIterator(
+ self._graph, self._src_comp.output_ports['out']
)
for msg in self._msg_iter:
import collections
import unittest
import bt2
+from utils import TestOutputPortMessageIterator
class AllMessagesTestCase(unittest.TestCase):
def test_all_msg_with_cc(self):
params = {'with_cc': True}
self._src_comp = self._graph.add_component(self._src, 'my_source', params)
- self._msg_iter = self._graph.create_output_port_message_iterator(
- self._src_comp.output_ports['out']
+ self._msg_iter = TestOutputPortMessageIterator(
+ self._graph, self._src_comp.output_ports['out']
)
for i, msg in enumerate(self._msg_iter):
def test_all_msg_without_cc(self):
params = {'with_cc': False}
self._src_comp = self._graph.add_component(self._src, 'my_source', params)
- self._msg_iter = self._graph.create_output_port_message_iterator(
- self._src_comp.output_ports['out']
+ self._msg_iter = TestOutputPortMessageIterator(
+ self._graph, self._src_comp.output_ports['out']
)
for i, msg in enumerate(self._msg_iter):
params = {'with_cc': True, 'with_stream_msgs_clock_snapshots': True}
self._src_comp = self._graph.add_component(self._src, 'my_source', params)
- self._msg_iter = self._graph.create_output_port_message_iterator(
- self._src_comp.output_ports['out']
+ self._msg_iter = TestOutputPortMessageIterator(
+ self._graph, self._src_comp.output_ports['out']
)
msgs = list(self._msg_iter)
import unittest
import copy
import bt2
+from utils import TestOutputPortMessageIterator
class UserMessageIteratorTestCase(unittest.TestCase):
graph = bt2.Graph()
src = graph.add_component(MySource, 'src')
- it = graph.create_output_port_message_iterator(src.output_ports['out'])
+ it = TestOutputPortMessageIterator(graph, src.output_ports['out'])
# Skip beginning messages.
msg = next(it)
self.assertEqual(msg_ev1.addr, msg_ev2.addr)
@staticmethod
- def _setup_seek_beginning_test():
+ def _setup_seek_beginning_test(sink_cls):
# Use a source, a filter and an output port iterator. This allows us
# to test calling `seek_beginning` on both a _OutputPortMessageIterator
# and a _UserComponentInputPortMessageIterator, on top of checking that
graph = bt2.Graph()
src = graph.add_component(MySource, 'src')
flt = graph.add_component(MyFilter, 'flt')
+ sink = graph.add_component(sink_cls, 'sink')
graph.connect_ports(src.output_ports['out'], flt.input_ports['in'])
- it = graph.create_output_port_message_iterator(flt.output_ports['out'])
-
- return it, MySourceIter
+ graph.connect_ports(flt.output_ports['out'], sink.input_ports['in'])
+ return MySourceIter, graph
def test_can_seek_beginning(self):
- it, MySourceIter = self._setup_seek_beginning_test()
+ class MySink(bt2._UserSinkComponent):
+ def __init__(self, params, obj):
+ self._add_input_port('in')
+
+ def _user_graph_is_configured(self):
+ self._msg_iter = self._create_input_port_message_iterator(
+ self._input_ports['in']
+ )
+
+ def _user_consume(self):
+ nonlocal can_seek_beginning
+ can_seek_beginning = self._msg_iter.can_seek_beginning
+
+ MySourceIter, graph = self._setup_seek_beginning_test(MySink)
def _user_can_seek_beginning(self):
- nonlocal can_seek_beginning
- return can_seek_beginning
+ nonlocal input_port_iter_can_seek_beginning
+ return input_port_iter_can_seek_beginning
MySourceIter._user_can_seek_beginning = property(_user_can_seek_beginning)
- can_seek_beginning = True
- self.assertTrue(it.can_seek_beginning)
+ input_port_iter_can_seek_beginning = True
+ can_seek_beginning = None
+ graph.run_once()
+ self.assertTrue(can_seek_beginning)
- can_seek_beginning = False
- self.assertFalse(it.can_seek_beginning)
+ input_port_iter_can_seek_beginning = False
+ can_seek_beginning = None
+ graph.run_once()
+ self.assertFalse(can_seek_beginning)
# Once can_seek_beginning returns an error, verify that it raises when
# _can_seek_beginning has/returns the wrong type.
# a _seek_beginning method to know whether the iterator can seek to
# beginning or not.
del MySourceIter._user_can_seek_beginning
- self.assertTrue(it.can_seek_beginning)
+ can_seek_beginning = None
+ graph.run_once()
+ self.assertTrue(can_seek_beginning)
del MySourceIter._user_seek_beginning
- self.assertFalse(it.can_seek_beginning)
+ can_seek_beginning = None
+ graph.run_once()
+ self.assertFalse(can_seek_beginning)
def test_seek_beginning(self):
- it, MySourceIter = self._setup_seek_beginning_test()
+ class MySink(bt2._UserSinkComponent):
+ def __init__(self, params, obj):
+ self._add_input_port('in')
- msg = next(it)
+ def _user_graph_is_configured(self):
+ self._msg_iter = self._create_input_port_message_iterator(
+ self._input_ports['in']
+ )
+
+ def _user_consume(self):
+ nonlocal do_seek_beginning
+ nonlocal msg
+
+ if do_seek_beginning:
+ self._msg_iter.seek_beginning()
+ return
+
+ msg = next(self._msg_iter)
+
+ do_seek_beginning = False
+ msg = None
+ MySourceIter, graph = self._setup_seek_beginning_test(MySink)
+ graph.run_once()
self.assertIsInstance(msg, bt2._StreamBeginningMessage)
- msg = next(it)
+ graph.run_once()
self.assertIsInstance(msg, bt2._PacketBeginningMessage)
+ do_seek_beginning = True
+ graph.run_once()
+ do_seek_beginning = False
+ graph.run_once()
+ self.assertIsInstance(msg, bt2._StreamBeginningMessage)
- it.seek_beginning()
+ def test_seek_beginning_user_error(self):
+ class MySink(bt2._UserSinkComponent):
+ def __init__(self, params, obj):
+ self._add_input_port('in')
- msg = next(it)
- self.assertIsInstance(msg, bt2._StreamBeginningMessage)
+ def _user_graph_is_configured(self):
+ self._msg_iter = self._create_input_port_message_iterator(
+ self._input_ports['in']
+ )
- # Verify that we can seek beginning after having reached the end.
- #
- # It currently does not work to seek an output port message iterator
- # once it's ended, but we should eventually make it work and uncomment
- # the following snippet.
- #
- # try:
- # while True:
- # next(it)
- # except bt2.Stop:
- # pass
- #
- # it.seek_beginning()
- # msg = next(it)
- # self.assertIsInstance(msg, bt2._StreamBeginningMessage)
+ def _user_consume(self):
+ self._msg_iter.seek_beginning()
- def test_seek_beginning_user_error(self):
- it, MySourceIter = self._setup_seek_beginning_test()
+ MySourceIter, graph = self._setup_seek_beginning_test(MySink)
def _user_seek_beginning_error(self):
raise ValueError('ouch')
MySourceIter._user_seek_beginning = _user_seek_beginning_error
with self.assertRaises(bt2._Error):
- it.seek_beginning()
+ graph.run_once()
# Try consuming many times from an iterator that always returns TryAgain.
# This verifies that we are not missing an incref of Py_None, making the
graph = bt2.Graph()
src = graph.add_component(MySource, 'src')
- it = graph.create_output_port_message_iterator(src.output_ports['out'])
+ it = TestOutputPortMessageIterator(graph, src.output_ports['out'])
# The initial refcount of Py_None was in the 7000, so 100000 iterations
# should be enough to catch the bug even if there are small differences
next(it)
-class OutputPortMessageIteratorTestCase(unittest.TestCase):
- def test_component(self):
- class MyIter(bt2._UserMessageIterator):
- def __init__(self, self_port_output):
- self._at = 0
-
- def __next__(self):
- if self._at == 7:
- raise bt2.Stop
-
- if self._at == 0:
- msg = self._create_stream_beginning_message(test_obj._stream)
- elif self._at == 1:
- msg = self._create_packet_beginning_message(test_obj._packet)
- elif self._at == 5:
- msg = self._create_packet_end_message(test_obj._packet)
- elif self._at == 6:
- msg = self._create_stream_end_message(test_obj._stream)
- else:
- msg = self._create_event_message(
- test_obj._event_class, test_obj._packet
- )
- msg.event.payload_field['my_int'] = self._at * 3
-
- self._at += 1
- return msg
-
- class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
- def __init__(self, params, obj):
- self._add_output_port('out')
-
- trace_class = self._create_trace_class()
- stream_class = trace_class.create_stream_class(supports_packets=True)
-
- # Create payload field class
- my_int_ft = trace_class.create_signed_integer_field_class(32)
- payload_ft = trace_class.create_structure_field_class()
- payload_ft += [('my_int', my_int_ft)]
-
- event_class = stream_class.create_event_class(
- name='salut', payload_field_class=payload_ft
- )
-
- trace = trace_class()
- stream = trace.create_stream(stream_class)
- packet = stream.create_packet()
-
- test_obj._event_class = event_class
- test_obj._stream = stream
- test_obj._packet = packet
-
- test_obj = self
- graph = bt2.Graph()
- src = graph.add_component(MySource, 'src')
- msg_iter = graph.create_output_port_message_iterator(src.output_ports['out'])
-
- for at, msg in enumerate(msg_iter):
- if at == 0:
- self.assertIsInstance(msg, bt2._StreamBeginningMessage)
- elif at == 1:
- self.assertIsInstance(msg, bt2._PacketBeginningMessage)
- elif at == 5:
- self.assertIsInstance(msg, bt2._PacketEndMessage)
- elif at == 6:
- self.assertIsInstance(msg, bt2._StreamEndMessage)
- else:
- self.assertIsInstance(msg, bt2._EventMessage)
- self.assertEqual(msg.event.cls.name, 'salut')
- field = msg.event.payload_field['my_int']
- self.assertEqual(field, at * 3)
-
-
if __name__ == '__main__':
unittest.main()
#
import bt2
+import collections.abc
# Run callable `func` in the context of a component's __init__ method. The
# callable is passed the Component being instantiated.
#
# The value returned by the callable is returned by run_in_component_init.
-
-
def run_in_component_init(func):
class MySink(bt2._UserSinkComponent):
def __init__(self, params, obj):
# Create an empty trace class with default values.
-
-
def get_default_trace_class():
def f(comp_self):
return comp_self._create_trace_class()
return run_in_component_init(f)
+
+
+# Proxy sink component class.
+#
+# This sink accepts a list of a single item as its initialization
+# object. This sink creates a single input port `in`. When it consumes
+# from this port, it puts the returned message in the initialization
+# list as the first item.
+class TestProxySink(bt2._UserSinkComponent):
+ def __init__(self, params, msg_list):
+ assert msg_list is not None
+ self._msg_list = msg_list
+ self._add_input_port('in')
+
+ def _user_graph_is_configured(self):
+ self._msg_iter = self._create_input_port_message_iterator(
+ self._input_ports['in']
+ )
+
+ def _user_consume(self):
+ assert self._msg_list[0] is None
+ self._msg_list[0] = next(self._msg_iter)
+
+
+# This is a helper message iterator for tests.
+#
+# The constructor accepts a graph and an output port.
+#
+# Internally, it adds a proxy sink to the graph and connects the
+# received output port to the proxy sink's input port. Its __next__()
+# method then uses the proxy sink to transfer the consumed message to
+# the output port message iterator's user.
+#
+# This message iterator cannot seek.
+class TestOutputPortMessageIterator(collections.abc.Iterator):
+ def __init__(self, graph, output_port):
+ self._graph = graph
+ self._msg_list = [None]
+ sink = graph.add_component(TestProxySink, 'test-proxy-sink', obj=self._msg_list)
+ graph.connect_ports(output_port, sink.input_ports['in'])
+
+ def __next__(self):
+ assert self._msg_list[0] is None
+ self._graph.run_once()
+ msg = self._msg_list[0]
+ assert msg is not None
+ self._msg_list[0] = None
+ return msg