From d0fea13089e4ea4825826b1022ff0d8110ef2898 Mon Sep 17 00:00:00 2001 From: Philippe Proulx Date: Tue, 18 Dec 2018 17:16:33 -0500 Subject: [PATCH] lib: do not allow port to be removed when message iterators are active This patch makes it illegal to remove a port when at least one message iterator using it is not finalized (or in the process of being finalized). This introduces the following advantages: * From a downstream point of view, your upstream message iterator cannot randomly get canceled, because a connection cannot end while any message iterator is active. Therefore the `BT_MESSAGE_ITERATOR_STATUS_CANCELED` status is removed. This status was ambiguous and there was no clear indication of what to do when you get it. Current plugins were handling it like `BT_MESSAGE_ITERATOR_STATUS_END`. * There's a precondition that you when you call bt_self_component_port_input_message_iterator_next(), the iterator must be active. In practice this is always the case because the only way for it to be finalized if when you actively put its last reference, and you should know it. Therefore bt_self_component_port_input_message_iterator_next() does not need to check the "finalized" status every time you call it in case it needs to return the removed `BT_MESSAGE_ITERATOR_STATUS_CANCELED` status. To distinguish between "finalized" and "in the process of being finalized", the (internal) `BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_FINALIZING` is introduced: when a message iterator is finalized, its state is set to `BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_FINALIZING`, then its finalization method is called, then its state is set to `BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_FINALIZED`. You can remove a component's port when all its message iterators have one of those states. Signed-off-by: Philippe Proulx --- .../graph/message-iterator-internal.h | 43 +++---- include/babeltrace/graph/message-iterator.h | 1 - lib/graph/component-class-sink-colander.c | 3 - lib/graph/component.c | 7 +- lib/graph/connection.c | 18 +-- lib/graph/graph.c | 14 +-- lib/graph/iterator.c | 107 +++++++++--------- lib/graph/port.c | 33 +++++- plugins/utils/muxer/muxer.c | 4 +- 9 files changed, 121 insertions(+), 109 deletions(-) diff --git a/include/babeltrace/graph/message-iterator-internal.h b/include/babeltrace/graph/message-iterator-internal.h index bc604249..1b1d9e15 100644 --- a/include/babeltrace/graph/message-iterator-internal.h +++ b/include/babeltrace/graph/message-iterator-internal.h @@ -55,18 +55,14 @@ enum bt_self_component_port_input_message_iterator_state { BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_ENDED, /* - * Iterator is finalized, but not at the end yet. This means - * that the "next" method can still return queued messages - * before returning the BT_MESSAGE_ITERATOR_STATUS_CANCELED - * status. + * Iterator is currently being finalized. */ - BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_FINALIZED, + BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_FINALIZING, /* - * Iterator is finalized and ended: the "next" method always - * returns BT_MESSAGE_ITERATOR_STATUS_CANCELED. + * Iterator is finalized. */ - BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_FINALIZED_AND_ENDED, + BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_FINALIZED, }; struct bt_message_iterator { @@ -83,22 +79,15 @@ struct bt_self_component_port_input_message_iterator { struct bt_graph *graph; /* Weak */ /* - * This hash table keeps the state of a stream as viewed by - * this message iterator. This is used to, in developer - * mode: - * - * * Automatically enqueue "stream begin", "packet begin", - * "packet end", and "stream end" messages depending - * on the stream's state and on the next message returned - * by the upstream component. - * - * * Make sure that, once the message iterator has seen a - * "stream end" message for a given stream, no other - * messages which refer to this stream can be delivered - * by this iterator. + * This hash table keeps the state of a stream as viewed by this + * message iterator. This is used, in developer mode, to make + * sure that, once the message iterator has seen a "stream end" + * message for a given stream, no other messages which refer to + * this stream can be delivered by this iterator. It is also + * used to check for a valid sequence of messages. * - * The key (struct bt_stream *) is not owned by this. The - * value is an allocated state structure. + * The key (struct bt_stream *) is not owned by this. The value + * is an allocated state structure. */ GHashTable *stream_states; @@ -119,7 +108,7 @@ struct bt_port_output_message_iterator { }; BT_HIDDEN -void bt_self_component_port_input_message_iterator_finalize( +void bt_self_component_port_input_message_iterator_try_finalize( struct bt_self_component_port_input_message_iterator *iterator); BT_HIDDEN @@ -132,8 +121,6 @@ const char *bt_message_iterator_status_string( enum bt_message_iterator_status status) { switch (status) { - case BT_MESSAGE_ITERATOR_STATUS_CANCELED: - return "BT_MESSAGE_ITERATOR_STATUS_CANCELED"; case BT_MESSAGE_ITERATOR_STATUS_AGAIN: return "BT_MESSAGE_ITERATOR_STATUS_AGAIN"; case BT_MESSAGE_ITERATOR_STATUS_END: @@ -158,10 +145,10 @@ const char *bt_self_component_port_input_message_iterator_state_string( return "BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_ACTIVE"; case BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_ENDED: return "BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_ENDED"; + case BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_FINALIZING: + return "BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_FINALIZING"; case BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_FINALIZED: return "BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_FINALIZED"; - case BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_FINALIZED_AND_ENDED: - return "BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_FINALIZED_AND_ENDED"; default: return "(unknown)"; } diff --git a/include/babeltrace/graph/message-iterator.h b/include/babeltrace/graph/message-iterator.h index d26cc768..acb93f8f 100644 --- a/include/babeltrace/graph/message-iterator.h +++ b/include/babeltrace/graph/message-iterator.h @@ -35,7 +35,6 @@ typedef enum bt_message_iterator_status { BT_MESSAGE_ITERATOR_STATUS_OK = 0, BT_MESSAGE_ITERATOR_STATUS_END = 1, BT_MESSAGE_ITERATOR_STATUS_AGAIN = 11, - BT_MESSAGE_ITERATOR_STATUS_CANCELED = 125, BT_MESSAGE_ITERATOR_STATUS_ERROR = -1, BT_MESSAGE_ITERATOR_STATUS_NOMEM = -12, } bt_message_iterator_status; diff --git a/lib/graph/component-class-sink-colander.c b/lib/graph/component-class-sink-colander.c index 36d764e3..1b67b2f8 100644 --- a/lib/graph/component-class-sink-colander.c +++ b/lib/graph/component-class-sink-colander.c @@ -151,9 +151,6 @@ enum bt_self_component_status colander_consume( colander_data->msg_iter, &msgs, colander_data->count_addr); switch (msg_iter_status) { - case BT_MESSAGE_ITERATOR_STATUS_CANCELED: - status = BT_SELF_COMPONENT_STATUS_OK; - goto end; case BT_MESSAGE_ITERATOR_STATUS_AGAIN: status = BT_SELF_COMPONENT_STATUS_AGAIN; goto end; diff --git a/lib/graph/component.c b/lib/graph/component.c index 13c2aa0d..84193127 100644 --- a/lib/graph/component.c +++ b/lib/graph/component.c @@ -267,7 +267,6 @@ int bt_component_create(struct bt_component_class *component_class, BT_ASSERT(user_component); BT_ASSERT(component_class); BT_ASSERT(name); - type = bt_component_class_get_type(component_class); BT_LIB_LOGD("Creating empty component from component class: %![cc-]+C, " "comp-name=\"%s\"", component_class, name); @@ -278,8 +277,7 @@ int bt_component_create(struct bt_component_class *component_class, goto end; } - bt_object_init_shared_with_parent(&component->base, - destroy_component); + bt_object_init_shared_with_parent(&component->base, destroy_component); component->class = component_class; bt_object_get_no_null_check(component->class); component->destroy = component_destroy_funcs[type]; @@ -547,8 +545,7 @@ void bt_component_remove_port(struct bt_component *component, struct bt_port *cur_port = g_ptr_array_index(ports, i); if (cur_port == port) { - remove_port_by_index(component, - ports, i); + remove_port_by_index(component, ports, i); goto end; } } diff --git a/lib/graph/connection.c b/lib/graph/connection.c index 5d074913..d490c5dc 100644 --- a/lib/graph/connection.c +++ b/lib/graph/connection.c @@ -46,10 +46,10 @@ void destroy_connection(struct bt_object *obj) BT_LIB_LOGD("Destroying connection: %!+x", connection); /* - * Make sure that each message iterator which was created - * for this connection is finalized before we destroy it. Once a - * message iterator is finalized, all its method return - * NULL or the BT_MESSAGE_ITERATOR_STATUS_CANCELED status. + * Make sure that each message iterator which was created for + * this connection is finalized before we destroy it. Once a + * message iterator is finalized, all its method return NULL or + * the BT_MESSAGE_ITERATOR_STATUS_CANCELED status. * * Because connections are destroyed before components within a * graph, this ensures that message iterators are always @@ -227,8 +227,12 @@ void bt_connection_end(struct bt_connection *conn, bool try_remove_from_graph) bt_object_put_ref(upstream_port); /* - * Because this connection is ended, finalize (cancel) each - * message iterator created from it. + * Because this connection is ended, finalize each message + * iterator created from it. + * + * In practice, this only happens when the connection is + * destroyed and not all its message iterators were finalized, + * which is on graph destruction. */ for (i = 0; i < conn->iterators->len; i++) { struct bt_self_component_port_input_message_iterator *iterator = @@ -236,7 +240,7 @@ void bt_connection_end(struct bt_connection *conn, bool try_remove_from_graph) BT_LIB_LOGD("Finalizing message iterator created by " "this ended connection: %![iter-]+i", iterator); - bt_self_component_port_input_message_iterator_finalize( + bt_self_component_port_input_message_iterator_try_finalize( iterator); /* diff --git a/lib/graph/graph.c b/lib/graph/graph.c index 906c139e..f2895418 100644 --- a/lib/graph/graph.c +++ b/lib/graph/graph.c @@ -118,11 +118,11 @@ void destroy_graph(struct bt_object *obj) * in this situation: * * 1. We put and destroy a connection. - * 2. This connection's destructor finalizes its active - * message iterators. - * 3. A message iterator's finalization function gets a - * new reference on its component (reference count goes from - * 0 to 1). + * 2. This connection's destructor finalizes its active message + * iterators. + * 3. A message iterator's finalization function gets a new + * reference on its component (reference count goes from 0 to + * 1). * 4. Since this component's reference count goes to 1, it takes * a reference on its parent (this graph). This graph's * reference count goes from 0 to 1. @@ -131,8 +131,8 @@ void destroy_graph(struct bt_object *obj) * 6. Since this component's reference count goes from 1 to 0, * it puts its parent (this graph). This graph's reference * count goes from 1 to 0. - * 7. Since this graph's reference count goes from 1 to 0, - * its destructor is called (this function). + * 7. Since this graph's reference count goes from 1 to 0, its + * destructor is called (this function). * * With the incrementation below, the graph's reference count at * step 4 goes from 1 to 2, and from 2 to 1 at step 6. This diff --git a/lib/graph/iterator.c b/lib/graph/iterator.c index 2c2fcadb..95429fb5 100644 --- a/lib/graph/iterator.c +++ b/lib/graph/iterator.c @@ -115,6 +115,24 @@ end: return stream_state; } +static inline +void _set_self_comp_port_input_msg_iterator_state( + struct bt_self_component_port_input_message_iterator *iterator, + enum bt_self_component_port_input_message_iterator_state state) +{ + BT_ASSERT(iterator); + BT_LIB_LOGD("Updating message iterator's state: " + "new-state=%s", + bt_self_component_port_input_message_iterator_state_string(state)); + iterator->state = state; +} + +#ifdef BT_DEV_MODE +# define set_self_comp_port_input_msg_iterator_state _set_self_comp_port_input_msg_iterator_state +#else +# define set_self_comp_port_input_msg_iterator_state(_a, _b) +#endif + static void destroy_base_message_iterator(struct bt_object *obj) { @@ -152,7 +170,7 @@ void bt_self_component_port_input_message_iterator_destroy(struct bt_object *obj iterator = (void *) obj; BT_LIB_LOGD("Destroying self component input port message iterator object: " "%!+i", iterator); - bt_self_component_port_input_message_iterator_finalize(iterator); + bt_self_component_port_input_message_iterator_try_finalize(iterator); if (iterator->stream_states) { /* @@ -179,7 +197,7 @@ void bt_self_component_port_input_message_iterator_destroy(struct bt_object *obj } BT_HIDDEN -void bt_self_component_port_input_message_iterator_finalize( +void bt_self_component_port_input_message_iterator_try_finalize( struct bt_self_component_port_input_message_iterator *iterator) { typedef void (*method_t)(void *); @@ -194,29 +212,24 @@ void bt_self_component_port_input_message_iterator_finalize( /* Skip user finalization if user initialization failed */ BT_LIB_LOGD("Not finalizing non-initialized message iterator: " "%!+i", iterator); - return; + goto end; case BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_FINALIZED: - case BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_FINALIZED_AND_ENDED: /* Already finalized */ BT_LIB_LOGD("Not finalizing message iterator: already finalized: " "%!+i", iterator); - return; + goto end; + case BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_FINALIZING: + /* Already finalized */ + BT_LIB_LOGF("Message iterator is already being finalized: " + "%!+i", iterator); + abort(); default: break; } BT_LIB_LOGD("Finalizing message iterator: %!+i", iterator); - - if (iterator->state == BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_ENDED) { - BT_LIB_LOGD("Updating message iterator's state: " - "new-state=BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_FINALIZED_AND_ENDED"); - iterator->state = BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_FINALIZED_AND_ENDED; - } else { - BT_LIB_LOGD("Updating message iterator's state: " - "new-state=BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_FINALIZED"); - iterator->state = BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_FINALIZED; - } - + set_self_comp_port_input_msg_iterator_state(iterator, + BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_FINALIZING); BT_ASSERT(iterator->upstream_component); comp_class = iterator->upstream_component->class; @@ -251,7 +264,12 @@ void bt_self_component_port_input_message_iterator_finalize( iterator->upstream_component = NULL; iterator->upstream_port = NULL; + set_self_comp_port_input_msg_iterator_state(iterator, + BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_FINALIZED); BT_LIB_LOGD("Finalized message iterator: %!+i", iterator); + +end: + return; } BT_HIDDEN @@ -334,7 +352,8 @@ bt_self_component_port_input_message_iterator_create_initial( iterator->upstream_port = upstream_port; iterator->connection = iterator->upstream_port->connection; iterator->graph = bt_component_borrow_graph(upstream_comp); - iterator->state = BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_NON_INITIALIZED; + set_self_comp_port_input_msg_iterator_state(iterator, + BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_NON_INITIALIZED); BT_LIB_LOGD("Created initial message iterator on self component input port: " "%![up-port-]+p, %![up-comp-]+c, %![iter-]+i", upstream_port, upstream_comp, iterator); @@ -420,11 +439,13 @@ bt_self_component_port_input_message_iterator_create( bt_message_iterator_status_string(iter_status)); if (iter_status != BT_MESSAGE_ITERATOR_STATUS_OK) { BT_LOGW_STR("Initialization method failed."); + BT_OBJECT_PUT_REF_AND_RESET(iterator); goto end; } } - iterator->state = BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_ACTIVE; + set_self_comp_port_input_msg_iterator_state(iterator, + BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_ACTIVE); g_ptr_array_add(port->connection->iterators, iterator); BT_LIB_LOGD("Created message iterator on self component input port: " "%![up-port-]+p, %![up-comp-]+c, %![iter-]+i", @@ -762,8 +783,7 @@ bt_self_component_port_input_message_iterator_next( */ BT_ASSERT(method); BT_LOGD_STR("Calling user's \"next\" method."); - status = method(iterator, - (void *) iterator->base.msgs->pdata, + status = method(iterator, (void *) iterator->base.msgs->pdata, MSG_BATCH_SIZE, user_count); BT_LOGD("User method returned: status=%s", bt_message_iterator_status_string(status)); @@ -772,33 +792,16 @@ bt_self_component_port_input_message_iterator_next( goto end; } - if (iterator->state == BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_FINALIZED || - iterator->state == BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_FINALIZED_AND_ENDED) { - /* - * The user's "next" method, somehow, cancelled its own - * message iterator. This can happen, for example, - * when the user's method removes the port on which - * there's the connection from which the iterator was - * created. In this case, said connection is ended, and - * all its message iterators are finalized. - * - * Only bt_object_put_ref() the returned message if - * the status is BT_MESSAGE_ITERATOR_STATUS_OK - * because otherwise this field could be garbage. - */ - if (status == BT_MESSAGE_ITERATOR_STATUS_OK) { - uint64_t i; - bt_message_array_const msgs = - (void *) iterator->base.msgs->pdata; - - for (i = 0; i < *user_count; i++) { - bt_object_put_ref(msgs[i]); - } - } - - status = BT_MESSAGE_ITERATOR_STATUS_CANCELED; - goto end; - } +#ifdef BT_DEV_MODE + /* + * There is no way that this iterator could have been finalized + * during its "next" method, as the only way to do this is to + * put the last iterator's reference, and this can only be done + * by its downstream owner. + */ + BT_ASSERT(iterator->state == + BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_ACTIVE); +#endif switch (status) { case BT_MESSAGE_ITERATOR_STATUS_OK: @@ -814,11 +817,8 @@ bt_self_component_port_input_message_iterator_next( BT_ASSERT_PRE(self_comp_port_input_msg_iter_can_end(iterator), "Message iterator cannot end at this point: " "%!+i", iterator); - BT_ASSERT(iterator->state == - BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_ACTIVE); - iterator->state = BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_ENDED; - BT_LOGD("Set new status: status=%s", - bt_message_iterator_status_string(status)); + set_self_comp_port_input_msg_iterator_state(iterator, + BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_ENDED); goto end; default: /* Unknown non-error status */ @@ -829,8 +829,7 @@ end: return status; } -enum bt_message_iterator_status -bt_port_output_message_iterator_next( +enum bt_message_iterator_status bt_port_output_message_iterator_next( struct bt_port_output_message_iterator *iterator, bt_message_array_const *msgs_to_user, uint64_t *count_to_user) diff --git a/lib/graph/port.c b/lib/graph/port.c index da83725e..86d01b92 100644 --- a/lib/graph/port.c +++ b/lib/graph/port.c @@ -137,6 +137,36 @@ void bt_port_set_connection(struct bt_port *port, connection); } +static inline +bool port_connection_iterators_are_finalized(struct bt_port *port) +{ + bool ret = true; + struct bt_connection *conn = port->connection; + uint64_t i; + + if (!conn) { + goto end; + } + + for (i = 0; i < conn->iterators->len; i++) { + struct bt_self_component_port_input_message_iterator *iterator = + conn->iterators->pdata[i]; + + BT_ASSERT(iterator); + + if (iterator->state != BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_FINALIZING && + iterator->state != BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_FINALIZED) { + BT_ASSERT_PRE_MSG("Message iterator is not being finalized or finalized: " + "%!+i", iterator); + ret = false; + goto end; + } + } + +end: + return ret; +} + enum bt_self_component_port_status bt_self_component_port_remove_from_component( struct bt_self_component_port *self_port) { @@ -144,7 +174,8 @@ enum bt_self_component_port_status bt_self_component_port_remove_from_component( struct bt_component *comp = NULL; BT_ASSERT_PRE_NON_NULL(port, "Port"); - + BT_ASSERT_PRE(port_connection_iterators_are_finalized(port), + "At least one message iterator using this port has the wrong state."); comp = (void *) bt_object_borrow_parent(&port->base); if (!comp) { BT_LIB_LOGV("Port already removed from its component: %!+p", diff --git a/plugins/utils/muxer/muxer.c b/plugins/utils/muxer/muxer.c index cc6dc460..8270e14b 100644 --- a/plugins/utils/muxer/muxer.c +++ b/plugins/utils/muxer/muxer.c @@ -492,7 +492,6 @@ bt_message_iterator_status muxer_upstream_msg_iter_next( */ break; case BT_MESSAGE_ITERATOR_STATUS_END: /* Fall-through. */ - case BT_MESSAGE_ITERATOR_STATUS_CANCELED: /* * Message iterator reached the end: release it. It * won't be considered again to find the youngest @@ -1056,8 +1055,7 @@ bt_message_iterator_status muxer_msg_iter_do_next_one( status = muxer_msg_iter_youngest_upstream_msg_iter(muxer_comp, muxer_msg_iter, &muxer_upstream_msg_iter, &next_return_ts); - if (status < 0 || status == BT_MESSAGE_ITERATOR_STATUS_END || - status == BT_MESSAGE_ITERATOR_STATUS_CANCELED) { + if (status < 0 || status == BT_MESSAGE_ITERATOR_STATUS_END) { if (status < 0) { BT_LOGE("Cannot find the youngest upstream message iterator wrapper: " "status=%s", -- 2.34.1