X-Git-Url: http://git.efficios.com/?a=blobdiff_plain;f=lib%2Fgraph%2Fiterator.c;h=389eec528c022d6c1ff32b7e1b7ccde3a07e0a3f;hb=3230ee6b4f3a704958b761daecae835c56938bc9;hp=29562955dd72e4baa3b8e455df24cefb8098fc04;hpb=c5504f7322f93b32b4bcc65a950ed61efadd694a;p=babeltrace.git diff --git a/lib/graph/iterator.c b/lib/graph/iterator.c index 29562955..389eec52 100644 --- a/lib/graph/iterator.c +++ b/lib/graph/iterator.c @@ -4,8 +4,7 @@ * Babeltrace Notification Iterator * * Copyright 2015 Jérémie Galarneau - * - * Author: Jérémie Galarneau + * Copyright 2017 Philippe Proulx * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -28,12 +27,234 @@ #include #include +#include +#include +#include #include #include #include #include #include #include +#include +#include +#include +#include +#include +#include +#include + +struct stream_state { + struct bt_ctf_stream *stream; /* owned by this */ + struct bt_ctf_packet *cur_packet; /* owned by this */ + bool is_ended; +}; + +enum action_type { + ACTION_TYPE_PUSH_NOTIF, + ACTION_TYPE_MAP_PORT_TO_COMP_IN_STREAM, + ACTION_TYPE_ADD_STREAM_STATE, + ACTION_TYPE_SET_STREAM_STATE_IS_ENDED, + ACTION_TYPE_SET_STREAM_STATE_CUR_PACKET, +}; + +struct action { + enum action_type type; + union { + /* ACTION_TYPE_PUSH_NOTIF */ + struct { + struct bt_notification *notif; /* owned by this */ + } push_notif; + + /* ACTION_TYPE_MAP_PORT_TO_COMP_IN_STREAM */ + struct { + struct bt_ctf_stream *stream; /* owned by this */ + struct bt_component *component; /* owned by this */ + struct bt_port *port; /* owned by this */ + } map_port_to_comp_in_stream; + + /* ACTION_TYPE_ADD_STREAM_STATE */ + struct { + struct bt_ctf_stream *stream; /* owned by this */ + struct stream_state *stream_state; /* owned by this */ + } add_stream_state; + + /* ACTION_TYPE_SET_STREAM_STATE_IS_ENDED */ + struct { + struct stream_state *stream_state; /* weak */ + } set_stream_state_is_ended; + + /* ACTION_TYPE_SET_STREAM_STATE_CUR_PACKET */ + struct { + struct stream_state *stream_state; /* weak */ + struct bt_ctf_packet *packet; /* owned by this */ + } set_stream_state_cur_packet; + } payload; +}; + +static +void stream_destroy_listener(struct bt_ctf_stream *stream, void *data) +{ + struct bt_notification_iterator *iterator = data; + + /* Remove associated stream state */ + g_hash_table_remove(iterator->stream_states, stream); +} + +static +void destroy_stream_state(struct stream_state *stream_state) +{ + if (!stream_state) { + return; + } + + bt_put(stream_state->cur_packet); + bt_put(stream_state->stream); + g_free(stream_state); +} + +static +void destroy_action(struct action *action) +{ + assert(action); + + switch (action->type) { + case ACTION_TYPE_PUSH_NOTIF: + BT_PUT(action->payload.push_notif.notif); + break; + case ACTION_TYPE_MAP_PORT_TO_COMP_IN_STREAM: + BT_PUT(action->payload.map_port_to_comp_in_stream.stream); + BT_PUT(action->payload.map_port_to_comp_in_stream.component); + BT_PUT(action->payload.map_port_to_comp_in_stream.port); + break; + case ACTION_TYPE_ADD_STREAM_STATE: + BT_PUT(action->payload.add_stream_state.stream); + destroy_stream_state( + action->payload.add_stream_state.stream_state); + action->payload.add_stream_state.stream_state = NULL; + break; + case ACTION_TYPE_SET_STREAM_STATE_CUR_PACKET: + BT_PUT(action->payload.set_stream_state_cur_packet.packet); + break; + case ACTION_TYPE_SET_STREAM_STATE_IS_ENDED: + break; + default: + assert(false); + } +} + +static +void add_action(struct bt_notification_iterator *iterator, + struct action *action) +{ + g_array_append_val(iterator->actions, *action); +} + +static +void clear_actions(struct bt_notification_iterator *iterator) +{ + size_t i; + + for (i = 0; i < iterator->actions->len; i++) { + struct action *action = &g_array_index(iterator->actions, + struct action, i); + + destroy_action(action); + } + + g_array_set_size(iterator->actions, 0); +} + +static +void apply_actions(struct bt_notification_iterator *iterator) +{ + size_t i; + + for (i = 0; i < iterator->actions->len; i++) { + struct action *action = &g_array_index(iterator->actions, + struct action, i); + + switch (action->type) { + case ACTION_TYPE_PUSH_NOTIF: + /* Move notification to queue */ + g_queue_push_head(iterator->queue, + action->payload.push_notif.notif); + bt_notification_freeze( + action->payload.push_notif.notif); + action->payload.push_notif.notif = NULL; + break; + case ACTION_TYPE_MAP_PORT_TO_COMP_IN_STREAM: + bt_ctf_stream_map_component_to_port( + action->payload.map_port_to_comp_in_stream.stream, + action->payload.map_port_to_comp_in_stream.component, + action->payload.map_port_to_comp_in_stream.port); + break; + case ACTION_TYPE_ADD_STREAM_STATE: + /* Move stream state to hash table */ + g_hash_table_insert(iterator->stream_states, + action->payload.add_stream_state.stream, + action->payload.add_stream_state.stream_state); + + action->payload.add_stream_state.stream_state = NULL; + break; + case ACTION_TYPE_SET_STREAM_STATE_IS_ENDED: + /* + * We know that this stream is ended. We need to + * remember this as long as the stream exists to + * enforce that the same stream does not end + * twice. + * + * Here we add a destroy listener to the stream + * which we put after (becomes weak as the hash + * table key). If we were the last object to own + * this stream, the destroy listener is called + * when we call bt_put() which removes this + * stream state completely. This is important + * because the memory used by this stream object + * could be reused for another stream, and they + * must have different states. + */ + bt_ctf_stream_add_destroy_listener( + action->payload.set_stream_state_is_ended.stream_state->stream, + stream_destroy_listener, iterator); + action->payload.set_stream_state_is_ended.stream_state->is_ended = true; + BT_PUT(action->payload.set_stream_state_is_ended.stream_state->stream); + break; + case ACTION_TYPE_SET_STREAM_STATE_CUR_PACKET: + /* Move packet to stream state's current packet */ + BT_MOVE(action->payload.set_stream_state_cur_packet.stream_state->cur_packet, + action->payload.set_stream_state_cur_packet.packet); + break; + default: + assert(false); + } + } + + clear_actions(iterator); +} + +static +struct stream_state *create_stream_state(struct bt_ctf_stream *stream) +{ + struct stream_state *stream_state = g_new0(struct stream_state, 1); + + if (!stream_state) { + goto end; + } + + /* + * We keep a reference to the stream until we know it's ended + * because we need to be able to create an automatic "stream + * end" notification when the user's "next" method returns + * BT_NOTIFICATION_ITERATOR_STATUS_END. + * + * We put this reference when the stream is marked as ended. + */ + stream_state->stream = bt_get(stream); + +end: + return stream_state; +} static void bt_notification_iterator_destroy(struct bt_object *obj) @@ -44,8 +265,8 @@ void bt_notification_iterator_destroy(struct bt_object *obj) assert(obj); iterator = container_of(obj, struct bt_notification_iterator, base); - assert(iterator->component); - comp_class = iterator->component->class; + assert(iterator->upstream_component); + comp_class = iterator->upstream_component->class; /* Call user-defined destroy method */ switch (comp_class->type) { @@ -78,38 +299,99 @@ void bt_notification_iterator_destroy(struct bt_object *obj) assert(0); } - BT_PUT(iterator->current_notification); - BT_PUT(iterator->component); + if (iterator->queue) { + struct bt_notification *notif; + + while ((notif = g_queue_pop_tail(iterator->queue))) { + bt_put(notif); + } + + g_queue_free(iterator->queue); + } + + if (iterator->stream_states) { + /* + * Remove our destroy listener from each stream which + * has a state in this iterator. Otherwise the destroy + * listener would be called with an invalid/other + * notification iterator object. + */ + GHashTableIter ht_iter; + gpointer stream_gptr, stream_state_gptr; + + g_hash_table_iter_init(&ht_iter, iterator->stream_states); + + while (g_hash_table_iter_next(&ht_iter, &stream_gptr, &stream_state_gptr)) { + assert(stream_gptr); + bt_ctf_stream_remove_destroy_listener( + (void *) stream_gptr, stream_destroy_listener, + iterator); + } + + g_hash_table_destroy(iterator->stream_states); + } + + if (iterator->actions) { + g_array_free(iterator->actions, TRUE); + } + + bt_put(iterator->current_notification); + bt_put(iterator->upstream_component); + bt_put(iterator->upstream_port); g_free(iterator); } BT_HIDDEN struct bt_notification_iterator *bt_notification_iterator_create( - struct bt_component *component) + struct bt_component *upstream_comp, + struct bt_port *upstream_port) { enum bt_component_class_type type; struct bt_notification_iterator *iterator = NULL; - if (!component) { - goto end; - } + assert(upstream_comp); + assert(upstream_port); + assert(bt_port_is_connected(upstream_port)); - type = bt_component_get_class_type(component); + type = bt_component_get_class_type(upstream_comp); switch (type) { case BT_COMPONENT_CLASS_TYPE_SOURCE: case BT_COMPONENT_CLASS_TYPE_FILTER: break; default: - goto end; + goto error; } iterator = g_new0(struct bt_notification_iterator, 1); if (!iterator) { - goto end; + goto error; } - iterator->component = bt_get(component); bt_object_init(iterator, bt_notification_iterator_destroy); + + iterator->stream_states = g_hash_table_new_full(g_direct_hash, + g_direct_equal, NULL, (GDestroyNotify) destroy_stream_state); + if (!iterator->stream_states) { + goto error; + } + + iterator->queue = g_queue_new(); + if (!iterator->queue) { + goto error; + } + + iterator->actions = g_array_new(FALSE, FALSE, sizeof(struct action)); + if (!iterator->actions) { + goto error; + } + + iterator->upstream_component = bt_get(upstream_comp); + iterator->upstream_port = bt_get(upstream_port); + goto end; + +error: + BT_PUT(iterator); + end: return iterator; } @@ -122,7 +404,7 @@ enum bt_notification_iterator_status bt_notification_iterator_validate( BT_NOTIFICATION_ITERATOR_STATUS_OK; if (!iterator) { - ret = BT_NOTIFICATION_ITERATOR_STATUS_INVAL; + ret = BT_NOTIFICATION_ITERATOR_STATUS_INVALID; goto end; } end: @@ -149,7 +431,7 @@ bt_private_notification_iterator_set_user_data( bt_notification_iterator_from_private(private_iterator); if (!iterator) { - ret = BT_NOTIFICATION_ITERATOR_STATUS_INVAL; + ret = BT_NOTIFICATION_ITERATOR_STATUS_INVALID; goto end; } @@ -173,29 +455,693 @@ end: return notification; } -enum bt_notification_iterator_status -bt_notification_iterator_next(struct bt_notification_iterator *iterator) +static +bool validate_notification(struct bt_notification_iterator *iterator, + struct bt_notification *notif, + struct bt_ctf_stream *notif_stream, + struct bt_ctf_packet *notif_packet) +{ + bool is_valid = true; + struct stream_state *stream_state; + struct bt_port *stream_comp_cur_port; + + assert(notif_stream); + stream_comp_cur_port = + bt_ctf_stream_port_for_component(notif_stream, + iterator->upstream_component); + if (!stream_comp_cur_port) { + /* + * This is the first time this notification iterator + * bumps into this stream. Add an action to map the + * iterator's upstream component to the iterator's + * upstream port in this stream. + */ + struct action action = { + .type = ACTION_TYPE_MAP_PORT_TO_COMP_IN_STREAM, + .payload.map_port_to_comp_in_stream = { + .stream = bt_get(notif_stream), + .component = bt_get(iterator->upstream_component), + .port = bt_get(iterator->upstream_port), + }, + }; + + add_action(iterator, &action); + } else { + if (stream_comp_cur_port != iterator->upstream_port) { + /* + * It looks like two different ports of the same + * component are emitting notifications which + * have references to the same stream. This is + * bad: the API guarantees that it can never + * happen. + */ + is_valid = false; + goto end; + } + + } + + stream_state = g_hash_table_lookup(iterator->stream_states, + notif_stream); + if (stream_state) { + if (stream_state->is_ended) { + /* + * There's a new notification which has a + * reference to a stream which, from this + * iterator's point of view, is ended ("end of + * stream" notification was returned). This is + * bad: the API guarantees that it can never + * happen. + */ + is_valid = false; + goto end; + } + + switch (notif->type) { + case BT_NOTIFICATION_TYPE_STREAM_BEGIN: + /* + * We already have a stream state, which means + * we already returned a "stream begin" + * notification: this is an invalid duplicate. + */ + is_valid = false; + goto end; + case BT_NOTIFICATION_TYPE_PACKET_BEGIN: + if (notif_packet == stream_state->cur_packet) { + /* Duplicate "packet begin" notification */ + is_valid = false; + goto end; + } + break; + default: + break; + } + } + +end: + return is_valid; +} + +static +void add_action_push_notif(struct bt_notification_iterator *iterator, + struct bt_notification *notif) +{ + struct action action = { + .type = ACTION_TYPE_PUSH_NOTIF, + .payload.push_notif = { + .notif = bt_get(notif), + }, + }; + + assert(notif); + add_action(iterator, &action); +} + +static +int add_action_push_notif_stream_begin( + struct bt_notification_iterator *iterator, + struct bt_ctf_stream *stream) +{ + int ret = 0; + struct bt_notification *stream_begin_notif = NULL; + + assert(stream); + stream_begin_notif = bt_notification_stream_begin_create(stream); + if (!stream_begin_notif) { + goto error; + } + + add_action_push_notif(iterator, stream_begin_notif); + goto end; + +error: + ret = -1; + +end: + bt_put(stream_begin_notif); + return ret; +} + +static +int add_action_push_notif_stream_end( + struct bt_notification_iterator *iterator, + struct bt_ctf_stream *stream) +{ + int ret = 0; + struct bt_notification *stream_end_notif = NULL; + + assert(stream); + stream_end_notif = bt_notification_stream_end_create(stream); + if (!stream_end_notif) { + goto error; + } + + add_action_push_notif(iterator, stream_end_notif); + goto end; + +error: + ret = -1; + +end: + bt_put(stream_end_notif); + return ret; +} + +static +int add_action_push_notif_packet_begin( + struct bt_notification_iterator *iterator, + struct bt_ctf_packet *packet) +{ + int ret = 0; + struct bt_notification *packet_begin_notif = NULL; + + assert(packet); + packet_begin_notif = bt_notification_packet_begin_create(packet); + if (!packet_begin_notif) { + goto error; + } + + add_action_push_notif(iterator, packet_begin_notif); + goto end; + +error: + ret = -1; + +end: + bt_put(packet_begin_notif); + return ret; +} + +static +int add_action_push_notif_packet_end( + struct bt_notification_iterator *iterator, + struct bt_ctf_packet *packet) +{ + int ret = 0; + struct bt_notification *packet_end_notif = NULL; + + assert(packet); + packet_end_notif = bt_notification_packet_end_create(packet); + if (!packet_end_notif) { + goto error; + } + + add_action_push_notif(iterator, packet_end_notif); + goto end; + +error: + ret = -1; + +end: + bt_put(packet_end_notif); + return ret; +} + +static +void add_action_set_stream_state_is_ended( + struct bt_notification_iterator *iterator, + struct stream_state *stream_state) +{ + struct action action = { + .type = ACTION_TYPE_SET_STREAM_STATE_IS_ENDED, + .payload.set_stream_state_is_ended = { + .stream_state = stream_state, + }, + }; + + assert(stream_state); + add_action(iterator, &action); +} + +static +void add_action_set_stream_state_cur_packet( + struct bt_notification_iterator *iterator, + struct stream_state *stream_state, + struct bt_ctf_packet *packet) +{ + struct action action = { + .type = ACTION_TYPE_SET_STREAM_STATE_CUR_PACKET, + .payload.set_stream_state_cur_packet = { + .stream_state = stream_state, + .packet = bt_get(packet), + }, + }; + + assert(stream_state); + add_action(iterator, &action); +} + +static +int ensure_stream_state_exists(struct bt_notification_iterator *iterator, + struct bt_notification *stream_begin_notif, + struct bt_ctf_stream *notif_stream, + struct stream_state **stream_state) +{ + int ret = 0; + + if (!notif_stream) { + /* + * The notification does not reference any stream: no + * need to get or create a stream state. + */ + goto end; + } + + *stream_state = g_hash_table_lookup(iterator->stream_states, + notif_stream); + if (!*stream_state) { + /* + * This iterator did not bump into this stream yet: + * create a stream state and a "stream begin" + * notification. + */ + struct action action = { + .type = ACTION_TYPE_ADD_STREAM_STATE, + .payload.add_stream_state = { + .stream = bt_get(notif_stream), + .stream_state = NULL, + }, + }; + + *stream_state = create_stream_state(notif_stream); + if (!stream_state) { + goto error; + } + + action.payload.add_stream_state.stream_state = + *stream_state; + add_action(iterator, &action); + + if (stream_begin_notif) { + add_action_push_notif(iterator, stream_begin_notif); + } else { + ret = add_action_push_notif_stream_begin(iterator, + notif_stream); + if (ret) { + goto error; + } + } + } + + goto end; + +error: + destroy_stream_state(*stream_state); + ret = -1; + +end: + return ret; +} + +static +int handle_packet_switch(struct bt_notification_iterator *iterator, + struct bt_notification *packet_begin_notif, + struct bt_ctf_packet *new_packet, + struct stream_state *stream_state) +{ + int ret = 0; + + if (stream_state->cur_packet == new_packet) { + goto end; + } + + if (stream_state->cur_packet) { + /* End of the current packet */ + ret = add_action_push_notif_packet_end(iterator, + stream_state->cur_packet); + if (ret) { + goto error; + } + } + + /* Beginning of the new packet */ + if (packet_begin_notif) { + add_action_push_notif(iterator, packet_begin_notif); + } else if (new_packet) { + ret = add_action_push_notif_packet_begin(iterator, + new_packet); + if (ret) { + goto error; + } + } + + add_action_set_stream_state_cur_packet(iterator, stream_state, + new_packet); + goto end; + +error: + ret = -1; + +end: + return ret; +} + +static +int handle_notif_stream_begin( + struct bt_notification_iterator *iterator, + struct bt_notification *notif, + struct bt_ctf_stream *notif_stream) +{ + int ret = 0; + struct stream_state *stream_state; + + assert(notif->type == BT_NOTIFICATION_TYPE_STREAM_BEGIN); + assert(notif_stream); + ret = ensure_stream_state_exists(iterator, notif, notif_stream, + &stream_state); + if (ret) { + goto error; + } + + goto end; + +error: + ret = -1; + +end: + return ret; +} + +static +int handle_notif_stream_end( + struct bt_notification_iterator *iterator, + struct bt_notification *notif, + struct bt_ctf_stream *notif_stream) +{ + int ret = 0; + struct stream_state *stream_state; + + assert(notif->type == BT_NOTIFICATION_TYPE_STREAM_END); + assert(notif_stream); + ret = ensure_stream_state_exists(iterator, NULL, notif_stream, + &stream_state); + if (ret) { + goto error; + } + + ret = handle_packet_switch(iterator, NULL, NULL, stream_state); + if (ret) { + goto error; + } + + add_action_push_notif(iterator, notif); + add_action_set_stream_state_is_ended(iterator, stream_state); + goto end; + +error: + ret = -1; + +end: + return ret; +} + +static +int handle_notif_packet_begin( + struct bt_notification_iterator *iterator, + struct bt_notification *notif, + struct bt_ctf_stream *notif_stream, + struct bt_ctf_packet *notif_packet) +{ + int ret = 0; + struct stream_state *stream_state; + + assert(notif->type == BT_NOTIFICATION_TYPE_PACKET_BEGIN); + assert(notif_packet); + ret = ensure_stream_state_exists(iterator, NULL, notif_stream, + &stream_state); + if (ret) { + goto error; + } + + ret = handle_packet_switch(iterator, notif, notif_packet, stream_state); + if (ret) { + goto error; + } + + goto end; + +error: + ret = -1; + +end: + return ret; +} + +static +int handle_notif_packet_end( + struct bt_notification_iterator *iterator, + struct bt_notification *notif, + struct bt_ctf_stream *notif_stream, + struct bt_ctf_packet *notif_packet) +{ + int ret = 0; + struct stream_state *stream_state; + + assert(notif->type == BT_NOTIFICATION_TYPE_PACKET_END); + assert(notif_packet); + ret = ensure_stream_state_exists(iterator, NULL, notif_stream, + &stream_state); + if (ret) { + goto error; + } + + ret = handle_packet_switch(iterator, NULL, notif_packet, stream_state); + if (ret) { + goto error; + } + + /* End of the current packet */ + add_action_push_notif(iterator, notif); + add_action_set_stream_state_cur_packet(iterator, stream_state, NULL); + goto end; + +error: + ret = -1; + +end: + return ret; +} + +static +int handle_notif_event( + struct bt_notification_iterator *iterator, + struct bt_notification *notif, + struct bt_ctf_stream *notif_stream, + struct bt_ctf_packet *notif_packet) +{ + int ret = 0; + struct stream_state *stream_state; + + assert(notif->type == BT_NOTIFICATION_TYPE_EVENT); + assert(notif_packet); + ret = ensure_stream_state_exists(iterator, NULL, notif_stream, + &stream_state); + if (ret) { + goto error; + } + + ret = handle_packet_switch(iterator, NULL, notif_packet, stream_state); + if (ret) { + goto error; + } + + add_action_push_notif(iterator, notif); + goto end; + +error: + ret = -1; + +end: + return ret; +} + +static +int enqueue_notification_and_automatic( + struct bt_notification_iterator *iterator, + struct bt_notification *notif) +{ + int ret = 0; + struct bt_ctf_event *notif_event = NULL; + struct bt_ctf_stream *notif_stream = NULL; + struct bt_ctf_packet *notif_packet = NULL; + + assert(notif); + + /* Get the stream and packet referred by the notification */ + switch (notif->type) { + case BT_NOTIFICATION_TYPE_EVENT: + notif_event = bt_notification_event_borrow_event(notif); + assert(notif_event); + notif_packet = bt_ctf_event_borrow_packet(notif_event); + assert(notif_packet); + break; + case BT_NOTIFICATION_TYPE_STREAM_BEGIN: + notif_stream = + bt_notification_stream_begin_borrow_stream(notif); + assert(notif_stream); + break; + case BT_NOTIFICATION_TYPE_STREAM_END: + notif_stream = bt_notification_stream_end_borrow_stream(notif); + assert(notif_stream); + break; + case BT_NOTIFICATION_TYPE_PACKET_BEGIN: + notif_packet = + bt_notification_packet_begin_borrow_packet(notif); + assert(notif_packet); + break; + case BT_NOTIFICATION_TYPE_PACKET_END: + notif_packet = bt_notification_packet_end_borrow_packet(notif); + assert(notif_packet); + break; + case BT_NOTIFICATION_TYPE_INACTIVITY: + /* Always valid */ + break; + default: + /* + * Invalid type of notification. Only the notification + * types above are allowed to be returned by a user + * component. + */ + goto error; + } + + if (notif_packet) { + notif_stream = bt_ctf_packet_borrow_stream(notif_packet); + assert(notif_stream); + } + + if (!notif_stream) { + /* + * The notification has no reference to a stream: it + * cannot cause the creation of automatic notifications. + */ + goto end; + } + + if (!validate_notification(iterator, notif, notif_stream, + notif_packet)) { + goto error; + } + + switch (notif->type) { + case BT_NOTIFICATION_TYPE_EVENT: + ret = handle_notif_event(iterator, notif, notif_stream, + notif_packet); + break; + case BT_NOTIFICATION_TYPE_STREAM_BEGIN: + ret = handle_notif_stream_begin(iterator, notif, notif_stream); + break; + case BT_NOTIFICATION_TYPE_STREAM_END: + ret = handle_notif_stream_end(iterator, notif, notif_stream); + break; + case BT_NOTIFICATION_TYPE_PACKET_BEGIN: + ret = handle_notif_packet_begin(iterator, notif, notif_stream, + notif_packet); + break; + case BT_NOTIFICATION_TYPE_PACKET_END: + ret = handle_notif_packet_end(iterator, notif, notif_stream, + notif_packet); + break; + default: + break; + } + + if (ret) { + goto error; + } + + apply_actions(iterator); + goto end; + +error: + ret = -1; + +end: + return ret; +} + +static +int handle_end(struct bt_notification_iterator *iterator) +{ + GHashTableIter stream_state_iter; + gpointer stream_gptr, stream_state_gptr; + int ret = 0; + + /* + * Emit a "stream end" notification for each non-ended stream + * known by this iterator and mark them as ended. + */ + g_hash_table_iter_init(&stream_state_iter, iterator->stream_states); + + while (g_hash_table_iter_next(&stream_state_iter, &stream_gptr, + &stream_state_gptr)) { + struct stream_state *stream_state = stream_state_gptr; + + assert(stream_state_gptr); + + if (stream_state->is_ended) { + continue; + } + + ret = handle_packet_switch(iterator, NULL, NULL, stream_state); + if (ret) { + goto error; + } + + ret = add_action_push_notif_stream_end(iterator, stream_gptr); + if (ret) { + goto error; + } + + add_action_set_stream_state_is_ended(iterator, stream_state); + } + + apply_actions(iterator); + goto end; + +error: + ret = -1; + +end: + return ret; +} + +static +enum bt_notification_iterator_status ensure_queue_has_notifications( + struct bt_notification_iterator *iterator) { struct bt_private_notification_iterator *priv_iterator = bt_private_notification_iterator_from_notification_iterator(iterator); bt_component_class_notification_iterator_next_method next_method = NULL; - struct bt_notification_iterator_next_return next_return; + struct bt_notification_iterator_next_return next_return = { + .status = BT_NOTIFICATION_ITERATOR_STATUS_OK, + .notification = NULL, + }; enum bt_notification_iterator_status status = BT_NOTIFICATION_ITERATOR_STATUS_OK; + int ret; - if (!iterator) { - status = BT_NOTIFICATION_ITERATOR_STATUS_INVAL; + assert(iterator); + + if (iterator->queue->length > 0) { + /* We already have enough */ goto end; } - assert(iterator->component); - assert(iterator->component->class); + if (iterator->is_ended) { + status = BT_NOTIFICATION_ITERATOR_STATUS_END; + goto end; + } + + assert(iterator->upstream_component); + assert(iterator->upstream_component->class); - switch (iterator->component->class->type) { + /* Pick the appropriate "next" method */ + switch (iterator->upstream_component->class->type) { case BT_COMPONENT_CLASS_TYPE_SOURCE: { struct bt_component_class_source *source_class = - container_of(iterator->component->class, + container_of(iterator->upstream_component->class, struct bt_component_class_source, parent); assert(source_class->methods.iterator.next); @@ -205,7 +1151,7 @@ bt_notification_iterator_next(struct bt_notification_iterator *iterator) case BT_COMPONENT_CLASS_TYPE_FILTER: { struct bt_component_class_filter *filter_class = - container_of(iterator->component->class, + container_of(iterator->upstream_component->class, struct bt_component_class_filter, parent); assert(filter_class->methods.iterator.next); @@ -217,27 +1163,99 @@ bt_notification_iterator_next(struct bt_notification_iterator *iterator) break; } + /* + * Call the user's "next" method to get the next notification + * and status, skipping the forwarded automatic notifications + * if any. + */ assert(next_method); next_return = next_method(priv_iterator); - if (next_return.status == BT_NOTIFICATION_ITERATOR_STATUS_OK) { + if (next_return.status < 0) { + status = next_return.status; + goto end; + } + + switch (next_return.status) { + case BT_NOTIFICATION_ITERATOR_STATUS_END: + ret = handle_end(iterator); + if (ret) { + status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR; + goto end; + } + + if (iterator->queue->length == 0) { + status = BT_NOTIFICATION_ITERATOR_STATUS_END; + } + + iterator->is_ended = true; + break; + case BT_NOTIFICATION_ITERATOR_STATUS_AGAIN: + status = BT_NOTIFICATION_ITERATOR_STATUS_AGAIN; + break; + case BT_NOTIFICATION_ITERATOR_STATUS_OK: if (!next_return.notification) { status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR; goto end; } - BT_MOVE(iterator->current_notification, + /* + * We know the notification is valid. Before we push it + * to the head of the queue, push the appropriate + * automatic notifications if any. + */ + ret = enqueue_notification_and_automatic(iterator, next_return.notification); - bt_notification_freeze(iterator->current_notification); + BT_PUT(next_return.notification); + if (ret) { + status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR; + goto end; + } + break; + default: + /* Unknown non-error status */ + assert(false); } end: - return next_return.status; + return status; +} + +enum bt_notification_iterator_status +bt_notification_iterator_next(struct bt_notification_iterator *iterator) +{ + enum bt_notification_iterator_status status; + + if (!iterator) { + status = BT_NOTIFICATION_ITERATOR_STATUS_INVALID; + goto end; + } + + /* + * Make sure that the iterator's queue contains at least one + * notification. + */ + status = ensure_queue_has_notifications(iterator); + if (status != BT_NOTIFICATION_ITERATOR_STATUS_OK) { + goto end; + } + + /* + * Move the notification at the tail of the queue to the + * iterator's current notification. + */ + assert(iterator->queue->length > 0); + bt_put(iterator->current_notification); + iterator->current_notification = g_queue_pop_tail(iterator->queue); + assert(iterator->current_notification); + +end: + return status; } struct bt_component *bt_notification_iterator_get_component( struct bt_notification_iterator *iterator) { - return bt_get(iterator->component); + return bt_get(iterator->upstream_component); } struct bt_private_component *