X-Git-Url: http://git.efficios.com/?a=blobdiff_plain;f=src%2Fplugins%2Fctf%2Flttng-live%2Flttng-live.c;h=e36cdc662088f02cfede0772ed085878e0ba4869;hb=4f74db527bee45bfa24cd5812b840d5e12aff05f;hp=c28af3d5140c343f9cb0d83993b924a1c07aceda;hpb=498e7994d60bd0e9f63c3d5c0fd00eec77ba7c34;p=babeltrace.git diff --git a/src/plugins/ctf/lttng-live/lttng-live.c b/src/plugins/ctf/lttng-live/lttng-live.c index c28af3d5..e36cdc66 100644 --- a/src/plugins/ctf/lttng-live/lttng-live.c +++ b/src/plugins/ctf/lttng-live/lttng-live.c @@ -258,7 +258,6 @@ int lttng_live_add_session(struct lttng_live_msg_iter *lttng_live_msg_iter, g_ptr_array_add(lttng_live_msg_iter->sessions, session); goto end; error: - BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Error adding session"); g_free(session); ret = -1; end: @@ -448,9 +447,10 @@ enum lttng_live_iterator_status lttng_live_get_session( uint64_t trace_idx; if (!session->attached) { - enum lttng_live_attach_session_status attach_status = - lttng_live_attach_session(session); - if (attach_status != LTTNG_LIVE_ATTACH_SESSION_STATUS_OK) { + enum lttng_live_viewer_status attach_status = + lttng_live_attach_session(session, + lttng_live_msg_iter->self_msg_iter); + if (attach_status != LTTNG_LIVE_VIEWER_STATUS_OK) { if (lttng_live_graph_is_canceled(lttng_live_msg_iter)) { /* * Clear any causes appended in @@ -469,7 +469,8 @@ enum lttng_live_iterator_status lttng_live_get_session( } } - status = lttng_live_get_new_streams(session); + status = lttng_live_get_new_streams(session, + lttng_live_msg_iter->self_msg_iter); if (status != LTTNG_LIVE_ITERATOR_STATUS_OK && status != LTTNG_LIVE_ITERATOR_STATUS_END) { goto end; @@ -501,7 +502,8 @@ enum lttng_live_iterator_status lttng_live_get_session( goto end; } } - status = lttng_live_lazy_msg_init(session); + status = lttng_live_lazy_msg_init(session, + lttng_live_msg_iter->self_msg_iter); end: return status; @@ -544,7 +546,8 @@ enum lttng_live_iterator_status lttng_live_iterator_handle_new_streams_and_metadata( struct lttng_live_msg_iter *lttng_live_msg_iter) { - enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK; + enum lttng_live_iterator_status status; + enum lttng_live_viewer_status viewer_status; bt_logging_level log_level = lttng_live_msg_iter->log_level; bt_self_component *self_comp = lttng_live_msg_iter->self_comp; uint64_t session_idx = 0, nr_sessions_opened = 0; @@ -560,17 +563,24 @@ lttng_live_iterator_handle_new_streams_and_metadata( */ if (lttng_live_msg_iter->sessions->len == 0) { if (sess_not_found_act != SESSION_NOT_FOUND_ACTION_CONTINUE) { - ret = LTTNG_LIVE_ITERATOR_STATUS_END; + status = LTTNG_LIVE_ITERATOR_STATUS_END; goto end; } else { /* * Retry to create a viewer session for the requested * session name. */ - if (lttng_live_create_viewer_session(lttng_live_msg_iter)) { - ret = LTTNG_LIVE_ITERATOR_STATUS_ERROR; - BT_COMP_LOGE_APPEND_CAUSE(self_comp, - "Error creating LTTng live viewer session"); + viewer_status = lttng_live_create_viewer_session(lttng_live_msg_iter); + if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) { + if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) { + status = LTTNG_LIVE_ITERATOR_STATUS_ERROR; + BT_COMP_LOGE_APPEND_CAUSE(self_comp, + "Error creating LTTng live viewer session"); + } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) { + status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN; + } else { + bt_common_abort(); + } goto end; } } @@ -580,12 +590,12 @@ lttng_live_iterator_handle_new_streams_and_metadata( session_idx++) { session = g_ptr_array_index(lttng_live_msg_iter->sessions, session_idx); - ret = lttng_live_get_session(lttng_live_msg_iter, session); - switch (ret) { + status = lttng_live_get_session(lttng_live_msg_iter, session); + switch (status) { case LTTNG_LIVE_ITERATOR_STATUS_OK: break; case LTTNG_LIVE_ITERATOR_STATUS_END: - ret = LTTNG_LIVE_ITERATOR_STATUS_OK; + status = LTTNG_LIVE_ITERATOR_STATUS_OK; break; default: goto end; @@ -594,13 +604,16 @@ lttng_live_iterator_handle_new_streams_and_metadata( nr_sessions_opened++; } } -end: - if (ret == LTTNG_LIVE_ITERATOR_STATUS_OK && - sess_not_found_act != SESSION_NOT_FOUND_ACTION_CONTINUE && + + if (sess_not_found_act != SESSION_NOT_FOUND_ACTION_CONTINUE && nr_sessions_opened == 0) { - ret = LTTNG_LIVE_ITERATOR_STATUS_END; + status = LTTNG_LIVE_ITERATOR_STATUS_END; + } else { + status = LTTNG_LIVE_ITERATOR_STATUS_OK; } - return ret; + +end: + return status; } static @@ -770,7 +783,7 @@ enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_active_data_ enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK; bt_logging_level log_level = lttng_live_msg_iter->log_level; bt_self_component *self_comp = lttng_live_msg_iter->self_comp; - enum bt_msg_iter_status status; + enum ctf_msg_iter_status status; uint64_t session_idx, trace_idx; for (session_idx = 0; session_idx < lttng_live_msg_iter->sessions->len; @@ -798,16 +811,16 @@ enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_active_data_ goto end; } - status = bt_msg_iter_get_next_message(lttng_live_stream->msg_iter, - lttng_live_msg_iter->self_msg_iter, message); + status = ctf_msg_iter_get_next_message( + lttng_live_stream->msg_iter, message); switch (status) { - case BT_MSG_ITER_STATUS_EOF: + case CTF_MSG_ITER_STATUS_EOF: ret = LTTNG_LIVE_ITERATOR_STATUS_END; break; - case BT_MSG_ITER_STATUS_OK: + case CTF_MSG_ITER_STATUS_OK: ret = LTTNG_LIVE_ITERATOR_STATUS_OK; break; - case BT_MSG_ITER_STATUS_AGAIN: + case CTF_MSG_ITER_STATUS_AGAIN: /* * Continue immediately (end of packet). The next * get_index may return AGAIN to delay the following @@ -815,9 +828,9 @@ enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_active_data_ */ ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE; break; - case BT_MSG_ITER_STATUS_INVAL: + case CTF_MSG_ITER_STATUS_INVAL: /* No argument provided by the user, so don't return INVAL. */ - case BT_MSG_ITER_STATUS_ERROR: + case CTF_MSG_ITER_STATUS_ERROR: default: ret = LTTNG_LIVE_ITERATOR_STATUS_ERROR; BT_COMP_LOGE_APPEND_CAUSE(self_comp, @@ -842,21 +855,20 @@ enum lttng_live_iterator_status lttng_live_iterator_close_stream( bt_self_component *self_comp = lttng_live_msg_iter->self_comp; /* * The viewer has hung up on us so we are closing the stream. The - * `bt_msg_iter` should simply realize that it needs to close the + * `ctf_msg_iter` should simply realize that it needs to close the * stream properly by emitting the necessary stream end message. */ - enum bt_msg_iter_status status = bt_msg_iter_get_next_message( - stream_iter->msg_iter, lttng_live_msg_iter->self_msg_iter, - curr_msg); + enum ctf_msg_iter_status status = ctf_msg_iter_get_next_message( + stream_iter->msg_iter, curr_msg); - if (status == BT_MSG_ITER_STATUS_ERROR) { + if (status == CTF_MSG_ITER_STATUS_ERROR) { BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Error getting the next message from CTF message iterator"); live_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR; goto end; } - BT_ASSERT(status == BT_MSG_ITER_STATUS_OK); + BT_ASSERT(status == CTF_MSG_ITER_STATUS_OK); end: return live_status; @@ -1258,6 +1270,7 @@ bt_component_class_message_iterator_next_method_status lttng_live_msg_iter_next( uint64_t *count) { bt_component_class_message_iterator_next_method_status status; + enum lttng_live_viewer_status viewer_status; struct lttng_live_msg_iter *lttng_live_msg_iter = bt_self_message_iterator_get_data(self_msg_it); struct lttng_live_component *lttng_live = @@ -1271,6 +1284,21 @@ bt_component_class_message_iterator_next_method_status lttng_live_msg_iter_next( BT_ASSERT_DBG(lttng_live_msg_iter); + if (G_UNLIKELY(lttng_live_msg_iter->was_interrupted)) { + /* + * The iterator was interrupted in a previous call to the + * `_next()` method. We currently do not support generating + * messages after such event. The babeltrace2 CLI should never + * be running the graph after being interrupted. So this check + * is to prevent other graph users from using this live + * iterator in an messed up internal state. + */ + status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_ERROR; + BT_COMP_LOGE_APPEND_CAUSE(self_comp, + "Message iterator was interrupted during a previous call to the `next()` and currently does not support continuing after such event."); + goto end; + } + /* * Clear all the invalid message reference that might be left over in * the output array. @@ -1286,16 +1314,25 @@ bt_component_class_message_iterator_next_method_status lttng_live_msg_iter_next( if (lttng_live->params.sess_not_found_act != SESSION_NOT_FOUND_ACTION_CONTINUE) { status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_END; - goto no_session; + goto end; } else { /* * The are no more active session for this session * name. Retry to create a viewer session for the * requested session name. */ - if (lttng_live_create_viewer_session(lttng_live_msg_iter)) { - status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_ERROR; - goto no_session; + viewer_status = lttng_live_create_viewer_session(lttng_live_msg_iter); + if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) { + if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) { + status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_ERROR; + BT_COMP_LOGE_APPEND_CAUSE(self_comp, + "Error creating LTTng live viewer session"); + } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) { + status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_AGAIN; + } else { + bt_common_abort(); + } + goto end; } } } @@ -1363,7 +1400,7 @@ bt_component_class_message_iterator_next_method_status lttng_live_msg_iter_next( } if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK) { - goto end; + goto return_status; } if (G_UNLIKELY(youngest_stream_iter == NULL) || @@ -1406,7 +1443,7 @@ bt_component_class_message_iterator_next_method_status lttng_live_msg_iter_next( if (!youngest_stream_iter) { stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN; - goto end; + goto return_status; } BT_ASSERT_DBG(youngest_stream_iter->current_msg); @@ -1428,17 +1465,20 @@ bt_component_class_message_iterator_next_method_status lttng_live_msg_iter_next( stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK; } -end: + +return_status: switch (stream_iter_status) { case LTTNG_LIVE_ITERATOR_STATUS_OK: case LTTNG_LIVE_ITERATOR_STATUS_AGAIN: + /* + * If we gathered messages, return _OK even if the graph was + * interrupted. This allows for the components downstream to at + * least get the thoses messages. If the graph was indeed + * interrupted there should not be another _next() call as the + * application will tear down the graph. This component class + * doesn't support restarting after an interruption. + */ if (*count > 0) { - /* - * We received a again status but we have some messages - * to send downstream. We send them and return OK for - * now. On the next call we return again if there are - * still no new message to send. - */ status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK; } else { status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_AGAIN; @@ -1461,10 +1501,44 @@ end: bt_common_abort(); } -no_session: +end: return status; } +static +struct lttng_live_msg_iter *lttng_live_msg_iter_create( + struct lttng_live_component *lttng_live_comp, + bt_self_message_iterator *self_msg_it) +{ + bt_self_component *self_comp = lttng_live_comp->self_comp; + bt_logging_level log_level = lttng_live_comp->log_level; + + struct lttng_live_msg_iter *lttng_live_msg_iter = + g_new0(struct lttng_live_msg_iter, 1); + if (!lttng_live_msg_iter) { + BT_COMP_LOGE_APPEND_CAUSE(self_comp, + "Failed to allocate lttng_live_msg_iter"); + goto end; + } + + lttng_live_msg_iter->log_level = lttng_live_comp->log_level; + lttng_live_msg_iter->self_comp = lttng_live_comp->self_comp; + lttng_live_msg_iter->lttng_live_comp = lttng_live_comp; + lttng_live_msg_iter->self_msg_iter = self_msg_it; + + lttng_live_msg_iter->active_stream_iter = 0; + lttng_live_msg_iter->last_msg_ts_ns = INT64_MIN; + lttng_live_msg_iter->was_interrupted = false; + + lttng_live_msg_iter->sessions = g_ptr_array_new_with_free_func( + (GDestroyNotify) lttng_live_destroy_session); + BT_ASSERT(lttng_live_msg_iter->sessions); + +end: + return lttng_live_msg_iter; + +} + BT_HIDDEN bt_component_class_message_iterator_initialize_method_status lttng_live_msg_iter_init( bt_self_message_iterator *self_msg_it, @@ -1472,12 +1546,12 @@ bt_component_class_message_iterator_initialize_method_status lttng_live_msg_iter bt_self_component_source *self_comp_src, bt_self_component_port_output *self_port) { - bt_component_class_message_iterator_initialize_method_status ret = - BT_COMPONENT_CLASS_MESSAGE_ITERATOR_INITIALIZE_METHOD_STATUS_OK; + bt_component_class_message_iterator_initialize_method_status status; bt_self_component *self_comp = bt_self_component_source_as_self_component(self_comp_src); struct lttng_live_component *lttng_live; struct lttng_live_msg_iter *lttng_live_msg_iter; + enum lttng_live_viewer_status viewer_status; bt_logging_level log_level; BT_ASSERT(self_msg_it); @@ -1486,59 +1560,73 @@ bt_component_class_message_iterator_initialize_method_status lttng_live_msg_iter log_level = lttng_live->log_level; self_comp = lttng_live->self_comp; + /* There can be only one downstream iterator at the same time. */ BT_ASSERT(!lttng_live->has_msg_iter); lttng_live->has_msg_iter = true; - lttng_live_msg_iter = g_new0(struct lttng_live_msg_iter, 1); + lttng_live_msg_iter = lttng_live_msg_iter_create(lttng_live, + self_msg_it); if (!lttng_live_msg_iter) { - ret = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_INITIALIZE_METHOD_STATUS_ERROR; - goto end; + status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_INITIALIZE_METHOD_STATUS_MEMORY_ERROR; + BT_COMP_LOGE_APPEND_CAUSE(self_comp, + "Failed to create lttng_live_msg_iter"); + goto error; } - lttng_live_msg_iter->log_level = lttng_live->log_level; - lttng_live_msg_iter->self_comp = lttng_live->self_comp; - lttng_live_msg_iter->lttng_live_comp = lttng_live; - lttng_live_msg_iter->self_msg_iter = self_msg_it; - - lttng_live_msg_iter->active_stream_iter = 0; - lttng_live_msg_iter->last_msg_ts_ns = INT64_MIN; - lttng_live_msg_iter->sessions = g_ptr_array_new_with_free_func( - (GDestroyNotify) lttng_live_destroy_session); - BT_ASSERT(lttng_live_msg_iter->sessions); - - lttng_live_msg_iter->viewer_connection = - live_viewer_connection_create(lttng_live->params.url->str, false, - lttng_live_msg_iter, self_comp, NULL, log_level); - if (!lttng_live_msg_iter->viewer_connection) { + viewer_status = live_viewer_connection_create(self_comp, NULL, + log_level, lttng_live->params.url->str, false, + lttng_live_msg_iter, <tng_live_msg_iter->viewer_connection); + if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) { + if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) { + BT_COMP_LOGE_APPEND_CAUSE(self_comp, + "Failed to create viewer connection"); + } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) { + /* + * Interruption in the _iter_init() method is not + * supported. Return an error. + */ + BT_COMP_LOGE_APPEND_CAUSE(self_comp, + "Interrupted while creating viewer connection"); + } goto error; } - if (lttng_live_create_viewer_session(lttng_live_msg_iter)) { + viewer_status = lttng_live_create_viewer_session(lttng_live_msg_iter); + if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) { + if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) { + BT_COMP_LOGE_APPEND_CAUSE(self_comp, + "Failed to create viewer session"); + } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) { + /* + * Interruption in the _iter_init() method is not + * supported. Return an error. + */ + BT_COMP_LOGE_APPEND_CAUSE(self_comp, + "Interrupted when creating viewer session"); + } goto error; } + if (lttng_live_msg_iter->sessions->len == 0) { switch (lttng_live->params.sess_not_found_act) { case SESSION_NOT_FOUND_ACTION_CONTINUE: - BT_COMP_LOGI("Unable to connect to the requested live viewer " - "session. Keep trying to connect because of " + BT_COMP_LOGI("Unable to connect to the requested live viewer session. Keep trying to connect because of " "%s=\"%s\" component parameter: url=\"%s\"", SESS_NOT_FOUND_ACTION_PARAM, SESS_NOT_FOUND_ACTION_CONTINUE_STR, lttng_live->params.url->str); break; case SESSION_NOT_FOUND_ACTION_FAIL: - BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Unable to connect to the requested live viewer " - "session. Fail the message iterator" - "initialization because of %s=\"%s\" " + BT_COMP_LOGE_APPEND_CAUSE(self_comp, + "Unable to connect to the requested live viewer session. Fail the message iterator initialization because of %s=\"%s\" " "component parameter: url =\"%s\"", SESS_NOT_FOUND_ACTION_PARAM, SESS_NOT_FOUND_ACTION_FAIL_STR, lttng_live->params.url->str); goto error; case SESSION_NOT_FOUND_ACTION_END: - BT_COMP_LOGI("Unable to connect to the requested live viewer " - "session. End gracefully at the first _next() " + BT_COMP_LOGI("Unable to connect to the requested live viewer session. End gracefully at the first _next() " "call because of %s=\"%s\" component parameter: " "url=\"%s\"", SESS_NOT_FOUND_ACTION_PARAM, SESS_NOT_FOUND_ACTION_END_STR, @@ -1550,13 +1638,14 @@ bt_component_class_message_iterator_initialize_method_status lttng_live_msg_iter } bt_self_message_iterator_set_data(self_msg_it, lttng_live_msg_iter); - + status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_INITIALIZE_METHOD_STATUS_OK; goto end; + error: - ret = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_INITIALIZE_METHOD_STATUS_ERROR; + status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_INITIALIZE_METHOD_STATUS_ERROR; lttng_live_msg_iter_destroy(lttng_live_msg_iter); end: - return ret; + return status; } static struct bt_param_validation_map_value_entry_descr list_sessions_params[] = { @@ -1574,6 +1663,7 @@ bt_component_class_query_method_status lttng_live_query_list_sessions( const bt_value *url_value = NULL; const char *url; struct live_viewer_connection *viewer_connection = NULL; + enum lttng_live_viewer_status viewer_status; enum bt_param_validation_status validation_status; gchar *validate_error = NULL; @@ -1592,12 +1682,18 @@ bt_component_class_query_method_status lttng_live_query_list_sessions( url_value = bt_value_map_borrow_entry_value_const(params, URL_PARAM); url = bt_value_string_get(url_value); - viewer_connection = live_viewer_connection_create(url, true, NULL, - NULL, self_comp_class, log_level); - if (!viewer_connection) { - BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, - "Failed to create viewer connection"); - status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR; + viewer_status = live_viewer_connection_create(NULL, self_comp_class, + log_level, url, true, NULL, &viewer_connection); + if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) { + if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) { + BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, + "Failed to create viewer connection"); + status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR; + } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) { + status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_AGAIN; + } else { + bt_common_abort(); + } goto error; }