#include <babeltrace/compiler-internal.h>
#include <babeltrace/ref.h>
-#include <babeltrace/ctf-ir/fields.h>
-#include <babeltrace/ctf-ir/field-types.h>
-#include <babeltrace/ctf-ir/field-types-internal.h>
-#include <babeltrace/ctf-ir/event-internal.h>
-#include <babeltrace/ctf-ir/packet-internal.h>
-#include <babeltrace/ctf-ir/stream-internal.h>
+#include <babeltrace/trace-ir/fields.h>
+#include <babeltrace/trace-ir/event-internal.h>
+#include <babeltrace/trace-ir/packet-internal.h>
+#include <babeltrace/trace-ir/stream-internal.h>
#include <babeltrace/graph/connection.h>
#include <babeltrace/graph/connection-internal.h>
#include <babeltrace/graph/component.h>
#include <babeltrace/graph/notification-packet-internal.h>
#include <babeltrace/graph/notification-stream.h>
#include <babeltrace/graph/notification-stream-internal.h>
-#include <babeltrace/graph/notification-discarded-elements-internal.h>
#include <babeltrace/graph/port.h>
#include <babeltrace/graph/graph-internal.h>
#include <babeltrace/types.h>
#include <inttypes.h>
#include <stdlib.h>
-struct discarded_elements_state {
- struct bt_clock_value *cur_begin;
- uint64_t cur_count;
-};
+/*
+ * TODO: Use graph's state (number of active iterators, etc.) and
+ * possibly system specifications to make a better guess than this.
+ */
+#define NOTIF_BATCH_SIZE 15
struct stream_state {
struct bt_stream *stream; /* owned by this */
struct bt_packet *cur_packet; /* owned by this */
- struct discarded_elements_state discarded_packets_state;
- struct discarded_elements_state discarded_events_state;
uint64_t expected_notif_seq_num;
bt_bool is_ended;
};
-static
-void stream_destroy_listener(struct bt_stream_common *stream, void *data)
-{
- struct bt_notification_iterator_private_connection *iterator = data;
-
- /* Remove associated stream state */
- g_hash_table_remove(iterator->stream_states, stream);
-}
-
+BT_ASSERT_PRE_FUNC
static
void destroy_stream_state(struct stream_state *stream_state)
{
bt_put(stream_state->cur_packet);
BT_LOGV_STR("Putting stream state's stream.");
bt_put(stream_state->stream);
- bt_put(stream_state->discarded_packets_state.cur_begin);
- bt_put(stream_state->discarded_events_state.cur_begin);
g_free(stream_state);
}
+BT_ASSERT_PRE_FUNC
static
struct stream_state *create_stream_state(struct bt_stream *stream)
{
goto end;
}
- /*
- * The packet index is a monotonic counter which may not start
- * at 0 at the beginning of the stream. We therefore need to
- * have an internal object initial state of -1ULL to distinguish
- * between initial state and having seen a packet with
- * the sequence number 0.
- */
- stream_state->discarded_packets_state.cur_count = -1ULL;
-
/*
* We keep a reference to the stream until we know it's ended.
*/
static
void destroy_base_notification_iterator(struct bt_object *obj)
{
- struct bt_notification_iterator *iterator =
- container_of(obj, struct bt_notification_iterator, base);
+ struct bt_notification_iterator *iterator = (void *) obj;
+
+ BT_ASSERT(iterator);
+
+ if (iterator->notifs) {
+ g_ptr_array_free(iterator->notifs, TRUE);
+ }
- BT_LOGD_STR("Putting current notification.");
- bt_put(iterator->current_notification);
g_free(iterator);
}
* reference count would go from 1 to 0 again and this function
* would be called again.
*/
- obj->ref_count.count++;
- iterator = (void *) container_of(obj, struct bt_notification_iterator, base);
+ obj->ref_count++;
+ iterator = (void *) obj;
BT_LOGD("Destroying private connection notification iterator object: addr=%p",
iterator);
bt_private_connection_notification_iterator_finalize(iterator);
* listener would be called with an invalid/other
* notification iterator object.
*/
- GHashTableIter ht_iter;
- gpointer stream_gptr, stream_state_gptr;
-
- g_hash_table_iter_init(&ht_iter, iterator->stream_states);
-
- while (g_hash_table_iter_next(&ht_iter, &stream_gptr, &stream_state_gptr)) {
- BT_ASSERT(stream_gptr);
-
- BT_LOGD_STR("Removing stream's destroy listener for notification iterator.");
- bt_stream_common_remove_destroy_listener(
- (void *) stream_gptr, stream_destroy_listener,
- iterator);
- }
-
g_hash_table_destroy(iterator->stream_states);
}
}
static
-void init_notification_iterator(struct bt_notification_iterator *iterator,
+int init_notification_iterator(struct bt_notification_iterator *iterator,
enum bt_notification_iterator_type type,
bt_object_release_func destroy)
{
- bt_object_init(iterator, destroy);
+ int ret = 0;
+
+ bt_object_init_shared(&iterator->base, destroy);
iterator->type = type;
+ iterator->notifs = g_ptr_array_new();
+ if (!iterator->notifs) {
+ BT_LOGE_STR("Failed to allocate a GPtrArray.");
+ ret = -1;
+ goto end;
+ }
+
+ g_ptr_array_set_size(iterator->notifs, NOTIF_BATCH_SIZE);
+
+end:
+ return ret;
}
BT_HIDDEN
enum bt_connection_status status = BT_CONNECTION_STATUS_OK;
enum bt_component_class_type type;
struct bt_notification_iterator_private_connection *iterator = NULL;
+ int ret;
BT_ASSERT(upstream_comp);
BT_ASSERT(upstream_port);
goto end;
}
- init_notification_iterator((void *) iterator,
+ ret = init_notification_iterator((void *) iterator,
BT_NOTIFICATION_ITERATOR_TYPE_PRIVATE_CONNECTION,
bt_private_connection_notification_iterator_destroy);
+ if (ret) {
+ /* init_notification_iterator() logs errors */
+ status = BT_CONNECTION_STATUS_NOMEM;
+ goto end;
+ }
iterator->stream_states = g_hash_table_new_full(g_direct_hash,
g_direct_equal, NULL, (GDestroyNotify) destroy_stream_state);
iterator->upstream_component = upstream_comp;
iterator->upstream_port = upstream_port;
iterator->connection = connection;
+ iterator->graph = bt_component_borrow_graph(upstream_comp);
iterator->state = BT_PRIVATE_CONNECTION_NOTIFICATION_ITERATOR_STATE_NON_INITIALIZED;
BT_LOGD("Created notification iterator: "
"upstream-comp-addr=%p, upstream-comp-name=\"%s\", "
void *bt_private_connection_private_notification_iterator_get_user_data(
struct bt_private_connection_private_notification_iterator *private_iterator)
{
- struct bt_notification_iterator_private_connection *iterator =
+ struct bt_notification_iterator_private_connection *iterator = (void *)
bt_private_connection_notification_iterator_borrow_from_private(private_iterator);
BT_ASSERT_PRE_NON_NULL(private_iterator, "Notification iterator");
struct bt_private_connection_private_notification_iterator *private_iterator,
void *data)
{
- struct bt_notification_iterator_private_connection *iterator =
+ struct bt_notification_iterator_private_connection *iterator = (void *)
bt_private_connection_notification_iterator_borrow_from_private(private_iterator);
BT_ASSERT_PRE_NON_NULL(iterator, "Notification iterator");
return BT_NOTIFICATION_ITERATOR_STATUS_OK;
}
-struct bt_notification *bt_notification_iterator_borrow_notification(
- struct bt_notification_iterator *iterator)
+struct bt_graph *bt_private_connection_private_notification_iterator_borrow_graph(
+ struct bt_private_connection_private_notification_iterator *private_iterator)
{
+ struct bt_notification_iterator_private_connection *iterator = (void *)
+ bt_private_connection_notification_iterator_borrow_from_private(
+ private_iterator);
+
BT_ASSERT_PRE_NON_NULL(iterator, "Notification iterator");
- return bt_notification_iterator_borrow_current_notification(iterator);
+ return iterator->graph;
}
BT_ASSERT_PRE_FUNC
return is_valid;
}
+BT_ASSERT_PRE_FUNC
+static inline
+bool validate_notifications(
+ struct bt_notification_iterator_private_connection *iterator,
+ uint64_t count)
+{
+ bool ret = true;
+ bt_notification_array notifs = (void *) iterator->base.notifs->pdata;
+ uint64_t i;
+
+ for (i = 0; i < count; i++) {
+ ret = validate_notification(iterator, notifs[i]);
+ if (!ret) {
+ break;
+ }
+ }
+
+ return ret;
+}
+
BT_ASSERT_PRE_FUNC
static inline bool priv_conn_notif_iter_can_end(
struct bt_notification_iterator_private_connection *iterator)
return ret;
}
-static
enum bt_notification_iterator_status
-bt_priv_conn_private_notification_iterator_next(
- struct bt_notification_iterator_private_connection *iterator)
+bt_private_connection_notification_iterator_next(
+ struct bt_notification_iterator *user_iterator,
+ struct bt_notification ***user_notifs, uint64_t *user_count)
{
+ struct bt_notification_iterator_private_connection *iterator =
+ (void *) user_iterator;
struct bt_private_connection_private_notification_iterator *priv_iterator =
bt_private_connection_private_notification_iterator_from_notification_iterator(iterator);
bt_component_class_notification_iterator_next_method next_method = NULL;
- struct bt_notification_iterator_next_method_return next_return = {
- .status = BT_NOTIFICATION_ITERATOR_STATUS_OK,
- .notification = NULL,
- };
enum bt_notification_iterator_status status =
BT_NOTIFICATION_ITERATOR_STATUS_OK;
- BT_ASSERT(iterator);
- BT_LIB_LOGD("Getting next notification iterator's notification: %!+i",
+ BT_ASSERT_PRE_NON_NULL(user_iterator, "Notification iterator");
+ BT_ASSERT_PRE_NON_NULL(user_notifs, "Notification array");
+ BT_ASSERT_PRE_NON_NULL(user_count, "Notification count");
+ BT_ASSERT_PRE(user_iterator->type ==
+ BT_NOTIFICATION_ITERATOR_TYPE_PRIVATE_CONNECTION,
+ "Notification iterator was not created from a private connection: "
+ "%!+i", iterator);
+ BT_LIB_LOGD("Getting next private connection notification iterator's notification: %!+i",
iterator);
BT_ASSERT_PRE(iterator->state ==
BT_PRIVATE_CONNECTION_NOTIFICATION_ITERATOR_STATE_ACTIVE,
*/
BT_ASSERT(next_method);
BT_LOGD_STR("Calling user's \"next\" method.");
- next_return = next_method(priv_iterator);
+ status = next_method(priv_iterator,
+ (void *) user_iterator->notifs->pdata,
+ NOTIF_BATCH_SIZE, user_count);
BT_LOGD("User method returned: status=%s",
- bt_notification_iterator_status_string(next_return.status));
- if (next_return.status < 0) {
+ bt_notification_iterator_status_string(status));
+ if (status < 0) {
BT_LOGW_STR("User method failed.");
- status = next_return.status;
goto end;
}
* BT_NOTIFICATION_ITERATOR_STATUS_OK because
* otherwise this field could be garbage.
*/
- if (next_return.status ==
- BT_NOTIFICATION_ITERATOR_STATUS_OK) {
- bt_put(next_return.notification);
+ if (status == BT_NOTIFICATION_ITERATOR_STATUS_OK) {
+ uint64_t i;
+ bt_notification_array notifs =
+ (void *) user_iterator->notifs->pdata;
+
+ for (i = 0; i < *user_count; i++) {
+ bt_put(notifs[i]);
+ }
}
status = BT_NOTIFICATION_ITERATOR_STATUS_CANCELED;
goto end;
}
- switch (next_return.status) {
+ switch (status) {
+ case BT_NOTIFICATION_ITERATOR_STATUS_OK:
+ BT_ASSERT_PRE(validate_notifications(iterator, *user_count),
+ "Notifications are invalid at this point: "
+ "%![notif-iter-]+i, count=%" PRIu64,
+ iterator, *user_count);
+ *user_notifs = (void *) user_iterator->notifs->pdata;
+ break;
+ case BT_NOTIFICATION_ITERATOR_STATUS_AGAIN:
+ status = BT_NOTIFICATION_ITERATOR_STATUS_AGAIN;
+ goto end;
case BT_NOTIFICATION_ITERATOR_STATUS_END:
BT_ASSERT_PRE(priv_conn_notif_iter_can_end(iterator),
"Notification iterator cannot end at this point: "
BT_LOGD("Set new status: status=%s",
bt_notification_iterator_status_string(status));
goto end;
- case BT_NOTIFICATION_ITERATOR_STATUS_AGAIN:
- status = BT_NOTIFICATION_ITERATOR_STATUS_AGAIN;
- goto end;
- case BT_NOTIFICATION_ITERATOR_STATUS_OK:
- BT_ASSERT_PRE(next_return.notification,
- "User method returned BT_NOTIFICATION_ITERATOR_STATUS_OK, but notification is NULL: "
- "%!+i", iterator);
- BT_ASSERT_PRE(validate_notification(iterator,
- next_return.notification),
- "Notification is invalid at this point: "
- "%![notif-iter-]+i, %![notif-]+n",
- iterator, next_return.notification);
- bt_notification_iterator_replace_current_notification(
- (void *) iterator, next_return.notification);
- bt_put(next_return.notification);
- break;
default:
/* Unknown non-error status */
abort();
}
enum bt_notification_iterator_status
-bt_notification_iterator_next(struct bt_notification_iterator *iterator)
+bt_output_port_notification_iterator_next(
+ struct bt_notification_iterator *iterator,
+ bt_notification_array *notifs_to_user,
+ uint64_t *count_to_user)
{
enum bt_notification_iterator_status status;
+ struct bt_notification_iterator_output_port *out_port_iter =
+ (void *) iterator;
+ enum bt_graph_status graph_status;
BT_ASSERT_PRE_NON_NULL(iterator, "Notification iterator");
- BT_LOGD("Notification iterator's \"next\": iter-addr=%p", iterator);
- BT_ASSERT(iterator->type == BT_NOTIFICATION_ITERATOR_TYPE_PRIVATE_CONNECTION ||
- iterator->type == BT_NOTIFICATION_ITERATOR_TYPE_OUTPUT_PORT);
-
- switch (iterator->type) {
- case BT_NOTIFICATION_ITERATOR_TYPE_PRIVATE_CONNECTION:
- {
- struct bt_notification_iterator_private_connection *priv_conn_iter =
- (void *) iterator;
+ BT_ASSERT_PRE_NON_NULL(notifs_to_user, "Notification array");
+ BT_ASSERT_PRE_NON_NULL(count_to_user, "Notification count");
+ BT_ASSERT_PRE(iterator->type ==
+ BT_NOTIFICATION_ITERATOR_TYPE_OUTPUT_PORT,
+ "Notification iterator was not created from an output port: "
+ "%!+i", iterator);
+ BT_LIB_LOGD("Getting next output port notification iterator's notification: %!+i",
+ iterator);
- /*
- * Make sure that the iterator's queue contains at least
- * one notification.
- */
- status = bt_priv_conn_private_notification_iterator_next(
- priv_conn_iter);
+ graph_status = bt_graph_consume_sink_no_check(
+ out_port_iter->graph, out_port_iter->colander);
+ switch (graph_status) {
+ case BT_GRAPH_STATUS_CANCELED:
+ status = BT_NOTIFICATION_ITERATOR_STATUS_CANCELED;
break;
- }
- case BT_NOTIFICATION_ITERATOR_TYPE_OUTPUT_PORT:
- {
- struct bt_notification_iterator_output_port *out_port_iter =
- (void *) iterator;
-
- /*
- * Keep current notification in case there's an error:
- * restore this notification so that the current
- * notification is not changed from the user's point of
- * view.
- */
- struct bt_notification *old_notif =
- bt_get(bt_notification_iterator_borrow_current_notification(iterator));
- enum bt_graph_status graph_status;
+ case BT_GRAPH_STATUS_AGAIN:
+ status = BT_NOTIFICATION_ITERATOR_STATUS_AGAIN;
+ break;
+ case BT_GRAPH_STATUS_END:
+ status = BT_NOTIFICATION_ITERATOR_STATUS_END;
+ break;
+ case BT_GRAPH_STATUS_NOMEM:
+ status = BT_NOTIFICATION_ITERATOR_STATUS_NOMEM;
+ break;
+ case BT_GRAPH_STATUS_OK:
+ status = BT_NOTIFICATION_ITERATOR_STATUS_OK;
/*
- * Put current notification since it's possibly
- * about to be replaced by a new one by the
- * colander sink.
+ * On success, the colander sink moves the notifications
+ * to this iterator's array and sets this iterator's
+ * notification count: move them to the user.
*/
- bt_notification_iterator_replace_current_notification(
- iterator, NULL);
- graph_status = bt_graph_consume_sink_no_check(
- out_port_iter->graph, out_port_iter->colander);
- switch (graph_status) {
- case BT_GRAPH_STATUS_CANCELED:
- status = BT_NOTIFICATION_ITERATOR_STATUS_CANCELED;
- break;
- case BT_GRAPH_STATUS_AGAIN:
- status = BT_NOTIFICATION_ITERATOR_STATUS_AGAIN;
- break;
- case BT_GRAPH_STATUS_END:
- status = BT_NOTIFICATION_ITERATOR_STATUS_END;
- break;
- case BT_GRAPH_STATUS_NOMEM:
- status = BT_NOTIFICATION_ITERATOR_STATUS_NOMEM;
- break;
- case BT_GRAPH_STATUS_OK:
- status = BT_NOTIFICATION_ITERATOR_STATUS_OK;
- BT_ASSERT(bt_notification_iterator_borrow_current_notification(iterator));
- break;
- default:
- /* Other errors */
- status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
- }
-
- if (status != BT_NOTIFICATION_ITERATOR_STATUS_OK) {
- /* Error/exception: restore old notification */
- bt_notification_iterator_replace_current_notification(
- iterator, old_notif);
- }
-
- bt_put(old_notif);
+ *notifs_to_user = (void *) iterator->notifs->pdata;
+ *count_to_user = out_port_iter->count;
break;
- }
default:
- abort();
+ /* Other errors */
+ status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
}
return status;
iterator);
BT_LOGD_STR("Putting graph.");
bt_put(iterator->graph);
- BT_LOGD_STR("Putting output port.");
- bt_put(iterator->output_port);
BT_LOGD_STR("Putting colander sink component.");
bt_put(iterator->colander);
destroy_base_notification_iterator(obj);
struct bt_port *output_port,
const char *colander_component_name)
{
- struct bt_notification_iterator *iterator_base = NULL;
struct bt_notification_iterator_output_port *iterator = NULL;
struct bt_component_class *colander_comp_cls = NULL;
struct bt_component *output_port_comp = NULL;
const char *colander_comp_name;
struct bt_port *colander_in_port = NULL;
struct bt_component_class_sink_colander_data colander_data;
+ int ret;
BT_ASSERT_PRE_NON_NULL(output_port, "Output port");
BT_ASSERT_PRE(bt_port_get_type(output_port) == BT_PORT_TYPE_OUTPUT,
goto error;
}
- init_notification_iterator((void *) iterator,
+ ret = init_notification_iterator((void *) iterator,
BT_NOTIFICATION_ITERATOR_TYPE_OUTPUT_PORT,
bt_output_port_notification_iterator_destroy);
+ if (ret) {
+ /* init_notification_iterator() logs errors */
+ BT_PUT(iterator);
+ goto end;
+ }
/* Create colander component */
colander_comp_cls = bt_component_class_sink_colander_get();
}
BT_MOVE(iterator->graph, graph);
- iterator_base = (void *) iterator;
colander_comp_name =
colander_component_name ? colander_component_name : "colander";
- colander_data.notification = &iterator_base->current_notification;
+ colander_data.notifs = (void *) iterator->base.notifs->pdata;
+ colander_data.count_addr = &iterator->count;
+
graph_status = bt_graph_add_component_with_init_method_data(
iterator->graph, colander_comp_cls, colander_comp_name,
NULL, &colander_data, &iterator->colander);
* nonconsumable forever so that only this notification iterator
* can consume (thanks to bt_graph_consume_sink_no_check()).
* This avoids leaking the notification created by the colander
- * sink and moved to the base notification iterator's current
- * notification member.
+ * sink and moved to the notification iterator's notification
+ * member.
*/
bt_graph_set_can_consume(iterator->graph, BT_FALSE);
goto end;
}
struct bt_notification_iterator *
-bt_private_connection_notification_iterator_from_private(
+bt_private_connection_notification_iterator_borrow_from_private(
struct bt_private_connection_private_notification_iterator *private_notification_iterator)
{
- return bt_get(
- bt_private_connection_notification_iterator_borrow_from_private(
- private_notification_iterator));
+ return (void *) private_notification_iterator;
}