uint64_t trace_idx;
if (!session->attached) {
- enum lttng_live_attach_session_status attach_status =
+ enum lttng_live_viewer_status attach_status =
lttng_live_attach_session(session,
lttng_live_msg_iter->self_msg_iter);
- if (attach_status != LTTNG_LIVE_ATTACH_SESSION_STATUS_OK) {
+ if (attach_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
if (lttng_live_graph_is_canceled(lttng_live_msg_iter)) {
/*
* Clear any causes appended in
lttng_live_iterator_handle_new_streams_and_metadata(
struct lttng_live_msg_iter *lttng_live_msg_iter)
{
- enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
+ enum lttng_live_iterator_status status;
+ enum lttng_live_viewer_status viewer_status;
bt_logging_level log_level = lttng_live_msg_iter->log_level;
bt_self_component *self_comp = lttng_live_msg_iter->self_comp;
uint64_t session_idx = 0, nr_sessions_opened = 0;
*/
if (lttng_live_msg_iter->sessions->len == 0) {
if (sess_not_found_act != SESSION_NOT_FOUND_ACTION_CONTINUE) {
- ret = LTTNG_LIVE_ITERATOR_STATUS_END;
+ status = LTTNG_LIVE_ITERATOR_STATUS_END;
goto end;
} else {
/*
* Retry to create a viewer session for the requested
* session name.
*/
- if (lttng_live_create_viewer_session(lttng_live_msg_iter)) {
- ret = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
- BT_COMP_LOGE_APPEND_CAUSE(self_comp,
- "Error creating LTTng live viewer session");
+ viewer_status = lttng_live_create_viewer_session(lttng_live_msg_iter);
+ if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
+ if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
+ status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
+ BT_COMP_LOGE_APPEND_CAUSE(self_comp,
+ "Error creating LTTng live viewer session");
+ } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
+ status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
+ } else {
+ bt_common_abort();
+ }
goto end;
}
}
session_idx++) {
session = g_ptr_array_index(lttng_live_msg_iter->sessions,
session_idx);
- ret = lttng_live_get_session(lttng_live_msg_iter, session);
- switch (ret) {
+ status = lttng_live_get_session(lttng_live_msg_iter, session);
+ switch (status) {
case LTTNG_LIVE_ITERATOR_STATUS_OK:
break;
case LTTNG_LIVE_ITERATOR_STATUS_END:
- ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
+ status = LTTNG_LIVE_ITERATOR_STATUS_OK;
break;
default:
goto end;
nr_sessions_opened++;
}
}
-end:
- if (ret == LTTNG_LIVE_ITERATOR_STATUS_OK &&
- sess_not_found_act != SESSION_NOT_FOUND_ACTION_CONTINUE &&
+
+ if (sess_not_found_act != SESSION_NOT_FOUND_ACTION_CONTINUE &&
nr_sessions_opened == 0) {
- ret = LTTNG_LIVE_ITERATOR_STATUS_END;
+ status = LTTNG_LIVE_ITERATOR_STATUS_END;
+ } else {
+ status = LTTNG_LIVE_ITERATOR_STATUS_OK;
}
- return ret;
+
+end:
+ return status;
}
static
uint64_t *count)
{
bt_component_class_message_iterator_next_method_status status;
+ enum lttng_live_viewer_status viewer_status;
struct lttng_live_msg_iter *lttng_live_msg_iter =
bt_self_message_iterator_get_data(self_msg_it);
struct lttng_live_component *lttng_live =
BT_ASSERT_DBG(lttng_live_msg_iter);
+ if (G_UNLIKELY(lttng_live_msg_iter->was_interrupted)) {
+ /*
+ * The iterator was interrupted in a previous call to the
+ * `_next()` method. We currently do not support generating
+ * messages after such event. The babeltrace2 CLI should never
+ * be running the graph after being interrupted. So this check
+ * is to prevent other graph users from using this live
+ * iterator in an messed up internal state.
+ */
+ status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_ERROR;
+ BT_COMP_LOGE_APPEND_CAUSE(self_comp,
+ "Message iterator was interrupted during a previous call to the `next()` and currently does not support continuing after such event.");
+ goto end;
+ }
+
/*
* Clear all the invalid message reference that might be left over in
* the output array.
if (lttng_live->params.sess_not_found_act !=
SESSION_NOT_FOUND_ACTION_CONTINUE) {
status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_END;
- goto no_session;
+ goto end;
} else {
/*
* The are no more active session for this session
* name. Retry to create a viewer session for the
* requested session name.
*/
- if (lttng_live_create_viewer_session(lttng_live_msg_iter)) {
- status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_ERROR;
- goto no_session;
+ viewer_status = lttng_live_create_viewer_session(lttng_live_msg_iter);
+ if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
+ if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
+ status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_ERROR;
+ BT_COMP_LOGE_APPEND_CAUSE(self_comp,
+ "Error creating LTTng live viewer session");
+ } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
+ status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_AGAIN;
+ } else {
+ bt_common_abort();
+ }
+ goto end;
}
}
}
}
if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
- goto end;
+ goto return_status;
}
if (G_UNLIKELY(youngest_stream_iter == NULL) ||
if (!youngest_stream_iter) {
stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
- goto end;
+ goto return_status;
}
BT_ASSERT_DBG(youngest_stream_iter->current_msg);
stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
}
-end:
+
+return_status:
switch (stream_iter_status) {
case LTTNG_LIVE_ITERATOR_STATUS_OK:
case LTTNG_LIVE_ITERATOR_STATUS_AGAIN:
+ /*
+ * If we gathered messages, return _OK even if the graph was
+ * interrupted. This allows for the components downstream to at
+ * least get the thoses messages. If the graph was indeed
+ * interrupted there should not be another _next() call as the
+ * application will tear down the graph. This component class
+ * doesn't support restarting after an interruption.
+ */
if (*count > 0) {
- /*
- * We received a again status but we have some messages
- * to send downstream. We send them and return OK for
- * now. On the next call we return again if there are
- * still no new message to send.
- */
status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK;
} else {
status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_AGAIN;
bt_common_abort();
}
-no_session:
+end:
return status;
}
bt_self_component_source *self_comp_src,
bt_self_component_port_output *self_port)
{
- bt_component_class_message_iterator_initialize_method_status ret =
- BT_COMPONENT_CLASS_MESSAGE_ITERATOR_INITIALIZE_METHOD_STATUS_OK;
+ bt_component_class_message_iterator_initialize_method_status status;
bt_self_component *self_comp =
bt_self_component_source_as_self_component(self_comp_src);
struct lttng_live_component *lttng_live;
struct lttng_live_msg_iter *lttng_live_msg_iter;
+ enum lttng_live_viewer_status viewer_status;
bt_logging_level log_level;
BT_ASSERT(self_msg_it);
lttng_live_msg_iter = g_new0(struct lttng_live_msg_iter, 1);
if (!lttng_live_msg_iter) {
- ret = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_INITIALIZE_METHOD_STATUS_ERROR;
+ status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
+ BT_COMP_LOGE_APPEND_CAUSE(self_comp,
+ "Failed to allocate lttng_live_msg_iter");
goto end;
}
lttng_live_msg_iter->active_stream_iter = 0;
lttng_live_msg_iter->last_msg_ts_ns = INT64_MIN;
+ lttng_live_msg_iter->was_interrupted = false;
+
lttng_live_msg_iter->sessions = g_ptr_array_new_with_free_func(
(GDestroyNotify) lttng_live_destroy_session);
BT_ASSERT(lttng_live_msg_iter->sessions);
- lttng_live_msg_iter->viewer_connection =
- live_viewer_connection_create(lttng_live->params.url->str, false,
- lttng_live_msg_iter, self_comp, NULL, log_level);
- if (!lttng_live_msg_iter->viewer_connection) {
+ viewer_status = live_viewer_connection_create(self_comp, NULL,
+ log_level, lttng_live->params.url->str, false,
+ lttng_live_msg_iter, <tng_live_msg_iter->viewer_connection);
+ if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
+ if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
+ BT_COMP_LOGE_APPEND_CAUSE(self_comp,
+ "Failed to create viewer connection");
+ } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
+ /*
+ * Interruption in the _iter_init() method is not
+ * supported. Return an error.
+ */
+ BT_COMP_LOGE_APPEND_CAUSE(self_comp,
+ "Interrupted while creating viewer connection");
+ }
goto error;
}
- if (lttng_live_create_viewer_session(lttng_live_msg_iter)) {
+ viewer_status = lttng_live_create_viewer_session(lttng_live_msg_iter);
+ if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
+ if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
+ BT_COMP_LOGE_APPEND_CAUSE(self_comp,
+ "Failed to create viewer session");
+ } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
+ /*
+ * Interruption in the _iter_init() method is not
+ * supported. Return an error.
+ */
+ BT_COMP_LOGE_APPEND_CAUSE(self_comp,
+ "Interrupted when creating viewer session");
+ }
goto error;
}
+
if (lttng_live_msg_iter->sessions->len == 0) {
switch (lttng_live->params.sess_not_found_act) {
case SESSION_NOT_FOUND_ACTION_CONTINUE:
- BT_COMP_LOGI("Unable to connect to the requested live viewer "
- "session. Keep trying to connect because of "
+ BT_COMP_LOGI("Unable to connect to the requested live viewer session. Keep trying to connect because of "
"%s=\"%s\" component parameter: url=\"%s\"",
SESS_NOT_FOUND_ACTION_PARAM,
SESS_NOT_FOUND_ACTION_CONTINUE_STR,
lttng_live->params.url->str);
break;
case SESSION_NOT_FOUND_ACTION_FAIL:
- BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Unable to connect to the requested live viewer "
- "session. Fail the message iterator"
- "initialization because of %s=\"%s\" "
+ BT_COMP_LOGE_APPEND_CAUSE(self_comp,
+ "Unable to connect to the requested live viewer session. Fail the message iterator initialization because of %s=\"%s\" "
"component parameter: url =\"%s\"",
SESS_NOT_FOUND_ACTION_PARAM,
SESS_NOT_FOUND_ACTION_FAIL_STR,
lttng_live->params.url->str);
goto error;
case SESSION_NOT_FOUND_ACTION_END:
- BT_COMP_LOGI("Unable to connect to the requested live viewer "
- "session. End gracefully at the first _next() "
+ BT_COMP_LOGI("Unable to connect to the requested live viewer session. End gracefully at the first _next() "
"call because of %s=\"%s\" component parameter: "
"url=\"%s\"", SESS_NOT_FOUND_ACTION_PARAM,
SESS_NOT_FOUND_ACTION_END_STR,
}
bt_self_message_iterator_set_data(self_msg_it, lttng_live_msg_iter);
-
+ status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_INITIALIZE_METHOD_STATUS_OK;
goto end;
+
error:
- ret = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_INITIALIZE_METHOD_STATUS_ERROR;
+ status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_INITIALIZE_METHOD_STATUS_ERROR;
lttng_live_msg_iter_destroy(lttng_live_msg_iter);
end:
- return ret;
+ return status;
}
static struct bt_param_validation_map_value_entry_descr list_sessions_params[] = {
const bt_value *url_value = NULL;
const char *url;
struct live_viewer_connection *viewer_connection = NULL;
+ enum lttng_live_viewer_status viewer_status;
enum bt_param_validation_status validation_status;
gchar *validate_error = NULL;
url_value = bt_value_map_borrow_entry_value_const(params, URL_PARAM);
url = bt_value_string_get(url_value);
- viewer_connection = live_viewer_connection_create(url, true, NULL,
- NULL, self_comp_class, log_level);
- if (!viewer_connection) {
- BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class,
- "Failed to create viewer connection");
- status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
+ viewer_status = live_viewer_connection_create(NULL, self_comp_class,
+ log_level, url, true, NULL, &viewer_connection);
+ if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
+ if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
+ BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class,
+ "Failed to create viewer connection");
+ status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
+ } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
+ status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_AGAIN;
+ } else {
+ bt_common_abort();
+ }
goto error;
}