From 7474e7d3f0d005280c6614be5c818735e65d8b3b Mon Sep 17 00:00:00 2001 From: Philippe Proulx Date: Wed, 16 Jan 2019 17:19:14 -0500 Subject: [PATCH] lib: add seeking (beginning, ns from origin), with auto-seeking support This patch adds seeking support to the message iterator API. Two seeking operations are supported: Seek beginning: The message iterator goes back to the beginning, that is, to its first message (which is probably a "stream beginning" message). Seek nanoseconds from origin: The message iterator seeks a message which has a default clock snapshot which is at least the requested value when converted to nanoseconds from epoch. No clock class is involved in this operation, only a plain value in nanoseconds from an origin (which can be negative), to support seeking through an iterator chain with multiple clock classes involved. What a message iterator should do exactly is left to the implementation for more flexibility. A source or filter component class can have four new optional message iterator methods: Can seek beginning: Returns whether or not it is possible for this iterator to seek its beginning. If not set, the corresponding API functions (bt_self_component_port_input_message_iterator_can_seek_beginning() and bt_port_output_message_iterator_can_seek_beginning()) return: * `BT_TRUE` if the "seek beginning" method (see below) is set. * `BT_FALSE` otherwise. Can seek nanoseconds from origin: Returns whether or not it is possible for this iterator to seek a given nanosecond from origin. If not set, the corresponding API functions (bt_self_component_port_input_message_iterator_can_seek_ns_from_origin() and bt_port_output_message_iterator_can_seek_ns_from_origin()) return: * `BT_TRUE` if the "seek nanoseconds from origin" method (see below) is set, or if bt_self_component_port_input_message_iterator_can_seek_beginning() returns `BT_TRUE` (auto-seeking, see below). * `BT_FALSE` otherwise. Seek beginning: Seeks this iterator to its beginning. The corresponding API functions are bt_self_component_port_input_message_iterator_seek_beginning() and bt_port_output_message_iterator_seek_beginning(). Seek nanoseconds from origin: Seeks this iterator to a specific nanosecond from origin. The corresponding API functions are bt_self_component_port_input_message_iterator_seek_ns_from_origin() and bt_port_output_message_iterator_seek_ns_from_origin(). If not set, then if bt_self_component_port_input_message_iterator_can_seek_beginning() returns `BT_TRUE` (which means it is possible for this iterator to seek its beginning) for this iterator, then auto-seeking is enabled for this iterator. Auto-seeking is a feature which makes the library use only the "seek beginning" method of an iterator to implement a "seek nanoseconds from origin" method which calls the iterator's "next" method, skipping messages until it finds one which matches the requested value. This exists because most of the time it is easier for a plugin developer to reset an iterator's state to the beginning than to seek a message having a specific clock snapshot value. With the conditions described above: * If you implement the "seek beginning" method, it is the equivalent of also implementing a "can seek beginning" method which always returns `BT_TRUE`. * If you implement the "seek nanoseconds from origin" method, it is the equivalent of also implementing a "can seek nanoseconds from origin" method which always returns `BT_TRUE`. * If you only implement the "seek beginning" method, than the iterator's user can always seek beginning and any nanosecond from origin. * If you implement none of the above methods, then seeking is completely disabled. The methods, as well as the corresponding API functions, can return the following message iterator statuses: `BT_SELF_MESSAGE_ITERATOR_STATUS_OK`: The seeking operation succeeded. This is the only status which allows a subsequent call to the iterator's "next" method. `BT_SELF_MESSAGE_ITERATOR_STATUS_AGAIN`: The seeking operation did not terminate, but this is not an error: the iterator's user should seek again later (the same time or a different time, for example you can seek beginning after getting this status from seeking a specific nanosecond from origin). `BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR`: `BT_SELF_MESSAGE_ITERATOR_STATUS_NOMEM`: The seeking operation failed: the iterator's user must seek again (with success) in order to make the iterator active again (able to advance). The seeking methods cannot return `BT_SELF_MESSAGE_ITERATOR_STATUS_END`: if seeking reaches the end, then the seeking method must return `BT_SELF_MESSAGE_ITERATOR_STATUS_OK` and the following call to its "next" method must return `BT_SELF_MESSAGE_ITERATOR_STATUS_END`. After a successful seeking operation, the first message of the batch that an iterator's "next" method returns can be of ANY type. Therefore, an iterator's user must have its state ready when seeking because the usual guarantees do not need to be satisfied (for example, getting a "packet beginning" message before any event message). Because seeking breaks the concept of having contiguous messages with no interruption, the concept of a message's sequence number for developer mode validation is removed. We need to find another way to make this validation in the future. Signed-off-by: Philippe Proulx --- include/Makefile.am | 1 + include/babeltrace/babeltrace.h | 1 + .../babeltrace/graph/component-class-filter.h | 41 +- .../graph/component-class-internal.h | 8 + .../component-class-sink-colander-internal.h | 6 + .../babeltrace/graph/component-class-source.h | 40 +- .../babeltrace/graph/connection-internal.h | 2 +- include/babeltrace/graph/message-internal.h | 2 - ...ge-iterator.h => message-iterator-const.h} | 9 +- .../graph/message-iterator-internal.h | 70 +- .../graph/port-output-message-iterator.h | 20 +- ...lf-component-port-input-message-iterator.h | 21 +- .../babeltrace/graph/self-message-iterator.h | 2 +- include/babeltrace/plugin/plugin-dev.h | 216 +++- lib/graph/component-class-sink-colander.c | 17 +- lib/graph/component-class.c | 172 ++- lib/graph/component-source.c | 2 +- lib/graph/iterator.c | 975 ++++++++++++------ lib/graph/message/message.c | 16 +- lib/plugin/plugin-so.c | 160 +++ 20 files changed, 1368 insertions(+), 413 deletions(-) rename include/babeltrace/graph/{message-iterator.h => message-iterator-const.h} (88%) diff --git a/include/Makefile.am b/include/Makefile.am index e00822b6..c3393e17 100644 --- a/include/Makefile.am +++ b/include/Makefile.am @@ -172,6 +172,7 @@ babeltracegraphinclude_HEADERS = \ babeltrace/graph/message-event.h \ babeltrace/graph/message-inactivity-const.h \ babeltrace/graph/message-inactivity.h \ + babeltrace/graph/message-iterator-const.h \ babeltrace/graph/message-packet-const.h \ babeltrace/graph/message-packet.h \ babeltrace/graph/message-stream-const.h \ diff --git a/include/babeltrace/babeltrace.h b/include/babeltrace/babeltrace.h index d3485893..36d42258 100644 --- a/include/babeltrace/babeltrace.h +++ b/include/babeltrace/babeltrace.h @@ -114,6 +114,7 @@ #include #include #include +#include #include #include #include diff --git a/include/babeltrace/graph/component-class-filter.h b/include/babeltrace/graph/component-class-filter.h index 890cfdfa..cf001bf8 100644 --- a/include/babeltrace/graph/component-class-filter.h +++ b/include/babeltrace/graph/component-class-filter.h @@ -42,7 +42,8 @@ * For bt_component_class, bt_component_class_filter, bt_port_input, * bt_port_output, bt_query_executor, bt_self_component_class_filter, * bt_self_component_filter, bt_self_component_port_input, - * bt_self_component_port_output, bt_value, bt_message_array_const + * bt_self_component_port_output, bt_value, bt_message_array_const, + * bt_bool, bt_self_message_iterator */ #include @@ -74,6 +75,24 @@ typedef bt_self_message_iterator_status bt_message_array_const msgs, uint64_t capacity, uint64_t *count); +typedef bt_self_message_iterator_status +(*bt_component_class_filter_message_iterator_seek_ns_from_origin_method)( + bt_self_message_iterator *message_iterator, + int64_t ns_from_origin); + +typedef bt_self_message_iterator_status +(*bt_component_class_filter_message_iterator_seek_beginning_method)( + bt_self_message_iterator *message_iterator); + +typedef bt_bool +(*bt_component_class_filter_message_iterator_can_seek_ns_from_origin_method)( + bt_self_message_iterator *message_iterator, + int64_t ns_from_origin); + +typedef bt_bool +(*bt_component_class_filter_message_iterator_can_seek_beginning_method)( + bt_self_message_iterator *message_iterator); + typedef bt_query_status (*bt_component_class_filter_query_method)( bt_self_component_class_filter *comp_class, @@ -162,6 +181,26 @@ bt_component_class_filter_set_message_iterator_finalize_method( bt_component_class_filter *comp_class, bt_component_class_filter_message_iterator_finalize_method method); +extern bt_component_class_status +bt_component_class_filter_set_message_iterator_seek_ns_from_origin_method( + bt_component_class_filter *comp_class, + bt_component_class_filter_message_iterator_seek_ns_from_origin_method method); + +extern bt_component_class_status +bt_component_class_filter_set_message_iterator_seek_beginning_method( + bt_component_class_filter *comp_class, + bt_component_class_filter_message_iterator_seek_beginning_method method); + +extern bt_bool +bt_component_class_filter_set_message_iterator_can_seek_ns_from_origin_method( + bt_component_class_filter *comp_class, + bt_component_class_filter_message_iterator_can_seek_ns_from_origin_method method); + +extern bt_bool +bt_component_class_filter_set_message_iterator_can_seek_beginning_method( + bt_component_class_filter *comp_class, + bt_component_class_filter_message_iterator_can_seek_beginning_method method); + #ifdef __cplusplus } #endif diff --git a/include/babeltrace/graph/component-class-internal.h b/include/babeltrace/graph/component-class-internal.h index 33813334..953fa91a 100644 --- a/include/babeltrace/graph/component-class-internal.h +++ b/include/babeltrace/graph/component-class-internal.h @@ -70,6 +70,10 @@ struct bt_component_class_source { bt_component_class_source_message_iterator_init_method msg_iter_init; bt_component_class_source_message_iterator_finalize_method msg_iter_finalize; bt_component_class_source_message_iterator_next_method msg_iter_next; + bt_component_class_source_message_iterator_seek_ns_from_origin_method msg_iter_seek_ns_from_origin; + bt_component_class_source_message_iterator_seek_beginning_method msg_iter_seek_beginning; + bt_component_class_source_message_iterator_can_seek_ns_from_origin_method msg_iter_can_seek_ns_from_origin; + bt_component_class_source_message_iterator_can_seek_beginning_method msg_iter_can_seek_beginning; bt_component_class_source_query_method query; bt_component_class_source_accept_output_port_connection_method accept_output_port_connection; bt_component_class_source_output_port_connected_method output_port_connected; @@ -96,6 +100,10 @@ struct bt_component_class_filter { bt_component_class_filter_message_iterator_init_method msg_iter_init; bt_component_class_filter_message_iterator_finalize_method msg_iter_finalize; bt_component_class_filter_message_iterator_next_method msg_iter_next; + bt_component_class_filter_message_iterator_seek_ns_from_origin_method msg_iter_seek_ns_from_origin; + bt_component_class_filter_message_iterator_seek_beginning_method msg_iter_seek_beginning; + bt_component_class_filter_message_iterator_can_seek_ns_from_origin_method msg_iter_can_seek_ns_from_origin; + bt_component_class_filter_message_iterator_can_seek_beginning_method msg_iter_can_seek_beginning; bt_component_class_filter_query_method query; bt_component_class_filter_accept_input_port_connection_method accept_input_port_connection; bt_component_class_filter_accept_output_port_connection_method accept_output_port_connection; diff --git a/include/babeltrace/graph/component-class-sink-colander-internal.h b/include/babeltrace/graph/component-class-sink-colander-internal.h index a56f0b4e..083f9762 100644 --- a/include/babeltrace/graph/component-class-sink-colander-internal.h +++ b/include/babeltrace/graph/component-class-sink-colander-internal.h @@ -31,6 +31,12 @@ extern "C" { #endif +struct bt_component_class_sink_colander_priv_data { + bt_message_array_const msgs; + uint64_t *count_addr; + struct bt_self_component_port_input_message_iterator *msg_iter; +}; + struct bt_component_class_sink_colander_data { bt_message_array_const msgs; uint64_t *count_addr; diff --git a/include/babeltrace/graph/component-class-source.h b/include/babeltrace/graph/component-class-source.h index 845a341e..8697faaa 100644 --- a/include/babeltrace/graph/component-class-source.h +++ b/include/babeltrace/graph/component-class-source.h @@ -42,7 +42,7 @@ * For bt_component_class, bt_component_class_source, bt_port_input, * bt_query_executor, bt_self_component_class_source, * bt_self_component_source, bt_self_component_port_output, bt_value, - * bt_message_array_const + * bt_message_array_const, bt_bool, bt_self_message_iterator */ #include @@ -74,6 +74,24 @@ typedef bt_self_message_iterator_status bt_message_array_const msgs, uint64_t capacity, uint64_t *count); +typedef bt_self_message_iterator_status +(*bt_component_class_source_message_iterator_seek_ns_from_origin_method)( + bt_self_message_iterator *message_iterator, + int64_t ns_from_origin); + +typedef bt_self_message_iterator_status +(*bt_component_class_source_message_iterator_seek_beginning_method)( + bt_self_message_iterator *message_iterator); + +typedef bt_bool +(*bt_component_class_source_message_iterator_can_seek_ns_from_origin_method)( + bt_self_message_iterator *message_iterator, + int64_t ns_from_origin); + +typedef bt_bool +(*bt_component_class_source_message_iterator_can_seek_beginning_method)( + bt_self_message_iterator *message_iterator); + typedef bt_query_status (*bt_component_class_source_query_method)( bt_self_component_class_source *comp_class, const bt_query_executor *query_executor, @@ -139,6 +157,26 @@ bt_component_class_source_set_message_iterator_finalize_method( bt_component_class_source *comp_class, bt_component_class_source_message_iterator_finalize_method method); +extern bt_component_class_status +bt_component_class_source_set_message_iterator_seek_ns_from_origin_method( + bt_component_class_source *comp_class, + bt_component_class_source_message_iterator_seek_ns_from_origin_method method); + +extern bt_component_class_status +bt_component_class_source_set_message_iterator_seek_beginning_method( + bt_component_class_source *comp_class, + bt_component_class_source_message_iterator_seek_beginning_method method); + +extern bt_bool +bt_component_class_source_set_message_iterator_can_seek_ns_from_origin_method( + bt_component_class_source *comp_class, + bt_component_class_source_message_iterator_can_seek_ns_from_origin_method method); + +extern bt_bool +bt_component_class_source_set_message_iterator_can_seek_beginning_method( + bt_component_class_source *comp_class, + bt_component_class_source_message_iterator_can_seek_beginning_method method); + #ifdef __cplusplus } #endif diff --git a/include/babeltrace/graph/connection-internal.h b/include/babeltrace/graph/connection-internal.h index 1500b111..2c9b9d0d 100644 --- a/include/babeltrace/graph/connection-internal.h +++ b/include/babeltrace/graph/connection-internal.h @@ -25,7 +25,7 @@ */ #include -#include +#include #include #include #include diff --git a/include/babeltrace/graph/message-internal.h b/include/babeltrace/graph/message-internal.h index 8f6158ea..96740024 100644 --- a/include/babeltrace/graph/message-internal.h +++ b/include/babeltrace/graph/message-internal.h @@ -39,7 +39,6 @@ typedef struct bt_stream *(*get_stream_func)( struct bt_message { struct bt_object base; enum bt_message_type type; - uint64_t seq_num; bt_bool frozen; /* Owned by this; keeps the graph alive while the msg. is alive */ @@ -65,7 +64,6 @@ void bt_message_reset(struct bt_message *message) #ifdef BT_DEV_MODE message->frozen = BT_FALSE; - message->seq_num = UINT64_C(-1); #endif } diff --git a/include/babeltrace/graph/message-iterator.h b/include/babeltrace/graph/message-iterator-const.h similarity index 88% rename from include/babeltrace/graph/message-iterator.h rename to include/babeltrace/graph/message-iterator-const.h index acb93f8f..43a6ab19 100644 --- a/include/babeltrace/graph/message-iterator.h +++ b/include/babeltrace/graph/message-iterator-const.h @@ -1,5 +1,5 @@ -#ifndef BABELTRACE_GRAPH_MESSAGE_ITERATOR_H -#define BABELTRACE_GRAPH_MESSAGE_ITERATOR_H +#ifndef BABELTRACE_GRAPH_MESSAGE_ITERATOR_CONST_H +#define BABELTRACE_GRAPH_MESSAGE_ITERATOR_CONST_H /* * Copyright 2017-2018 Philippe Proulx @@ -24,9 +24,6 @@ * SOFTWARE. */ -/* For bt_message, bt_message_iterator */ -#include - #ifdef __cplusplus extern "C" { #endif @@ -43,4 +40,4 @@ typedef enum bt_message_iterator_status { } #endif -#endif /* BABELTRACE_GRAPH_MESSAGE_ITERATOR_H */ +#endif /* BABELTRACE_GRAPH_MESSAGE_ITERATOR_CONST_H */ diff --git a/include/babeltrace/graph/message-iterator-internal.h b/include/babeltrace/graph/message-iterator-internal.h index 1b1d9e15..3a57f5da 100644 --- a/include/babeltrace/graph/message-iterator-internal.h +++ b/include/babeltrace/graph/message-iterator-internal.h @@ -28,7 +28,7 @@ #include #include #include -#include +#include #include #include #include @@ -42,10 +42,10 @@ enum bt_message_iterator_type { }; enum bt_self_component_port_input_message_iterator_state { - /* Iterator is not initialized. */ + /* Iterator is not initialized */ BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_NON_INITIALIZED, - /* Iterator is active, not at the end yet, and not finalized. */ + /* Iterator is active, not at the end yet, and not finalized */ BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_ACTIVE, /* @@ -54,15 +54,20 @@ enum bt_self_component_port_input_message_iterator_state { */ BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_ENDED, - /* - * Iterator is currently being finalized. - */ + /* Iterator is currently being finalized */ BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_FINALIZING, - /* - * Iterator is finalized. - */ + /* Iterator is finalized */ BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_FINALIZED, + + /* Iterator is seeking */ + BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_SEEKING, + + /* Iterator did seek, but returned `BT_MESSAGE_ITERATOR_STATUS_AGAIN` */ + BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_LAST_SEEKING_RETURNED_AGAIN, + + /* Iterator did seek, but returned error status */ + BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_LAST_SEEKING_RETURNED_ERROR, }; struct bt_message_iterator { @@ -71,6 +76,26 @@ struct bt_message_iterator { GPtrArray *msgs; }; +typedef enum bt_self_message_iterator_status +(*bt_self_component_port_input_message_iterator_next_method)( + void *, bt_message_array_const, uint64_t, uint64_t *); + +typedef enum bt_self_message_iterator_status +(*bt_self_component_port_input_message_iterator_seek_ns_from_origin_method)( + void *, int64_t); + +typedef enum bt_self_message_iterator_status +(*bt_self_component_port_input_message_iterator_seek_beginning_method)( + void *); + +typedef bt_bool +(*bt_self_component_port_input_message_iterator_can_seek_ns_from_origin_method)( + void *, int64_t); + +typedef bt_bool +(*bt_self_component_port_input_message_iterator_can_seek_beginning_method)( + void *); + struct bt_self_component_port_input_message_iterator { struct bt_message_iterator base; struct bt_component *upstream_component; /* Weak */ @@ -78,20 +103,17 @@ struct bt_self_component_port_input_message_iterator { struct bt_connection *connection; /* Weak */ struct bt_graph *graph; /* Weak */ - /* - * This hash table keeps the state of a stream as viewed by this - * message iterator. This is used, in developer mode, to make - * sure that, once the message iterator has seen a "stream end" - * message for a given stream, no other messages which refer to - * this stream can be delivered by this iterator. It is also - * used to check for a valid sequence of messages. - * - * The key (struct bt_stream *) is not owned by this. The value - * is an allocated state structure. - */ - GHashTable *stream_states; + struct { + bt_self_component_port_input_message_iterator_next_method next; + bt_self_component_port_input_message_iterator_seek_ns_from_origin_method seek_ns_from_origin; + bt_self_component_port_input_message_iterator_seek_beginning_method seek_beginning; + bt_self_component_port_input_message_iterator_can_seek_ns_from_origin_method can_seek_ns_from_origin; + bt_self_component_port_input_message_iterator_can_seek_beginning_method can_seek_beginning; + } methods; enum bt_self_component_port_input_message_iterator_state state; + uint64_t auto_seek_msg_count; + GPtrArray *auto_seek_msgs; void *user_data; }; @@ -149,6 +171,12 @@ const char *bt_self_component_port_input_message_iterator_state_string( return "BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_FINALIZING"; case BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_FINALIZED: return "BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_FINALIZED"; + case BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_SEEKING: + return "BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_SEEKING"; + case BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_LAST_SEEKING_RETURNED_AGAIN: + return "BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_LAST_SEEKING_RETURNED_AGAIN"; + case BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_LAST_SEEKING_RETURNED_ERROR: + return "BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_LAST_SEEKING_RETURNED_ERROR"; default: return "(unknown)"; } diff --git a/include/babeltrace/graph/port-output-message-iterator.h b/include/babeltrace/graph/port-output-message-iterator.h index 08cc225b..fa1ab274 100644 --- a/include/babeltrace/graph/port-output-message-iterator.h +++ b/include/babeltrace/graph/port-output-message-iterator.h @@ -26,12 +26,12 @@ #include /* For bt_message_iterator_status */ -#include +#include /* * For bt_port, bt_message, bt_message_iterator, * bt_port_output_message_iterator, bt_graph, bt_port_output, - * bt_message_array_const + * bt_message_array_const, bt_bool */ #include @@ -57,6 +57,22 @@ bt_port_output_message_iterator_next( bt_port_output_message_iterator *iterator, bt_message_array_const *msgs, uint64_t *count); +extern bt_bool bt_port_output_message_iterator_can_seek_ns_from_origin( + bt_port_output_message_iterator *iterator, + int64_t ns_from_origin); + +extern bt_bool bt_port_output_message_iterator_can_seek_beginning( + bt_port_output_message_iterator *iterator); + +extern bt_message_iterator_status +bt_port_output_message_iterator_seek_ns_from_origin( + bt_port_output_message_iterator *iterator, + int64_t ns_from_origin); + +extern bt_message_iterator_status +bt_port_output_message_iterator_seek_beginning( + bt_port_output_message_iterator *iterator); + extern void bt_port_output_message_iterator_get_ref( const bt_port_output_message_iterator *port_output_message_iterator); diff --git a/include/babeltrace/graph/self-component-port-input-message-iterator.h b/include/babeltrace/graph/self-component-port-input-message-iterator.h index 480c1430..61fd2b21 100644 --- a/include/babeltrace/graph/self-component-port-input-message-iterator.h +++ b/include/babeltrace/graph/self-component-port-input-message-iterator.h @@ -26,12 +26,12 @@ #include /* For bt_message_iterator_status */ -#include +#include /* * For bt_component, bt_message_iterator, * bt_self_component_port_input_message_iterator, - * bt_self_component_port_input, bt_message_array_const + * bt_self_component_port_input, bt_message_array_const, bt_bool */ #include @@ -60,6 +60,23 @@ bt_self_component_port_input_message_iterator_next( bt_self_component_port_input_message_iterator *iterator, bt_message_array_const *msgs, uint64_t *count); +extern bt_bool +bt_self_component_port_input_message_iterator_can_seek_ns_from_origin( + bt_self_component_port_input_message_iterator *iterator, + int64_t ns_from_origin); + +extern bt_bool bt_self_component_port_input_message_iterator_can_seek_beginning( + bt_self_component_port_input_message_iterator *iterator); + +extern bt_message_iterator_status +bt_self_component_port_input_message_iterator_seek_ns_from_origin( + bt_self_component_port_input_message_iterator *iterator, + int64_t ns_from_origin); + +extern bt_message_iterator_status +bt_self_component_port_input_message_iterator_seek_beginning( + bt_self_component_port_input_message_iterator *iterator); + extern void bt_self_component_port_input_message_iterator_get_ref( const bt_self_component_port_input_message_iterator *self_component_port_input_message_iterator); diff --git a/include/babeltrace/graph/self-message-iterator.h b/include/babeltrace/graph/self-message-iterator.h index a66234bd..555a7538 100644 --- a/include/babeltrace/graph/self-message-iterator.h +++ b/include/babeltrace/graph/self-message-iterator.h @@ -24,7 +24,7 @@ */ /* For BT_MESSAGE_ITERATOR_STATUS_* */ -#include +#include /* For bt_self_component, bt_self_message_iterator, bt_self_port_output */ #include diff --git a/include/babeltrace/plugin/plugin-dev.h b/include/babeltrace/plugin/plugin-dev.h index 09986c45..1be7aa93 100644 --- a/include/babeltrace/plugin/plugin-dev.h +++ b/include/babeltrace/plugin/plugin-dev.h @@ -184,8 +184,12 @@ enum __bt_plugin_component_class_descriptor_attribute_type { BT_PLUGIN_COMPONENT_CLASS_DESCRIPTOR_ATTRIBUTE_TYPE_ACCEPT_OUTPUT_PORT_CONNECTION_METHOD = 6, BT_PLUGIN_COMPONENT_CLASS_DESCRIPTOR_ATTRIBUTE_TYPE_INPUT_PORT_CONNECTED_METHOD = 7, BT_PLUGIN_COMPONENT_CLASS_DESCRIPTOR_ATTRIBUTE_TYPE_OUTPUT_PORT_CONNECTED_METHOD = 8, - BT_PLUGIN_COMPONENT_CLASS_DESCRIPTOR_ATTRIBUTE_TYPE_MSG_ITER_INIT_METHOD = 11, - BT_PLUGIN_COMPONENT_CLASS_DESCRIPTOR_ATTRIBUTE_TYPE_MSG_ITER_FINALIZE_METHOD = 12, + BT_PLUGIN_COMPONENT_CLASS_DESCRIPTOR_ATTRIBUTE_TYPE_MSG_ITER_INIT_METHOD = 9, + BT_PLUGIN_COMPONENT_CLASS_DESCRIPTOR_ATTRIBUTE_TYPE_MSG_ITER_FINALIZE_METHOD = 10, + BT_PLUGIN_COMPONENT_CLASS_DESCRIPTOR_ATTRIBUTE_TYPE_MSG_ITER_SEEK_NS_FROM_ORIGIN_METHOD = 11, + BT_PLUGIN_COMPONENT_CLASS_DESCRIPTOR_ATTRIBUTE_TYPE_MSG_ITER_SEEK_BEGINNING_METHOD = 12, + BT_PLUGIN_COMPONENT_CLASS_DESCRIPTOR_ATTRIBUTE_TYPE_MSG_ITER_CAN_SEEK_NS_FROM_ORIGIN_METHOD = 13, + BT_PLUGIN_COMPONENT_CLASS_DESCRIPTOR_ATTRIBUTE_TYPE_MSG_ITER_CAN_SEEK_BEGINNING_METHOD = 14, }; /* Component class attribute (internal use) */ @@ -248,6 +252,22 @@ struct __bt_plugin_component_class_descriptor_attribute { /* BT_PLUGIN_COMPONENT_CLASS_DESCRIPTOR_ATTRIBUTE_TYPE_MSG_ITER_FINALIZE_METHOD */ bt_component_class_source_message_iterator_finalize_method source_msg_iter_finalize_method; bt_component_class_filter_message_iterator_finalize_method filter_msg_iter_finalize_method; + + /* BT_PLUGIN_COMPONENT_CLASS_DESCRIPTOR_ATTRIBUTE_TYPE_MSG_ITER_SEEK_NS_FROM_ORIGIN_METHOD */ + bt_component_class_source_message_iterator_seek_ns_from_origin_method source_msg_iter_seek_ns_from_origin_method; + bt_component_class_filter_message_iterator_seek_ns_from_origin_method filter_msg_iter_seek_ns_from_origin_method; + + /* BT_PLUGIN_COMPONENT_CLASS_DESCRIPTOR_ATTRIBUTE_TYPE_MSG_ITER_SEEK_BEGINNING_METHOD */ + bt_component_class_source_message_iterator_seek_beginning_method source_msg_iter_seek_beginning_method; + bt_component_class_filter_message_iterator_seek_beginning_method filter_msg_iter_seek_beginning_method; + + /* BT_PLUGIN_COMPONENT_CLASS_DESCRIPTOR_ATTRIBUTE_TYPE_MSG_ITER_CAN_SEEK_NS_FROM_ORIGIN_METHOD */ + bt_component_class_source_message_iterator_can_seek_ns_from_origin_method source_msg_iter_can_seek_ns_from_origin_method; + bt_component_class_filter_message_iterator_can_seek_ns_from_origin_method filter_msg_iter_can_seek_ns_from_origin_method; + + /* BT_PLUGIN_COMPONENT_CLASS_DESCRIPTOR_ATTRIBUTE_TYPE_MSG_ITER_CAN_SEEK_BEGINNING_METHOD */ + bt_component_class_source_message_iterator_can_seek_beginning_method source_msg_iter_can_seek_beginning_method; + bt_component_class_filter_message_iterator_can_seek_beginning_method filter_msg_iter_can_seek_beginning_method; } value; } __attribute__((packed)); @@ -872,6 +892,54 @@ struct __bt_plugin_component_class_descriptor_attribute const * const *__bt_get_ #define BT_PLUGIN_SOURCE_COMPONENT_CLASS_MESSAGE_ITERATOR_FINALIZE_METHOD_WITH_ID(_id, _comp_class_id, _x) \ __BT_PLUGIN_COMPONENT_CLASS_DESCRIPTOR_ATTRIBUTE(source_msg_iter_finalize_method, BT_PLUGIN_COMPONENT_CLASS_DESCRIPTOR_ATTRIBUTE_TYPE_MSG_ITER_FINALIZE_METHOD, _id, _comp_class_id, source, _x) +/* + * Defines an iterator "seek nanoseconds from origin" method attribute + * attached to a specific source component class descriptor. + * + * _id: Plugin descriptor ID (C identifier). + * _comp_class_id: Component class descriptor ID (C identifier). + * _x: Iterator "seek nanoseconds from origin" method + * (bt_component_class_source_message_iterator_seek_ns_from_origin_method). + */ +#define BT_PLUGIN_SOURCE_COMPONENT_CLASS_MESSAGE_ITERATOR_SEEK_NS_FROM_ORIGIN_METHOD_WITH_ID(_id, _comp_class_id, _x) \ + __BT_PLUGIN_COMPONENT_CLASS_DESCRIPTOR_ATTRIBUTE(source_msg_iter_seek_ns_from_origin_method, BT_PLUGIN_COMPONENT_CLASS_DESCRIPTOR_ATTRIBUTE_TYPE_MSG_ITER_SEEK_NS_FROM_ORIGIN_METHOD, _id, _comp_class_id, source, _x) + +/* + * Defines an iterator "seek beginning" method attribute attached to a + * specific source component class descriptor. + * + * _id: Plugin descriptor ID (C identifier). + * _comp_class_id: Component class descriptor ID (C identifier). + * _x: Iterator "seek beginning" method + * (bt_component_class_source_message_iterator_seek_beginning_method). + */ +#define BT_PLUGIN_SOURCE_COMPONENT_CLASS_MESSAGE_ITERATOR_SEEK_BEGINNING_METHOD_WITH_ID(_id, _comp_class_id, _x) \ + __BT_PLUGIN_COMPONENT_CLASS_DESCRIPTOR_ATTRIBUTE(source_msg_iter_seek_beginning_method, BT_PLUGIN_COMPONENT_CLASS_DESCRIPTOR_ATTRIBUTE_TYPE_MSG_ITER_SEEK_BEGINNING_METHOD, _id, _comp_class_id, source, _x) + +/* + * Defines an iterator "can seek nanoseconds from origin" method + * attribute attached to a specific source component class descriptor. + * + * _id: Plugin descriptor ID (C identifier). + * _comp_class_id: Component class descriptor ID (C identifier). + * _x: Iterator "can seek nanoseconds from origin" method + * (bt_component_class_source_message_iterator_can_seek_ns_from_origin_method). + */ +#define BT_PLUGIN_SOURCE_COMPONENT_CLASS_MESSAGE_ITERATOR_CAN_SEEK_NS_FROM_ORIGIN_METHOD_WITH_ID(_id, _comp_class_id, _x) \ + __BT_PLUGIN_COMPONENT_CLASS_DESCRIPTOR_ATTRIBUTE(source_msg_iter_can_seek_ns_from_origin_method, BT_PLUGIN_COMPONENT_CLASS_DESCRIPTOR_ATTRIBUTE_TYPE_MSG_ITER_CAN_SEEK_NS_FROM_ORIGIN_METHOD, _id, _comp_class_id, source, _x) + +/* + * Defines an iterator "can seek beginning" method attribute attached to a + * specific source component class descriptor. + * + * _id: Plugin descriptor ID (C identifier). + * _comp_class_id: Component class descriptor ID (C identifier). + * _x: Iterator "can seek beginning" method + * (bt_component_class_source_message_iterator_can_seek_beginning_method). + */ +#define BT_PLUGIN_SOURCE_COMPONENT_CLASS_MESSAGE_ITERATOR_CAN_SEEK_BEGINNING_METHOD_WITH_ID(_id, _comp_class_id, _x) \ + __BT_PLUGIN_COMPONENT_CLASS_DESCRIPTOR_ATTRIBUTE(source_msg_iter_can_seek_beginning_method, BT_PLUGIN_COMPONENT_CLASS_DESCRIPTOR_ATTRIBUTE_TYPE_MSG_ITER_CAN_SEEK_BEGINNING_METHOD, _id, _comp_class_id, source, _x) + /* * Defines an iterator initialization method attribute attached to a * specific filter component class descriptor. @@ -896,6 +964,54 @@ struct __bt_plugin_component_class_descriptor_attribute const * const *__bt_get_ #define BT_PLUGIN_FILTER_COMPONENT_CLASS_MESSAGE_ITERATOR_FINALIZE_METHOD_WITH_ID(_id, _comp_class_id, _x) \ __BT_PLUGIN_COMPONENT_CLASS_DESCRIPTOR_ATTRIBUTE(filter_msg_iter_finalize_method, BT_PLUGIN_COMPONENT_CLASS_DESCRIPTOR_ATTRIBUTE_TYPE_MSG_ITER_FINALIZE_METHOD, _id, _comp_class_id, filter, _x) +/* + * Defines an iterator "seek nanoseconds from origin" method attribute + * attached to a specific filter component class descriptor. + * + * _id: Plugin descriptor ID (C identifier). + * _comp_class_id: Component class descriptor ID (C identifier). + * _x: Iterator "seek nanoseconds from origin" method + * (bt_component_class_filter_message_iterator_seek_ns_from_origin_method). + */ +#define BT_PLUGIN_FILTER_COMPONENT_CLASS_MESSAGE_ITERATOR_SEEK_NS_FROM_ORIGIN_METHOD_WITH_ID(_id, _comp_class_id, _x) \ + __BT_PLUGIN_COMPONENT_CLASS_DESCRIPTOR_ATTRIBUTE(filter_msg_iter_seek_ns_from_origin_method, BT_PLUGIN_COMPONENT_CLASS_DESCRIPTOR_ATTRIBUTE_TYPE_MSG_ITER_SEEK_NS_FROM_ORIGIN_METHOD, _id, _comp_class_id, filter, _x) + +/* + * Defines an iterator "seek beginning" method attribute attached to a + * specific filter component class descriptor. + * + * _id: Plugin descriptor ID (C identifier). + * _comp_class_id: Component class descriptor ID (C identifier). + * _x: Iterator "seek beginning" method + * (bt_component_class_filter_message_iterator_seek_beginning_method). + */ +#define BT_PLUGIN_FILTER_COMPONENT_CLASS_MESSAGE_ITERATOR_SEEK_BEGINNING_METHOD_WITH_ID(_id, _comp_class_id, _x) \ + __BT_PLUGIN_COMPONENT_CLASS_DESCRIPTOR_ATTRIBUTE(filter_msg_iter_seek_beginning_method, BT_PLUGIN_COMPONENT_CLASS_DESCRIPTOR_ATTRIBUTE_TYPE_MSG_ITER_SEEK_BEGINNING_METHOD, _id, _comp_class_id, filter, _x) + +/* + * Defines an iterator "can seek nanoseconds from origin" method + * attribute attached to a specific filter component class descriptor. + * + * _id: Plugin descriptor ID (C identifier). + * _comp_class_id: Component class descriptor ID (C identifier). + * _x: Iterator "can seek nanoseconds from origin" method + * (bt_component_class_filter_message_iterator_can_seek_ns_from_origin_method). + */ +#define BT_PLUGIN_FILTER_COMPONENT_CLASS_MESSAGE_ITERATOR_CAN_SEEK_NS_FROM_ORIGIN_METHOD_WITH_ID(_id, _comp_class_id, _x) \ + __BT_PLUGIN_COMPONENT_CLASS_DESCRIPTOR_ATTRIBUTE(filter_msg_iter_can_seek_ns_from_origin_method, BT_PLUGIN_COMPONENT_CLASS_DESCRIPTOR_ATTRIBUTE_TYPE_MSG_ITER_CAN_SEEK_NS_FROM_ORIGIN_METHOD, _id, _comp_class_id, filter, _x) + +/* + * Defines an iterator "can seek beginning" method attribute attached to a + * specific filter component class descriptor. + * + * _id: Plugin descriptor ID (C identifier). + * _comp_class_id: Component class descriptor ID (C identifier). + * _x: Iterator "can seek beginning" method + * (bt_component_class_filter_message_iterator_can_seek_beginning_method). + */ +#define BT_PLUGIN_FILTER_COMPONENT_CLASS_MESSAGE_ITERATOR_CAN_SEEK_BEGINNING_METHOD_WITH_ID(_id, _comp_class_id, _x) \ + __BT_PLUGIN_COMPONENT_CLASS_DESCRIPTOR_ATTRIBUTE(filter_msg_iter_can_seek_beginning_method, BT_PLUGIN_COMPONENT_CLASS_DESCRIPTOR_ATTRIBUTE_TYPE_MSG_ITER_CAN_SEEK_BEGINNING_METHOD, _id, _comp_class_id, filter, _x) + /* * Defines a plugin descriptor with an automatic ID. * @@ -1264,6 +1380,54 @@ struct __bt_plugin_component_class_descriptor_attribute const * const *__bt_get_ #define BT_PLUGIN_SOURCE_COMPONENT_CLASS_MESSAGE_ITERATOR_FINALIZE_METHOD(_name, _x) \ BT_PLUGIN_SOURCE_COMPONENT_CLASS_MESSAGE_ITERATOR_FINALIZE_METHOD_WITH_ID(auto, _name, _x) +/* + * Defines an iterator "seek nanoseconds from origin" method attribute + * attached to a source component class descriptor which is attached to + * the automatic plugin descriptor. + * + * _name: Component class name (C identifier). + * _x: Iterator "seek nanoseconds from origin" method + * (bt_component_class_source_message_iterator_seek_ns_from_origin_method). + */ +#define BT_PLUGIN_SOURCE_COMPONENT_CLASS_MESSAGE_ITERATOR_SEEK_NS_FROM_ORIGIN_METHOD(_name, _x) \ + BT_PLUGIN_SOURCE_COMPONENT_CLASS_MESSAGE_ITERATOR_SEEK_NS_FROM_ORIGIN_METHOD_WITH_ID(auto, _name, _x) + +/* + * Defines an iterator "seek beginning" method attribute + * attached to a source component class descriptor which is attached to + * the automatic plugin descriptor. + * + * _name: Component class name (C identifier). + * _x: Iterator "seek beginning" method + * (bt_component_class_source_message_iterator_seek_beginning_method). + */ +#define BT_PLUGIN_SOURCE_COMPONENT_CLASS_MESSAGE_ITERATOR_SEEK_BEGINNING_METHOD(_name, _x) \ + BT_PLUGIN_SOURCE_COMPONENT_CLASS_MESSAGE_ITERATOR_SEEK_BEGINNING_METHOD_WITH_ID(auto, _name, _x) + +/* + * Defines an iterator "can seek nanoseconds from origin" method + * attribute attached to a source component class descriptor which is + * attached to the automatic plugin descriptor. + * + * _name: Component class name (C identifier). + * _x: Iterator "can seek nanoseconds from origin" method + * (bt_component_class_source_message_iterator_can_seek_ns_from_origin_method). + */ +#define BT_PLUGIN_SOURCE_COMPONENT_CLASS_MESSAGE_ITERATOR_CAN_SEEK_NS_FROM_ORIGIN_METHOD(_name, _x) \ + BT_PLUGIN_SOURCE_COMPONENT_CLASS_MESSAGE_ITERATOR_CAN_SEEK_NS_FROM_ORIGIN_METHOD_WITH_ID(auto, _name, _x) + +/* + * Defines an iterator "can seek beginning" method attribute + * attached to a source component class descriptor which is attached to + * the automatic plugin descriptor. + * + * _name: Component class name (C identifier). + * _x: Iterator "can seek beginning" method + * (bt_component_class_source_message_iterator_can_seek_beginning_method). + */ +#define BT_PLUGIN_SOURCE_COMPONENT_CLASS_MESSAGE_ITERATOR_CAN_SEEK_BEGINNING_METHOD(_name, _x) \ + BT_PLUGIN_SOURCE_COMPONENT_CLASS_MESSAGE_ITERATOR_CAN_SEEK_BEGINNING_METHOD_WITH_ID(auto, _name, _x) + /* * Defines an iterator initialization method attribute attached to a * filter component class descriptor which is attached to the automatic @@ -1288,6 +1452,54 @@ struct __bt_plugin_component_class_descriptor_attribute const * const *__bt_get_ #define BT_PLUGIN_FILTER_COMPONENT_CLASS_MESSAGE_ITERATOR_FINALIZE_METHOD(_name, _x) \ BT_PLUGIN_FILTER_COMPONENT_CLASS_MESSAGE_ITERATOR_FINALIZE_METHOD_WITH_ID(auto, _name, _x) +/* + * Defines an iterator "seek nanoseconds from origin" method attribute + * attached to a filter component class descriptor which is attached to + * the automatic plugin descriptor. + * + * _name: Component class name (C identifier). + * _x: Iterator "seek nanoseconds from origin" method + * (bt_component_class_filter_message_iterator_seek_ns_from_origin_method). + */ +#define BT_PLUGIN_FILTER_COMPONENT_CLASS_MESSAGE_ITERATOR_SEEK_NS_FROM_ORIGIN_METHOD(_name, _x) \ + BT_PLUGIN_FILTER_COMPONENT_CLASS_MESSAGE_ITERATOR_SEEK_NS_FROM_ORIGIN_METHOD_WITH_ID(auto, _name, _x) + +/* + * Defines an iterator "seek beginning" method attribute + * attached to a filter component class descriptor which is attached to + * the automatic plugin descriptor. + * + * _name: Component class name (C identifier). + * _x: Iterator "seek beginning" method + * (bt_component_class_filter_message_iterator_seek_beginning_method). + */ +#define BT_PLUGIN_FILTER_COMPONENT_CLASS_MESSAGE_ITERATOR_SEEK_BEGINNING_METHOD(_name, _x) \ + BT_PLUGIN_FILTER_COMPONENT_CLASS_MESSAGE_ITERATOR_SEEK_BEGINNING_METHOD_WITH_ID(auto, _name, _x) + +/* + * Defines an iterator "can seek nanoseconds from origin" method + * attribute attached to a filter component class descriptor which is + * attached to the automatic plugin descriptor. + * + * _name: Component class name (C identifier). + * _x: Iterator "can seek nanoseconds from origin" method + * (bt_component_class_filter_message_iterator_can_seek_ns_from_origin_method). + */ +#define BT_PLUGIN_FILTER_COMPONENT_CLASS_MESSAGE_ITERATOR_CAN_SEEK_NS_FROM_ORIGIN_METHOD(_name, _x) \ + BT_PLUGIN_FILTER_COMPONENT_CLASS_MESSAGE_ITERATOR_CAN_SEEK_NS_FROM_ORIGIN_METHOD_WITH_ID(auto, _name, _x) + +/* + * Defines an iterator "can seek beginning" method attribute + * attached to a filter component class descriptor which is attached to + * the automatic plugin descriptor. + * + * _name: Component class name (C identifier). + * _x: Iterator "can seek beginning" method + * (bt_component_class_filter_message_iterator_can_seek_beginning_method). + */ +#define BT_PLUGIN_FILTER_COMPONENT_CLASS_MESSAGE_ITERATOR_CAN_SEEK_BEGINNING_METHOD(_name, _x) \ + BT_PLUGIN_FILTER_COMPONENT_CLASS_MESSAGE_ITERATOR_CAN_SEEK_BEGINNING_METHOD_WITH_ID(auto, _name, _x) + #define BT_PLUGIN_MODULE() \ static struct __bt_plugin_descriptor const * const __bt_plugin_descriptor_dummy __BT_PLUGIN_DESCRIPTOR_ATTRS = NULL; \ _BT_HIDDEN extern struct __bt_plugin_descriptor const *__BT_PLUGIN_DESCRIPTOR_BEGIN_SYMBOL __BT_PLUGIN_DESCRIPTOR_BEGIN_EXTRA; \ diff --git a/lib/graph/component-class-sink-colander.c b/lib/graph/component-class-sink-colander.c index 1b67b2f8..9a70f105 100644 --- a/lib/graph/component-class-sink-colander.c +++ b/lib/graph/component-class-sink-colander.c @@ -37,19 +37,13 @@ static struct bt_component_class_sink *colander_comp_cls; -struct colander_data { - bt_message_array_const msgs; - uint64_t *count_addr; - struct bt_self_component_port_input_message_iterator *msg_iter; -}; - static enum bt_self_component_status colander_init( struct bt_self_component_sink *self_comp, const struct bt_value *params, void *init_method_data) { enum bt_self_component_status status = BT_SELF_COMPONENT_STATUS_OK; - struct colander_data *colander_data = NULL; + struct bt_component_class_sink_colander_priv_data *colander_data = NULL; struct bt_component_class_sink_colander_data *user_provided_data = init_method_data; @@ -59,7 +53,8 @@ enum bt_self_component_status colander_init( goto end; } - colander_data = g_new0(struct colander_data, 1); + colander_data = g_new0( + struct bt_component_class_sink_colander_priv_data, 1); if (!colander_data) { BT_LOGE_STR("Failed to allocate colander data."); status = BT_SELF_COMPONENT_STATUS_NOMEM; @@ -86,7 +81,7 @@ end: static void colander_finalize(struct bt_self_component_sink *self_comp) { - struct colander_data *colander_data = + struct bt_component_class_sink_colander_priv_data *colander_data = bt_self_component_get_data( bt_self_component_sink_as_self_component(self_comp)); @@ -105,7 +100,7 @@ enum bt_self_component_status colander_input_port_connected( const struct bt_port_output *other_port) { enum bt_self_component_status status = BT_SELF_COMPONENT_STATUS_OK; - struct colander_data *colander_data = + struct bt_component_class_sink_colander_priv_data *colander_data = bt_self_component_get_data( bt_self_component_sink_as_self_component(self_comp)); @@ -132,7 +127,7 @@ enum bt_self_component_status colander_consume( { enum bt_self_component_status status = BT_SELF_COMPONENT_STATUS_OK; enum bt_message_iterator_status msg_iter_status; - struct colander_data *colander_data = + struct bt_component_class_sink_colander_priv_data *colander_data = bt_self_component_get_data( bt_self_component_sink_as_self_component(self_comp)); bt_message_array_const msgs; diff --git a/lib/graph/component-class.c b/lib/graph/component-class.c index c3fd7c8a..7f90720e 100644 --- a/lib/graph/component-class.c +++ b/lib/graph/component-class.c @@ -245,7 +245,8 @@ end: return (void *) sink_class; } -int bt_component_class_source_set_init_method( +extern enum bt_component_class_status +bt_component_class_source_set_init_method( struct bt_component_class_source *comp_cls, bt_component_class_source_init_method method) { @@ -258,7 +259,8 @@ int bt_component_class_source_set_init_method( return BT_COMPONENT_CLASS_STATUS_OK; } -int bt_component_class_filter_set_init_method( +extern enum bt_component_class_status +bt_component_class_filter_set_init_method( struct bt_component_class_filter *comp_cls, bt_component_class_filter_init_method method) { @@ -271,7 +273,8 @@ int bt_component_class_filter_set_init_method( return BT_COMPONENT_CLASS_STATUS_OK; } -int bt_component_class_sink_set_init_method( +extern enum bt_component_class_status +bt_component_class_sink_set_init_method( struct bt_component_class_sink *comp_cls, bt_component_class_sink_init_method method) { @@ -284,7 +287,8 @@ int bt_component_class_sink_set_init_method( return BT_COMPONENT_CLASS_STATUS_OK; } -int bt_component_class_source_set_finalize_method( +extern enum bt_component_class_status +bt_component_class_source_set_finalize_method( struct bt_component_class_source *comp_cls, bt_component_class_source_finalize_method method) { @@ -297,7 +301,8 @@ int bt_component_class_source_set_finalize_method( return BT_COMPONENT_CLASS_STATUS_OK; } -int bt_component_class_filter_set_finalize_method( +extern enum bt_component_class_status +bt_component_class_filter_set_finalize_method( struct bt_component_class_filter *comp_cls, bt_component_class_filter_finalize_method method) { @@ -310,7 +315,8 @@ int bt_component_class_filter_set_finalize_method( return BT_COMPONENT_CLASS_STATUS_OK; } -int bt_component_class_sink_set_finalize_method( +extern enum bt_component_class_status +bt_component_class_sink_set_finalize_method( struct bt_component_class_sink *comp_cls, bt_component_class_sink_finalize_method method) { @@ -323,7 +329,8 @@ int bt_component_class_sink_set_finalize_method( return BT_COMPONENT_CLASS_STATUS_OK; } -int bt_component_class_source_set_query_method( +extern enum bt_component_class_status +bt_component_class_source_set_query_method( struct bt_component_class_source *comp_cls, bt_component_class_source_query_method method) { @@ -336,7 +343,8 @@ int bt_component_class_source_set_query_method( return BT_COMPONENT_CLASS_STATUS_OK; } -int bt_component_class_filter_set_query_method( +extern enum bt_component_class_status +bt_component_class_filter_set_query_method( struct bt_component_class_filter *comp_cls, bt_component_class_filter_query_method method) { @@ -349,7 +357,8 @@ int bt_component_class_filter_set_query_method( return BT_COMPONENT_CLASS_STATUS_OK; } -int bt_component_class_sink_set_query_method( +extern enum bt_component_class_status +bt_component_class_sink_set_query_method( struct bt_component_class_sink *comp_cls, bt_component_class_sink_query_method method) { @@ -362,7 +371,8 @@ int bt_component_class_sink_set_query_method( return BT_COMPONENT_CLASS_STATUS_OK; } -int bt_component_class_filter_set_accept_input_port_connection_method( +extern enum bt_component_class_status +bt_component_class_filter_set_accept_input_port_connection_method( struct bt_component_class_filter *comp_cls, bt_component_class_filter_accept_input_port_connection_method method) { @@ -375,7 +385,8 @@ int bt_component_class_filter_set_accept_input_port_connection_method( return BT_COMPONENT_CLASS_STATUS_OK; } -int bt_component_class_sink_set_accept_input_port_connection_method( +extern enum bt_component_class_status +bt_component_class_sink_set_accept_input_port_connection_method( struct bt_component_class_sink *comp_cls, bt_component_class_sink_accept_input_port_connection_method method) { @@ -388,7 +399,8 @@ int bt_component_class_sink_set_accept_input_port_connection_method( return BT_COMPONENT_CLASS_STATUS_OK; } -int bt_component_class_source_set_accept_output_port_connection_method( +extern enum bt_component_class_status +bt_component_class_source_set_accept_output_port_connection_method( struct bt_component_class_source *comp_cls, bt_component_class_source_accept_output_port_connection_method method) { @@ -401,7 +413,8 @@ int bt_component_class_source_set_accept_output_port_connection_method( return BT_COMPONENT_CLASS_STATUS_OK; } -int bt_component_class_filter_set_accept_output_port_connection_method( +extern enum bt_component_class_status +bt_component_class_filter_set_accept_output_port_connection_method( struct bt_component_class_filter *comp_cls, bt_component_class_filter_accept_output_port_connection_method method) { @@ -414,7 +427,8 @@ int bt_component_class_filter_set_accept_output_port_connection_method( return BT_COMPONENT_CLASS_STATUS_OK; } -int bt_component_class_filter_set_input_port_connected_method( +extern enum bt_component_class_status +bt_component_class_filter_set_input_port_connected_method( struct bt_component_class_filter *comp_cls, bt_component_class_filter_input_port_connected_method method) { @@ -427,7 +441,8 @@ int bt_component_class_filter_set_input_port_connected_method( return BT_COMPONENT_CLASS_STATUS_OK; } -int bt_component_class_sink_set_input_port_connected_method( +extern enum bt_component_class_status +bt_component_class_sink_set_input_port_connected_method( struct bt_component_class_sink *comp_cls, bt_component_class_sink_input_port_connected_method method) { @@ -440,7 +455,8 @@ int bt_component_class_sink_set_input_port_connected_method( return BT_COMPONENT_CLASS_STATUS_OK; } -int bt_component_class_source_set_output_port_connected_method( +extern enum bt_component_class_status +bt_component_class_source_set_output_port_connected_method( struct bt_component_class_source *comp_cls, bt_component_class_source_output_port_connected_method method) { @@ -453,7 +469,8 @@ int bt_component_class_source_set_output_port_connected_method( return BT_COMPONENT_CLASS_STATUS_OK; } -int bt_component_class_filter_set_output_port_connected_method( +extern enum bt_component_class_status +bt_component_class_filter_set_output_port_connected_method( struct bt_component_class_filter *comp_cls, bt_component_class_filter_output_port_connected_method method) { @@ -479,7 +496,8 @@ int bt_component_class_source_set_message_iterator_init_method( return BT_COMPONENT_CLASS_STATUS_OK; } -int bt_component_class_filter_set_message_iterator_init_method( +extern enum bt_component_class_status +bt_component_class_filter_set_message_iterator_init_method( struct bt_component_class_filter *comp_cls, bt_component_class_filter_message_iterator_init_method method) { @@ -492,7 +510,8 @@ int bt_component_class_filter_set_message_iterator_init_method( return BT_COMPONENT_CLASS_STATUS_OK; } -int bt_component_class_source_set_message_iterator_finalize_method( +extern enum bt_component_class_status +bt_component_class_source_set_message_iterator_finalize_method( struct bt_component_class_source *comp_cls, bt_component_class_source_message_iterator_finalize_method method) { @@ -505,7 +524,8 @@ int bt_component_class_source_set_message_iterator_finalize_method( return BT_COMPONENT_CLASS_STATUS_OK; } -int bt_component_class_filter_set_message_iterator_finalize_method( +extern enum bt_component_class_status +bt_component_class_filter_set_message_iterator_finalize_method( struct bt_component_class_filter *comp_cls, bt_component_class_filter_message_iterator_finalize_method method) { @@ -518,6 +538,118 @@ int bt_component_class_filter_set_message_iterator_finalize_method( return BT_COMPONENT_CLASS_STATUS_OK; } +enum bt_component_class_status +bt_component_class_filter_set_message_iterator_seek_ns_from_origin_method( + struct bt_component_class_filter *comp_cls, + bt_component_class_filter_message_iterator_seek_ns_from_origin_method method) +{ + BT_ASSERT_PRE_NON_NULL(comp_cls, "Component class"); + BT_ASSERT_PRE_NON_NULL(method, "Method"); + BT_ASSERT_PRE_COMP_CLS_HOT(comp_cls); + comp_cls->methods.msg_iter_seek_ns_from_origin = method; + BT_LIB_LOGV("Set filter component class's message iterator \"seek nanoseconds from origin\" method" + ": %!+C", comp_cls); + return BT_COMPONENT_CLASS_STATUS_OK; +} + +enum bt_component_class_status +bt_component_class_source_set_message_iterator_seek_ns_from_origin_method( + struct bt_component_class_source *comp_cls, + bt_component_class_source_message_iterator_seek_ns_from_origin_method method) +{ + BT_ASSERT_PRE_NON_NULL(comp_cls, "Component class"); + BT_ASSERT_PRE_NON_NULL(method, "Method"); + BT_ASSERT_PRE_COMP_CLS_HOT(comp_cls); + comp_cls->methods.msg_iter_seek_ns_from_origin = method; + BT_LIB_LOGV("Set source component class's message iterator \"seek nanoseconds from origin\" method" + ": %!+C", comp_cls); + return BT_COMPONENT_CLASS_STATUS_OK; +} + +enum bt_component_class_status +bt_component_class_filter_set_message_iterator_seek_beginning_method( + struct bt_component_class_filter *comp_cls, + bt_component_class_filter_message_iterator_seek_beginning_method method) +{ + BT_ASSERT_PRE_NON_NULL(comp_cls, "Component class"); + BT_ASSERT_PRE_NON_NULL(method, "Method"); + BT_ASSERT_PRE_COMP_CLS_HOT(comp_cls); + comp_cls->methods.msg_iter_seek_beginning = method; + BT_LIB_LOGV("Set filter component class's message iterator \"seek beginning\" method" + ": %!+C", comp_cls); + return BT_COMPONENT_CLASS_STATUS_OK; +} + +enum bt_component_class_status +bt_component_class_source_set_message_iterator_seek_beginning_method( + struct bt_component_class_source *comp_cls, + bt_component_class_source_message_iterator_seek_beginning_method method) +{ + BT_ASSERT_PRE_NON_NULL(comp_cls, "Component class"); + BT_ASSERT_PRE_NON_NULL(method, "Method"); + BT_ASSERT_PRE_COMP_CLS_HOT(comp_cls); + comp_cls->methods.msg_iter_seek_beginning = method; + BT_LIB_LOGV("Set source component class's message iterator \"seek beginning\" method" + ": %!+C", comp_cls); + return BT_COMPONENT_CLASS_STATUS_OK; +} + +enum bt_component_class_status +bt_component_class_filter_set_message_iterator_can_seek_beginning_method( + struct bt_component_class_filter *comp_cls, + bt_component_class_filter_message_iterator_can_seek_beginning_method method) +{ + BT_ASSERT_PRE_NON_NULL(comp_cls, "Component class"); + BT_ASSERT_PRE_NON_NULL(method, "Method"); + BT_ASSERT_PRE_COMP_CLS_HOT(comp_cls); + comp_cls->methods.msg_iter_can_seek_beginning = method; + BT_LIB_LOGV("Set filter component class's message iterator \"can seek beginning\" method" + ": %!+C", comp_cls); + return BT_COMPONENT_CLASS_STATUS_OK; +} + +enum bt_component_class_status +bt_component_class_source_set_message_iterator_can_seek_beginning_method( + struct bt_component_class_source *comp_cls, + bt_component_class_source_message_iterator_can_seek_beginning_method method) +{ + BT_ASSERT_PRE_NON_NULL(comp_cls, "Component class"); + BT_ASSERT_PRE_NON_NULL(method, "Method"); + BT_ASSERT_PRE_COMP_CLS_HOT(comp_cls); + comp_cls->methods.msg_iter_can_seek_beginning = method; + BT_LIB_LOGV("Set source component class's message iterator \"can seek beginning\" method" + ": %!+C", comp_cls); + return BT_COMPONENT_CLASS_STATUS_OK; +} + +enum bt_component_class_status +bt_component_class_filter_set_message_iterator_can_seek_ns_from_origin_method( + struct bt_component_class_filter *comp_cls, + bt_component_class_filter_message_iterator_can_seek_ns_from_origin_method method) +{ + BT_ASSERT_PRE_NON_NULL(comp_cls, "Component class"); + BT_ASSERT_PRE_NON_NULL(method, "Method"); + BT_ASSERT_PRE_COMP_CLS_HOT(comp_cls); + comp_cls->methods.msg_iter_can_seek_ns_from_origin = method; + BT_LIB_LOGV("Set filter component class's message iterator \"can seek nanoseconds from origin\" method" + ": %!+C", comp_cls); + return BT_COMPONENT_CLASS_STATUS_OK; +} + +enum bt_component_class_status +bt_component_class_source_set_message_iterator_can_seek_ns_from_origin_method( + struct bt_component_class_source *comp_cls, + bt_component_class_source_message_iterator_can_seek_ns_from_origin_method method) +{ + BT_ASSERT_PRE_NON_NULL(comp_cls, "Component class"); + BT_ASSERT_PRE_NON_NULL(method, "Method"); + BT_ASSERT_PRE_COMP_CLS_HOT(comp_cls); + comp_cls->methods.msg_iter_can_seek_ns_from_origin = method; + BT_LIB_LOGV("Set source component class's message iterator \"can seek nanoseconds from origin\" method" + ": %!+C", comp_cls); + return BT_COMPONENT_CLASS_STATUS_OK; +} + bt_component_class_status bt_component_class_set_description( struct bt_component_class *comp_cls, const char *description) diff --git a/lib/graph/component-source.c b/lib/graph/component-source.c index b1e44139..39755af4 100644 --- a/lib/graph/component-source.c +++ b/lib/graph/component-source.c @@ -32,7 +32,7 @@ #include #include #include -#include +#include #include #include diff --git a/lib/graph/iterator.c b/lib/graph/iterator.c index 51fa8d91..9fc612cc 100644 --- a/lib/graph/iterator.c +++ b/lib/graph/iterator.c @@ -39,8 +39,9 @@ #include #include #include +#include #include -#include +#include #include #include #include @@ -51,6 +52,7 @@ #include #include #include +#include #include #include #include @@ -68,52 +70,12 @@ */ #define MSG_BATCH_SIZE 15 -struct stream_state { - const struct bt_stream *stream; /* owned by this */ - const struct bt_packet *cur_packet; /* owned by this */ - uint64_t expected_msg_seq_num; - bt_bool is_ended; -}; - -BT_ASSERT_PRE_FUNC -static -void destroy_stream_state(struct stream_state *stream_state) -{ - if (!stream_state) { - return; - } - - BT_LOGV("Destroying stream state: stream-state-addr=%p", stream_state); - BT_LOGV_STR("Putting stream state's current packet."); - BT_OBJECT_PUT_REF_AND_RESET(stream_state->cur_packet); - BT_LOGV_STR("Putting stream state's stream."); - BT_OBJECT_PUT_REF_AND_RESET(stream_state->stream); - g_free(stream_state); -} - -BT_ASSERT_PRE_FUNC -static -struct stream_state *create_stream_state(const struct bt_stream *stream) -{ - struct stream_state *stream_state = g_new0(struct stream_state, 1); - - if (!stream_state) { - BT_LOGE_STR("Failed to allocate one stream state."); - goto end; - } - - /* - * We keep a reference to the stream until we know it's ended. - */ - stream_state->stream = stream; - bt_object_get_no_null_check(stream_state->stream); - BT_LIB_LOGV("Created stream state: %![stream-]+s, " - "stream-state-addr=%p", - stream, stream_state); - -end: - return stream_state; -} +#define BT_ASSERT_PRE_ITER_HAS_STATE_TO_SEEK(_iter) \ + BT_ASSERT_PRE((_iter)->state == BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_ACTIVE || \ + (_iter)->state == BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_ENDED || \ + (_iter)->state == BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_LAST_SEEKING_RETURNED_AGAIN || \ + (_iter)->state == BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_LAST_SEEKING_RETURNED_ERROR, \ + "Message iterator is in the wrong state: %!+i", _iter) static inline void _set_self_comp_port_input_msg_iterator_state( @@ -121,8 +83,7 @@ void _set_self_comp_port_input_msg_iterator_state( enum bt_self_component_port_input_message_iterator_state state) { BT_ASSERT(iterator); - BT_LIB_LOGD("Updating message iterator's state: " - "new-state=%s", + BT_LIB_LOGD("Updating message iterator's state: new-state=%s", bt_self_component_port_input_message_iterator_state_string(state)); iterator->state = state; } @@ -172,17 +133,6 @@ void bt_self_component_port_input_message_iterator_destroy(struct bt_object *obj "%!+i", iterator); bt_self_component_port_input_message_iterator_try_finalize(iterator); - if (iterator->stream_states) { - /* - * Remove our destroy listener from each stream which - * has a state in this iterator. Otherwise the destroy - * listener would be called with an invalid/other - * message iterator object. - */ - g_hash_table_destroy(iterator->stream_states); - iterator->stream_states = NULL; - } - if (iterator->connection) { /* * Remove ourself from the originating connection so @@ -193,6 +143,21 @@ void bt_self_component_port_input_message_iterator_destroy(struct bt_object *obj iterator->connection = NULL; } + if (iterator->auto_seek_msgs) { + uint64_t i; + + /* Put any remaining message in the auto-seek array */ + for (i = 0; i < iterator->auto_seek_msgs->len; i++) { + if (iterator->auto_seek_msgs->pdata[i]) { + bt_object_put_no_null_check( + iterator->auto_seek_msgs->pdata[i]); + } + } + + g_ptr_array_free(iterator->auto_seek_msgs, TRUE); + iterator->auto_seek_msgs = NULL; + } + destroy_base_message_iterator(obj); } @@ -305,6 +270,21 @@ end: return ret; } +static +bt_bool can_seek_ns_from_origin_true( + struct bt_self_component_port_input_message_iterator *iterator, + int64_t ns_from_origin) +{ + return BT_TRUE; +} + +static +bt_bool can_seek_beginning_true( + struct bt_self_component_port_input_message_iterator *iterator) +{ + return BT_TRUE; +} + static struct bt_self_component_port_input_message_iterator * bt_self_component_port_input_message_iterator_create_initial( @@ -340,20 +320,84 @@ bt_self_component_port_input_message_iterator_create_initial( goto end; } - iterator->stream_states = g_hash_table_new_full(g_direct_hash, - g_direct_equal, NULL, (GDestroyNotify) destroy_stream_state); - if (!iterator->stream_states) { - BT_LOGE_STR("Failed to allocate a GHashTable."); - BT_OBJECT_PUT_REF_AND_RESET(iterator); + iterator->auto_seek_msgs = g_ptr_array_new(); + if (!iterator->auto_seek_msgs) { + BT_LOGE_STR("Failed to allocate a GPtrArray."); + ret = -1; goto end; } + g_ptr_array_set_size(iterator->auto_seek_msgs, MSG_BATCH_SIZE); iterator->upstream_component = upstream_comp; iterator->upstream_port = upstream_port; iterator->connection = iterator->upstream_port->connection; iterator->graph = bt_component_borrow_graph(upstream_comp); set_self_comp_port_input_msg_iterator_state(iterator, BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_NON_INITIALIZED); + + switch (iterator->upstream_component->class->type) { + case BT_COMPONENT_CLASS_TYPE_SOURCE: + { + struct bt_component_class_source *src_comp_cls = + (void *) iterator->upstream_component->class; + + iterator->methods.next = + (bt_self_component_port_input_message_iterator_next_method) + src_comp_cls->methods.msg_iter_next; + iterator->methods.seek_ns_from_origin = + (bt_self_component_port_input_message_iterator_seek_ns_from_origin_method) + src_comp_cls->methods.msg_iter_seek_ns_from_origin; + iterator->methods.seek_beginning = + (bt_self_component_port_input_message_iterator_seek_beginning_method) + src_comp_cls->methods.msg_iter_seek_beginning; + iterator->methods.can_seek_ns_from_origin = + (bt_self_component_port_input_message_iterator_can_seek_ns_from_origin_method) + src_comp_cls->methods.msg_iter_can_seek_ns_from_origin; + iterator->methods.can_seek_beginning = + (bt_self_component_port_input_message_iterator_can_seek_beginning_method) + src_comp_cls->methods.msg_iter_can_seek_beginning; + break; + } + case BT_COMPONENT_CLASS_TYPE_FILTER: + { + struct bt_component_class_filter *flt_comp_cls = + (void *) iterator->upstream_component->class; + + iterator->methods.next = + (bt_self_component_port_input_message_iterator_next_method) + flt_comp_cls->methods.msg_iter_next; + iterator->methods.seek_ns_from_origin = + (bt_self_component_port_input_message_iterator_seek_ns_from_origin_method) + flt_comp_cls->methods.msg_iter_seek_ns_from_origin; + iterator->methods.seek_beginning = + (bt_self_component_port_input_message_iterator_seek_beginning_method) + flt_comp_cls->methods.msg_iter_seek_beginning; + iterator->methods.can_seek_ns_from_origin = + (bt_self_component_port_input_message_iterator_can_seek_ns_from_origin_method) + flt_comp_cls->methods.msg_iter_can_seek_ns_from_origin; + iterator->methods.can_seek_beginning = + (bt_self_component_port_input_message_iterator_can_seek_beginning_method) + flt_comp_cls->methods.msg_iter_can_seek_beginning; + break; + } + default: + abort(); + } + + if (iterator->methods.seek_ns_from_origin && + !iterator->methods.can_seek_ns_from_origin) { + iterator->methods.can_seek_ns_from_origin = + (bt_self_component_port_input_message_iterator_can_seek_ns_from_origin_method) + can_seek_ns_from_origin_true; + } + + if (iterator->methods.seek_beginning && + !iterator->methods.can_seek_beginning) { + iterator->methods.can_seek_beginning = + (bt_self_component_port_input_message_iterator_seek_beginning_method) + can_seek_beginning_true; + } + BT_LIB_LOGD("Created initial message iterator on self component input port: " "%![up-port-]+p, %![up-comp-]+c, %![iter-]+i", upstream_port, upstream_comp, iterator); @@ -510,236 +554,11 @@ void bt_message_borrow_packet_stream(const struct bt_message *msg, } } -BT_ASSERT_PRE_FUNC -static inline -bool validate_message( - struct bt_self_component_port_input_message_iterator *iterator, - const struct bt_message *c_msg) -{ - bool is_valid = true; - struct stream_state *stream_state; - const struct bt_stream *stream = NULL; - const struct bt_packet *packet = NULL; - struct bt_message *msg = (void *) c_msg; - - BT_ASSERT(msg); - bt_message_borrow_packet_stream(c_msg, &stream, &packet); - - if (!stream) { - /* we don't care about messages not attached to streams */ - goto end; - } - - stream_state = g_hash_table_lookup(iterator->stream_states, stream); - if (!stream_state) { - /* - * No stream state for this stream: this message - * MUST be a BT_MESSAGE_TYPE_STREAM_BEGINNING message - * and its sequence number must be 0. - */ - if (c_msg->type != BT_MESSAGE_TYPE_STREAM_BEGINNING) { - BT_ASSERT_PRE_MSG("Unexpected message: missing a " - "BT_MESSAGE_TYPE_STREAM_BEGINNING " - "message prior to this message: " - "%![stream-]+s", stream); - is_valid = false; - goto end; - } - - if (c_msg->seq_num == -1ULL) { - msg->seq_num = 0; - } - - if (c_msg->seq_num != 0) { - BT_ASSERT_PRE_MSG("Unexpected message sequence " - "number for this message iterator: " - "this is the first message for this " - "stream, expecting sequence number 0: " - "seq-num=%" PRIu64 ", %![stream-]+s", - c_msg->seq_num, stream); - is_valid = false; - goto end; - } - - stream_state = create_stream_state(stream); - if (!stream_state) { - abort(); - } - - g_hash_table_insert(iterator->stream_states, - (void *) stream, stream_state); - stream_state->expected_msg_seq_num++; - goto end; - } - - if (stream_state->is_ended) { - /* - * There's a new message which has a reference to a - * stream which, from this iterator's point of view, is - * ended ("end of stream" message was returned). - * This is bad: the API guarantees that it can never - * happen. - */ - BT_ASSERT_PRE_MSG("Stream is already ended: %![stream-]+s", - stream); - is_valid = false; - goto end; - } - - if (c_msg->seq_num == -1ULL) { - msg->seq_num = stream_state->expected_msg_seq_num; - } - - if (c_msg->seq_num != -1ULL && - c_msg->seq_num != stream_state->expected_msg_seq_num) { - BT_ASSERT_PRE_MSG("Unexpected message sequence number: " - "seq-num=%" PRIu64 ", " - "expected-seq-num=%" PRIu64 ", %![stream-]+s", - c_msg->seq_num, stream_state->expected_msg_seq_num, - stream); - is_valid = false; - goto end; - } - - switch (c_msg->type) { - case BT_MESSAGE_TYPE_STREAM_BEGINNING: - BT_ASSERT_PRE_MSG("Unexpected BT_MESSAGE_TYPE_STREAM_BEGINNING " - "message at this point: msg-seq-num=%" PRIu64 ", " - "%![stream-]+s", c_msg->seq_num, stream); - is_valid = false; - goto end; - case BT_MESSAGE_TYPE_STREAM_END: - if (stream_state->cur_packet) { - BT_ASSERT_PRE_MSG("Unexpected BT_MESSAGE_TYPE_STREAM_END " - "message: missing a " - "BT_MESSAGE_TYPE_PACKET_END message " - "prior to this message: " - "msg-seq-num=%" PRIu64 ", " - "%![stream-]+s", c_msg->seq_num, stream); - is_valid = false; - goto end; - } - stream_state->expected_msg_seq_num++; - stream_state->is_ended = true; - goto end; - case BT_MESSAGE_TYPE_PACKET_BEGINNING: - if (stream_state->cur_packet) { - BT_ASSERT_PRE_MSG("Unexpected BT_MESSAGE_TYPE_PACKET_BEGINNING " - "message at this point: missing a " - "BT_MESSAGE_TYPE_PACKET_END message " - "prior to this message: " - "msg-seq-num=%" PRIu64 ", %![stream-]+s, " - "%![packet-]+a", c_msg->seq_num, stream, - packet); - is_valid = false; - goto end; - } - stream_state->expected_msg_seq_num++; - stream_state->cur_packet = packet; - bt_object_get_no_null_check(stream_state->cur_packet); - goto end; - case BT_MESSAGE_TYPE_PACKET_END: - if (!stream_state->cur_packet) { - BT_ASSERT_PRE_MSG("Unexpected BT_MESSAGE_TYPE_PACKET_END " - "message at this point: missing a " - "BT_MESSAGE_TYPE_PACKET_BEGINNING message " - "prior to this message: " - "msg-seq-num=%" PRIu64 ", %![stream-]+s, " - "%![packet-]+a", c_msg->seq_num, stream, - packet); - is_valid = false; - goto end; - } - stream_state->expected_msg_seq_num++; - BT_OBJECT_PUT_REF_AND_RESET(stream_state->cur_packet); - goto end; - case BT_MESSAGE_TYPE_EVENT: - if (packet != stream_state->cur_packet) { - BT_ASSERT_PRE_MSG("Unexpected packet for " - "BT_MESSAGE_TYPE_EVENT message: " - "msg-seq-num=%" PRIu64 ", %![stream-]+s, " - "%![msg-packet-]+a, %![expected-packet-]+a", - c_msg->seq_num, stream, - stream_state->cur_packet, packet); - is_valid = false; - goto end; - } - stream_state->expected_msg_seq_num++; - goto end; - default: - break; - } - -end: - return is_valid; -} - -BT_ASSERT_PRE_FUNC -static inline -bool validate_messages( - struct bt_self_component_port_input_message_iterator *iterator, - uint64_t count) -{ - bool ret = true; - bt_message_array_const msgs = - (void *) iterator->base.msgs->pdata; - uint64_t i; - - for (i = 0; i < count; i++) { - ret = validate_message(iterator, msgs[i]); - if (!ret) { - break; - } - } - - return ret; -} - -BT_ASSERT_PRE_FUNC -static inline bool self_comp_port_input_msg_iter_can_end( - struct bt_self_component_port_input_message_iterator *iterator) -{ - GHashTableIter iter; - gpointer stream_key, state_value; - bool ret = true; - - /* - * Verify that this iterator received a - * BT_MESSAGE_TYPE_STREAM_END message for each stream - * which has a state. - */ - - g_hash_table_iter_init(&iter, iterator->stream_states); - - while (g_hash_table_iter_next(&iter, &stream_key, &state_value)) { - struct stream_state *stream_state = (void *) state_value; - - BT_ASSERT(stream_state); - BT_ASSERT(stream_key); - - if (!stream_state->is_ended) { - BT_ASSERT_PRE_MSG("Ending message iterator, " - "but stream is not ended: " - "%![stream-]s", stream_key); - ret = false; - goto end; - } - } - -end: - return ret; -} - enum bt_message_iterator_status bt_self_component_port_input_message_iterator_next( struct bt_self_component_port_input_message_iterator *iterator, bt_message_array_const *msgs, uint64_t *user_count) { - typedef enum bt_self_message_iterator_status (*method_t)( - void *, bt_message_array_const, uint64_t, uint64_t *); - - method_t method = NULL; - struct bt_component_class *comp_cls; int status = BT_MESSAGE_ITERATOR_STATUS_OK; BT_ASSERT_PRE_NON_NULL(iterator, "Message iterator"); @@ -748,7 +567,7 @@ bt_self_component_port_input_message_iterator_next( BT_ASSERT_PRE(iterator->state == BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_ACTIVE, "Message iterator's \"next\" called, but " - "iterator is in the wrong state: %!+i", iterator); + "message iterator is in the wrong state: %!+i", iterator); BT_ASSERT(iterator->upstream_component); BT_ASSERT(iterator->upstream_component->class); BT_ASSERT_PRE(bt_component_borrow_graph(iterator->upstream_component)->is_configured, @@ -756,38 +575,16 @@ bt_self_component_port_input_message_iterator_next( bt_component_borrow_graph(iterator->upstream_component)); BT_LIB_LOGD("Getting next self component input port " "message iterator's messages: %!+i", iterator); - comp_cls = iterator->upstream_component->class; - - /* Pick the appropriate "next" method */ - switch (comp_cls->type) { - case BT_COMPONENT_CLASS_TYPE_SOURCE: - { - struct bt_component_class_source *src_comp_cls = - (void *) comp_cls; - - method = (method_t) src_comp_cls->methods.msg_iter_next; - break; - } - case BT_COMPONENT_CLASS_TYPE_FILTER: - { - struct bt_component_class_filter *flt_comp_cls = - (void *) comp_cls; - - method = (method_t) flt_comp_cls->methods.msg_iter_next; - break; - } - default: - abort(); - } /* * Call the user's "next" method to get the next messages * and status. */ - BT_ASSERT(method); + BT_ASSERT(iterator->methods.next); BT_LOGD_STR("Calling user's \"next\" method."); - status = method(iterator, (void *) iterator->base.msgs->pdata, - MSG_BATCH_SIZE, user_count); + status = iterator->methods.next(iterator, + (void *) iterator->base.msgs->pdata, MSG_BATCH_SIZE, + user_count); BT_LOGD("User method returned: status=%s", bt_message_iterator_status_string(status)); if (status < 0) { @@ -801,6 +598,9 @@ bt_self_component_port_input_message_iterator_next( * during its "next" method, as the only way to do this is to * put the last iterator's reference, and this can only be done * by its downstream owner. + * + * For the same reason, there is no way that this iterator could + * have seeked (cannot seek a self message iterator). */ BT_ASSERT(iterator->state == BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_ACTIVE); @@ -808,18 +608,15 @@ bt_self_component_port_input_message_iterator_next( switch (status) { case BT_MESSAGE_ITERATOR_STATUS_OK: - BT_ASSERT_PRE(validate_messages(iterator, *user_count), - "Messages are invalid at this point: " - "%![msg-iter-]+i, count=%" PRIu64, - iterator, *user_count); + BT_ASSERT_PRE(*user_count <= MSG_BATCH_SIZE, + "Invalid returned message count: greater than " + "batch size: count=%" PRIu64 ", batch-size=%u", + *user_count, MSG_BATCH_SIZE); *msgs = (void *) iterator->base.msgs->pdata; break; case BT_MESSAGE_ITERATOR_STATUS_AGAIN: goto end; case BT_MESSAGE_ITERATOR_STATUS_END: - BT_ASSERT_PRE(self_comp_port_input_msg_iter_can_end(iterator), - "Message iterator cannot end at this point: " - "%!+i", iterator); set_self_comp_port_input_msg_iterator_state(iterator, BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_ENDED); goto end; @@ -880,13 +677,22 @@ enum bt_message_iterator_status bt_port_output_message_iterator_next( return status; } -struct bt_component *bt_self_component_port_input_message_iterator_borrow_component( +struct bt_component * +bt_self_component_port_input_message_iterator_borrow_component( struct bt_self_component_port_input_message_iterator *iterator) { BT_ASSERT_PRE_NON_NULL(iterator, "Message iterator"); return iterator->upstream_component; } +const struct bt_component * +bt_self_component_port_input_message_iterator_borrow_component_const( + const struct bt_self_component_port_input_message_iterator *iterator) +{ + BT_ASSERT_PRE_NON_NULL(iterator, "Message iterator"); + return iterator->upstream_component; +} + struct bt_self_component *bt_self_message_iterator_borrow_component( struct bt_self_message_iterator *self_iterator) { @@ -922,8 +728,7 @@ void bt_port_output_message_iterator_destroy(struct bt_object *obj) } struct bt_port_output_message_iterator * -bt_port_output_message_iterator_create( - struct bt_graph *graph, +bt_port_output_message_iterator_create(struct bt_graph *graph, const struct bt_port_output *output_port) { struct bt_port_output_message_iterator *iterator = NULL; @@ -1049,6 +854,522 @@ end: return (void *) iterator; } +bt_bool bt_self_component_port_input_message_iterator_can_seek_ns_from_origin( + struct bt_self_component_port_input_message_iterator *iterator, + int64_t ns_from_origin) +{ + bt_bool can = BT_FALSE; + + BT_ASSERT_PRE_NON_NULL(iterator, "Message iterator"); + BT_ASSERT_PRE_ITER_HAS_STATE_TO_SEEK(iterator); + BT_ASSERT_PRE(bt_component_borrow_graph(iterator->upstream_component)->is_configured, + "Graph is not configured: %!+g", + bt_component_borrow_graph(iterator->upstream_component)); + + if (iterator->methods.can_seek_ns_from_origin) { + can = iterator->methods.can_seek_ns_from_origin(iterator, + ns_from_origin); + goto end; + } + + /* + * Automatic seeking fall back: if we can seek to the beginning, + * then we can automatically seek to any message. + */ + if (iterator->methods.can_seek_beginning) { + can = iterator->methods.can_seek_beginning(iterator); + } + +end: + return can; +} + +bt_bool bt_self_component_port_input_message_iterator_can_seek_beginning( + struct bt_self_component_port_input_message_iterator *iterator) +{ + bt_bool can = BT_FALSE; + + BT_ASSERT_PRE_NON_NULL(iterator, "Message iterator"); + BT_ASSERT_PRE_ITER_HAS_STATE_TO_SEEK(iterator); + BT_ASSERT_PRE(bt_component_borrow_graph(iterator->upstream_component)->is_configured, + "Graph is not configured: %!+g", + bt_component_borrow_graph(iterator->upstream_component)); + + if (iterator->methods.can_seek_beginning) { + can = iterator->methods.can_seek_beginning(iterator); + } + + return can; +} + +static inline +void _set_iterator_state_after_seeking( + struct bt_self_component_port_input_message_iterator *iterator, + enum bt_message_iterator_status status) +{ + enum bt_self_component_port_input_message_iterator_state new_state = 0; + + /* Set iterator's state depending on seeking status */ + switch (status) { + case BT_MESSAGE_ITERATOR_STATUS_OK: + new_state = BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_ACTIVE; + break; + case BT_MESSAGE_ITERATOR_STATUS_AGAIN: + new_state = BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_LAST_SEEKING_RETURNED_AGAIN; + break; + case BT_MESSAGE_ITERATOR_STATUS_ERROR: + case BT_MESSAGE_ITERATOR_STATUS_NOMEM: + new_state = BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_LAST_SEEKING_RETURNED_ERROR; + break; + case BT_MESSAGE_ITERATOR_STATUS_END: + new_state = BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_ENDED; + break; + default: + abort(); + } + + set_self_comp_port_input_msg_iterator_state(iterator, new_state); +} + +#ifdef BT_DEV_MODE +# define set_iterator_state_after_seeking _set_iterator_state_after_seeking +#else +# define set_iterator_state_after_seeking(_iter, _status) +#endif + +enum bt_message_iterator_status +bt_self_component_port_input_message_iterator_seek_beginning( + struct bt_self_component_port_input_message_iterator *iterator) +{ + int status; + + BT_ASSERT_PRE_NON_NULL(iterator, "Message iterator"); + BT_ASSERT_PRE_ITER_HAS_STATE_TO_SEEK(iterator); + BT_ASSERT_PRE(bt_component_borrow_graph(iterator->upstream_component)->is_configured, + "Graph is not configured: %!+g", + bt_component_borrow_graph(iterator->upstream_component)); + BT_ASSERT_PRE( + bt_self_component_port_input_message_iterator_can_seek_beginning( + iterator), + "Message iterator cannot seek beginning: %!+i", iterator); + BT_LIB_LOGD("Calling user's \"seek beginning\" method: %!+i", iterator); + set_self_comp_port_input_msg_iterator_state(iterator, + BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_SEEKING); + status = iterator->methods.seek_beginning(iterator); + BT_LOGD("User method returned: status=%s", + bt_message_iterator_status_string(status)); + BT_ASSERT_PRE(status == BT_MESSAGE_ITERATOR_STATUS_OK || + status == BT_MESSAGE_ITERATOR_STATUS_ERROR || + status == BT_MESSAGE_ITERATOR_STATUS_NOMEM || + status == BT_MESSAGE_ITERATOR_STATUS_AGAIN, + "Unexpected status: %![iter-]+i, status=%s", + iterator, bt_self_message_iterator_status_string(status)); + set_iterator_state_after_seeking(iterator, status); + return status; +} + +static inline +int get_message_ns_from_origin(const struct bt_message *msg, + int64_t *ns_from_origin, bool *ignore) +{ + const struct bt_clock_snapshot *clk_snapshot = NULL; + int ret = 0; + + switch (msg->type) { + case BT_MESSAGE_TYPE_EVENT: + { + const struct bt_message_event *event_msg = + (const void *) msg; + + clk_snapshot = event_msg->event->default_cs; + BT_ASSERT_PRE(clk_snapshot, + "Event has no default clock snapshot: %!+e", + event_msg->event); + break; + } + case BT_MESSAGE_TYPE_INACTIVITY: + { + const struct bt_message_inactivity *inactivity_msg = + (const void *) msg; + + BT_ASSERT(inactivity_msg->default_cs); + clk_snapshot = inactivity_msg->default_cs; + break; + } + case BT_MESSAGE_TYPE_STREAM_BEGINNING: + case BT_MESSAGE_TYPE_STREAM_END: + /* Ignore */ + goto end; + case BT_MESSAGE_TYPE_PACKET_BEGINNING: + { + const struct bt_message_packet_beginning *pkt_msg = + (const void *) msg; + + clk_snapshot = pkt_msg->packet->default_beginning_cs; + break; + } + case BT_MESSAGE_TYPE_PACKET_END: + { + const struct bt_message_packet_end *pkt_msg = + (const void *) msg; + + clk_snapshot = pkt_msg->packet->default_end_cs; + break; + } + default: + abort(); + } + + if (!clk_snapshot) { + *ignore = true; + goto end; + } + + ret = bt_clock_snapshot_get_ns_from_origin(clk_snapshot, + ns_from_origin); + +end: + return ret; +} + +static +enum bt_message_iterator_status find_message_ge_ns_from_origin( + struct bt_self_component_port_input_message_iterator *iterator, + int64_t ns_from_origin) +{ + int status; + enum bt_self_component_port_input_message_iterator_state init_state = + iterator->state; + const struct bt_message *messages[MSG_BATCH_SIZE]; + uint64_t user_count = 0; + uint64_t i; + + BT_ASSERT(iterator); + memset(&messages[0], 0, sizeof(messages[0]) * MSG_BATCH_SIZE); + + /* + * Make this iterator temporarily active (not seeking) to call + * the "next" method. + */ + set_self_comp_port_input_msg_iterator_state(iterator, + BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_ACTIVE); + + BT_ASSERT(iterator->methods.next); + + while (true) { + /* + * Call the user's "next" method to get the next + * messages and status. + */ + BT_LOGD_STR("Calling user's \"next\" method."); + status = iterator->methods.next(iterator, + &messages[0], MSG_BATCH_SIZE, &user_count); + BT_LOGD("User method returned: status=%s", + bt_message_iterator_status_string(status)); + +#ifdef BT_DEV_MODE + /* + * The user's "next" method must not do any action which + * would change the iterator's state. + */ + BT_ASSERT(iterator->state == + BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_ACTIVE); +#endif + + switch (status) { + case BT_MESSAGE_ITERATOR_STATUS_OK: + BT_ASSERT_PRE(user_count <= MSG_BATCH_SIZE, + "Invalid returned message count: greater than " + "batch size: count=%" PRIu64 ", batch-size=%u", + user_count, MSG_BATCH_SIZE); + break; + case BT_MESSAGE_ITERATOR_STATUS_AGAIN: + case BT_MESSAGE_ITERATOR_STATUS_ERROR: + case BT_MESSAGE_ITERATOR_STATUS_NOMEM: + case BT_MESSAGE_ITERATOR_STATUS_END: + goto end; + default: + abort(); + } + + /* + * Find first message which has a default clock snapshot + * that is greater than or equal to the requested value. + * + * For event and inactivity messages, compare with the + * default clock snapshot. + * + * For packet beginning messages, compare with the + * default beginning clock snapshot, if any. + * + * For packet end messages, compare with the default end + * clock snapshot, if any. + * + * For stream beginning, stream end, ignore. + */ + for (i = 0; i < user_count; i++) { + const struct bt_message *msg = messages[i]; + int64_t msg_ns_from_origin; + bool ignore = false; + int ret; + + BT_ASSERT(msg); + ret = get_message_ns_from_origin(msg, &msg_ns_from_origin, + &ignore); + if (ret) { + status = BT_MESSAGE_ITERATOR_STATUS_ERROR; + goto end; + } + + if (ignore) { + /* Skip message without a clock snapshot */ + continue; + } + + if (msg_ns_from_origin >= ns_from_origin) { + /* + * We found it: move this message and + * the following ones to the iterator's + * auto-seek message array. + */ + uint64_t j; + + for (j = i; j < user_count; j++) { + iterator->auto_seek_msgs->pdata[j - i] = + (void *) messages[j]; + messages[j] = NULL; + } + + iterator->auto_seek_msg_count = user_count - i; + goto end; + } + + bt_object_put_no_null_check(msg); + messages[i] = NULL; + } + } + +end: + for (i = 0; i < user_count; i++) { + if (messages[i]) { + bt_object_put_no_null_check(messages[i]); + } + } + + set_self_comp_port_input_msg_iterator_state(iterator, init_state); + return status; +} + +static +enum bt_self_message_iterator_status post_auto_seek_next( + struct bt_self_component_port_input_message_iterator *iterator, + bt_message_array_const msgs, uint64_t capacity, + uint64_t *count) +{ + BT_ASSERT(iterator->auto_seek_msg_count <= capacity); + BT_ASSERT(iterator->auto_seek_msg_count > 0); + + /* + * Move auto-seek messages to the output array (which is this + * iterator's base message array. + */ + memcpy(&msgs[0], &iterator->auto_seek_msgs->pdata[0], + sizeof(msgs[0]) * iterator->auto_seek_msg_count); + memset(&iterator->auto_seek_msgs->pdata[0], 0, + sizeof(iterator->auto_seek_msgs->pdata[0]) * + iterator->auto_seek_msg_count); + *count = iterator->auto_seek_msg_count; + + /* Restore real user's "next" method */ + switch (iterator->upstream_component->class->type) { + case BT_COMPONENT_CLASS_TYPE_SOURCE: + { + struct bt_component_class_source *src_comp_cls = + (void *) iterator->upstream_component->class; + + iterator->methods.next = + (bt_self_component_port_input_message_iterator_next_method) + src_comp_cls->methods.msg_iter_next; + break; + } + case BT_COMPONENT_CLASS_TYPE_FILTER: + { + struct bt_component_class_filter *flt_comp_cls = + (void *) iterator->upstream_component->class; + + iterator->methods.next = + (bt_self_component_port_input_message_iterator_next_method) + flt_comp_cls->methods.msg_iter_next; + break; + } + default: + abort(); + } + + return BT_SELF_MESSAGE_ITERATOR_STATUS_OK; +} + +enum bt_message_iterator_status +bt_self_component_port_input_message_iterator_seek_ns_from_origin( + struct bt_self_component_port_input_message_iterator *iterator, + int64_t ns_from_origin) +{ + int status; + + BT_ASSERT_PRE_NON_NULL(iterator, "Message iterator"); + BT_ASSERT_PRE_ITER_HAS_STATE_TO_SEEK(iterator); + BT_ASSERT_PRE(bt_component_borrow_graph(iterator->upstream_component)->is_configured, + "Graph is not configured: %!+g", + bt_component_borrow_graph(iterator->upstream_component)); + BT_ASSERT_PRE( + bt_self_component_port_input_message_iterator_can_seek_ns_from_origin( + iterator, ns_from_origin), + "Message iterator cannot seek nanoseconds from origin: %!+i, " + "ns-from-origin=%" PRId64, iterator, ns_from_origin); + set_self_comp_port_input_msg_iterator_state(iterator, + BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_SEEKING); + + if (iterator->methods.seek_ns_from_origin) { + BT_LIB_LOGD("Calling user's \"seek nanoseconds from origin\" method: " + "%![iter-]+i, ns=%" PRId64, iterator, ns_from_origin); + status = iterator->methods.seek_ns_from_origin(iterator, + ns_from_origin); + BT_LOGD("User method returned: status=%s", + bt_message_iterator_status_string(status)); + BT_ASSERT_PRE(status == BT_MESSAGE_ITERATOR_STATUS_OK || + status == BT_MESSAGE_ITERATOR_STATUS_ERROR || + status == BT_MESSAGE_ITERATOR_STATUS_NOMEM || + status == BT_MESSAGE_ITERATOR_STATUS_AGAIN, + "Unexpected status: %![iter-]+i, status=%s", + iterator, + bt_self_message_iterator_status_string(status)); + } else { + /* Start automatic seeking: seek beginning first */ + BT_ASSERT(iterator->methods.can_seek_beginning(iterator)); + BT_ASSERT(iterator->methods.seek_beginning); + BT_LIB_LOGD("Calling user's \"seek beginning\" method: %!+i", + iterator); + status = iterator->methods.seek_beginning(iterator); + BT_LOGD("User method returned: status=%s", + bt_message_iterator_status_string(status)); + BT_ASSERT_PRE(status == BT_MESSAGE_ITERATOR_STATUS_OK || + status == BT_MESSAGE_ITERATOR_STATUS_ERROR || + status == BT_MESSAGE_ITERATOR_STATUS_NOMEM || + status == BT_MESSAGE_ITERATOR_STATUS_AGAIN, + "Unexpected status: %![iter-]+i, status=%s", + iterator, + bt_self_message_iterator_status_string(status)); + switch (status) { + case BT_MESSAGE_ITERATOR_STATUS_OK: + break; + case BT_MESSAGE_ITERATOR_STATUS_ERROR: + case BT_MESSAGE_ITERATOR_STATUS_NOMEM: + case BT_MESSAGE_ITERATOR_STATUS_AGAIN: + goto end; + default: + abort(); + } + + /* + * Find the first message which has a default clock + * snapshot greater than or equal to the requested + * nanoseconds from origin, and move the received + * messages from this point in the batch to this + * iterator's auto-seek message array. + */ + status = find_message_ge_ns_from_origin(iterator, + ns_from_origin); + switch (status) { + case BT_MESSAGE_ITERATOR_STATUS_OK: + /* + * Replace the user's "next" method with a + * custom, temporary "next" method which returns + * the messages in the iterator's message array. + */ + iterator->methods.next = + (bt_self_component_port_input_message_iterator_next_method) + post_auto_seek_next; + break; + case BT_MESSAGE_ITERATOR_STATUS_ERROR: + case BT_MESSAGE_ITERATOR_STATUS_NOMEM: + case BT_MESSAGE_ITERATOR_STATUS_AGAIN: + goto end; + case BT_MESSAGE_ITERATOR_STATUS_END: + /* + * The iterator reached the end: just return + * `BT_MESSAGE_ITERATOR_STATUS_OK` here, as if + * the seeking operation occured: the next + * "next" method will return + * `BT_MESSAGE_ITERATOR_STATUS_END` itself. + */ + break; + default: + abort(); + } + } + +end: + set_iterator_state_after_seeking(iterator, status); + + if (status == BT_MESSAGE_ITERATOR_STATUS_END) { + status = BT_MESSAGE_ITERATOR_STATUS_OK; + } + + return status; +} + +static inline +bt_self_component_port_input_message_iterator * +borrow_output_port_message_iterator_upstream_iterator( + struct bt_port_output_message_iterator *iterator) +{ + struct bt_component_class_sink_colander_priv_data *colander_data; + + BT_ASSERT(iterator); + colander_data = (void *) iterator->colander->parent.user_data; + BT_ASSERT(colander_data); + BT_ASSERT(colander_data->msg_iter); + return colander_data->msg_iter; +} + +bt_bool bt_port_output_message_iterator_can_seek_ns_from_origin( + struct bt_port_output_message_iterator *iterator, + int64_t ns_from_origin) +{ + BT_ASSERT_PRE_NON_NULL(iterator, "Message iterator"); + return bt_self_component_port_input_message_iterator_can_seek_ns_from_origin( + borrow_output_port_message_iterator_upstream_iterator( + iterator), ns_from_origin); +} + +bt_bool bt_port_output_message_iterator_can_seek_beginning( + struct bt_port_output_message_iterator *iterator) +{ + BT_ASSERT_PRE_NON_NULL(iterator, "Message iterator"); + return bt_self_component_port_input_message_iterator_can_seek_beginning( + borrow_output_port_message_iterator_upstream_iterator( + iterator)); +} + +enum bt_message_iterator_status bt_port_output_message_iterator_seek_ns_from_origin( + struct bt_port_output_message_iterator *iterator, + int64_t ns_from_origin) +{ + BT_ASSERT_PRE_NON_NULL(iterator, "Message iterator"); + return bt_self_component_port_input_message_iterator_seek_ns_from_origin( + borrow_output_port_message_iterator_upstream_iterator(iterator), + ns_from_origin); +} + +enum bt_message_iterator_status bt_port_output_message_iterator_seek_beginning( + struct bt_port_output_message_iterator *iterator) +{ + BT_ASSERT_PRE_NON_NULL(iterator, "Message iterator"); + return bt_self_component_port_input_message_iterator_seek_beginning( + borrow_output_port_message_iterator_upstream_iterator( + iterator)); +} + void bt_port_output_message_iterator_get_ref( const struct bt_port_output_message_iterator *iterator) { diff --git a/lib/graph/message/message.c b/lib/graph/message/message.c index 64e85511..545d67cb 100644 --- a/lib/graph/message/message.c +++ b/lib/graph/message/message.c @@ -30,28 +30,14 @@ #include #include -BT_ASSERT_PRE_FUNC -static inline void _init_seq_num(struct bt_message *message) -{ - message->seq_num = UINT64_C(-1); -} - -#ifdef BT_DEV_MODE -# define init_seq_num _init_seq_num -#else -# define init_seq_num(_msg) -#endif /* BT_DEV_MODE */ - BT_HIDDEN void bt_message_init(struct bt_message *message, enum bt_message_type type, bt_object_release_func release, struct bt_graph *graph) { - BT_ASSERT(type >= 0 && - type <= BT_MESSAGE_TYPE_PACKET_END); + BT_ASSERT(type >= 0 && type <= BT_MESSAGE_TYPE_PACKET_END); message->type = type; - init_seq_num(message); bt_object_init_shared(&message->base, release); message->graph = graph; diff --git a/lib/plugin/plugin-so.c b/lib/plugin/plugin-so.c index 94df0b32..ed8d5e29 100644 --- a/lib/plugin/plugin-so.c +++ b/lib/plugin/plugin-so.c @@ -299,6 +299,10 @@ enum bt_plugin_status bt_plugin_so_init( bt_component_class_source_output_port_connected_method output_port_connected; bt_component_class_source_message_iterator_init_method msg_iter_init; bt_component_class_source_message_iterator_finalize_method msg_iter_finalize; + bt_component_class_source_message_iterator_seek_ns_from_origin_method msg_iter_seek_ns_from_origin; + bt_component_class_source_message_iterator_seek_beginning_method msg_iter_seek_beginning; + bt_component_class_source_message_iterator_can_seek_ns_from_origin_method msg_iter_can_seek_ns_from_origin; + bt_component_class_source_message_iterator_can_seek_beginning_method msg_iter_can_seek_beginning; } source; struct { @@ -311,6 +315,10 @@ enum bt_plugin_status bt_plugin_so_init( bt_component_class_filter_output_port_connected_method output_port_connected; bt_component_class_filter_message_iterator_init_method msg_iter_init; bt_component_class_filter_message_iterator_finalize_method msg_iter_finalize; + bt_component_class_filter_message_iterator_seek_ns_from_origin_method msg_iter_seek_ns_from_origin; + bt_component_class_filter_message_iterator_seek_beginning_method msg_iter_seek_beginning; + bt_component_class_filter_message_iterator_can_seek_ns_from_origin_method msg_iter_can_seek_ns_from_origin; + bt_component_class_filter_message_iterator_can_seek_beginning_method msg_iter_can_seek_beginning; } filter; struct { @@ -616,6 +624,62 @@ enum bt_plugin_status bt_plugin_so_init( abort(); } break; + case BT_PLUGIN_COMPONENT_CLASS_DESCRIPTOR_ATTRIBUTE_TYPE_MSG_ITER_SEEK_NS_FROM_ORIGIN_METHOD: + switch (cc_type) { + case BT_COMPONENT_CLASS_TYPE_SOURCE: + cc_full_descr->methods.source.msg_iter_seek_ns_from_origin = + cur_cc_descr_attr->value.source_msg_iter_seek_ns_from_origin_method; + break; + case BT_COMPONENT_CLASS_TYPE_FILTER: + cc_full_descr->methods.filter.msg_iter_seek_ns_from_origin = + cur_cc_descr_attr->value.filter_msg_iter_seek_ns_from_origin_method; + break; + default: + abort(); + } + break; + case BT_PLUGIN_COMPONENT_CLASS_DESCRIPTOR_ATTRIBUTE_TYPE_MSG_ITER_SEEK_BEGINNING_METHOD: + switch (cc_type) { + case BT_COMPONENT_CLASS_TYPE_SOURCE: + cc_full_descr->methods.source.msg_iter_seek_beginning = + cur_cc_descr_attr->value.source_msg_iter_seek_beginning_method; + break; + case BT_COMPONENT_CLASS_TYPE_FILTER: + cc_full_descr->methods.filter.msg_iter_seek_beginning = + cur_cc_descr_attr->value.filter_msg_iter_seek_beginning_method; + break; + default: + abort(); + } + break; + case BT_PLUGIN_COMPONENT_CLASS_DESCRIPTOR_ATTRIBUTE_TYPE_MSG_ITER_CAN_SEEK_NS_FROM_ORIGIN_METHOD: + switch (cc_type) { + case BT_COMPONENT_CLASS_TYPE_SOURCE: + cc_full_descr->methods.source.msg_iter_can_seek_ns_from_origin = + cur_cc_descr_attr->value.source_msg_iter_can_seek_ns_from_origin_method; + break; + case BT_COMPONENT_CLASS_TYPE_FILTER: + cc_full_descr->methods.filter.msg_iter_can_seek_ns_from_origin = + cur_cc_descr_attr->value.filter_msg_iter_can_seek_ns_from_origin_method; + break; + default: + abort(); + } + break; + case BT_PLUGIN_COMPONENT_CLASS_DESCRIPTOR_ATTRIBUTE_TYPE_MSG_ITER_CAN_SEEK_BEGINNING_METHOD: + switch (cc_type) { + case BT_COMPONENT_CLASS_TYPE_SOURCE: + cc_full_descr->methods.source.msg_iter_can_seek_beginning = + cur_cc_descr_attr->value.source_msg_iter_can_seek_beginning_method; + break; + case BT_COMPONENT_CLASS_TYPE_FILTER: + cc_full_descr->methods.filter.msg_iter_can_seek_beginning = + cur_cc_descr_attr->value.filter_msg_iter_can_seek_beginning_method; + break; + default: + abort(); + } + break; default: /* * WARN-level logging because this @@ -841,6 +905,54 @@ enum bt_plugin_status bt_plugin_so_init( } } + if (cc_full_descr->methods.source.msg_iter_seek_ns_from_origin) { + ret = bt_component_class_source_set_message_iterator_seek_ns_from_origin_method( + src_comp_class, + cc_full_descr->methods.source.msg_iter_seek_ns_from_origin); + if (ret) { + BT_LOGE_STR("Cannot set source component class's message iterator \"seek nanoseconds from origin\" method."); + status = BT_PLUGIN_STATUS_ERROR; + BT_OBJECT_PUT_REF_AND_RESET(src_comp_class); + goto end; + } + } + + if (cc_full_descr->methods.source.msg_iter_seek_beginning) { + ret = bt_component_class_source_set_message_iterator_seek_beginning_method( + src_comp_class, + cc_full_descr->methods.source.msg_iter_seek_beginning); + if (ret) { + BT_LOGE_STR("Cannot set source component class's message iterator \"seek beginning\" method."); + status = BT_PLUGIN_STATUS_ERROR; + BT_OBJECT_PUT_REF_AND_RESET(src_comp_class); + goto end; + } + } + + if (cc_full_descr->methods.source.msg_iter_can_seek_ns_from_origin) { + ret = bt_component_class_source_set_message_iterator_can_seek_ns_from_origin_method( + src_comp_class, + cc_full_descr->methods.source.msg_iter_can_seek_ns_from_origin); + if (ret) { + BT_LOGE_STR("Cannot set source component class's message iterator \"can seek nanoseconds from origin\" method."); + status = BT_PLUGIN_STATUS_ERROR; + BT_OBJECT_PUT_REF_AND_RESET(src_comp_class); + goto end; + } + } + + if (cc_full_descr->methods.source.msg_iter_can_seek_beginning) { + ret = bt_component_class_source_set_message_iterator_can_seek_beginning_method( + src_comp_class, + cc_full_descr->methods.source.msg_iter_can_seek_beginning); + if (ret) { + BT_LOGE_STR("Cannot set source component class's message iterator \"can seek beginning\" method."); + status = BT_PLUGIN_STATUS_ERROR; + BT_OBJECT_PUT_REF_AND_RESET(src_comp_class); + goto end; + } + } + break; case BT_COMPONENT_CLASS_TYPE_FILTER: if (cc_full_descr->methods.filter.init) { @@ -951,6 +1063,54 @@ enum bt_plugin_status bt_plugin_so_init( } } + if (cc_full_descr->methods.filter.msg_iter_seek_ns_from_origin) { + ret = bt_component_class_filter_set_message_iterator_seek_ns_from_origin_method( + flt_comp_class, + cc_full_descr->methods.filter.msg_iter_seek_ns_from_origin); + if (ret) { + BT_LOGE_STR("Cannot set filter component class's message iterator \"seek nanoseconds from origin\" method."); + status = BT_PLUGIN_STATUS_ERROR; + BT_OBJECT_PUT_REF_AND_RESET(flt_comp_class); + goto end; + } + } + + if (cc_full_descr->methods.filter.msg_iter_seek_beginning) { + ret = bt_component_class_filter_set_message_iterator_seek_beginning_method( + flt_comp_class, + cc_full_descr->methods.filter.msg_iter_seek_beginning); + if (ret) { + BT_LOGE_STR("Cannot set filter component class's message iterator \"seek beginning\" method."); + status = BT_PLUGIN_STATUS_ERROR; + BT_OBJECT_PUT_REF_AND_RESET(flt_comp_class); + goto end; + } + } + + if (cc_full_descr->methods.filter.msg_iter_can_seek_ns_from_origin) { + ret = bt_component_class_filter_set_message_iterator_can_seek_ns_from_origin_method( + flt_comp_class, + cc_full_descr->methods.filter.msg_iter_can_seek_ns_from_origin); + if (ret) { + BT_LOGE_STR("Cannot set filter component class's message iterator \"can seek nanoseconds from origin\" method."); + status = BT_PLUGIN_STATUS_ERROR; + BT_OBJECT_PUT_REF_AND_RESET(flt_comp_class); + goto end; + } + } + + if (cc_full_descr->methods.filter.msg_iter_can_seek_beginning) { + ret = bt_component_class_filter_set_message_iterator_can_seek_beginning_method( + flt_comp_class, + cc_full_descr->methods.filter.msg_iter_can_seek_beginning); + if (ret) { + BT_LOGE_STR("Cannot set filter component class's message iterator \"can seek beginning\" method."); + status = BT_PLUGIN_STATUS_ERROR; + BT_OBJECT_PUT_REF_AND_RESET(flt_comp_class); + goto end; + } + } + break; case BT_COMPONENT_CLASS_TYPE_SINK: if (cc_full_descr->methods.sink.init) { -- 2.34.1