#define BT_LOG_TAG "PLUGIN/SRC.CTF.LTTNG-LIVE"
#include "logging/comp-logging.h"
-#include <glib.h>
#include <inttypes.h>
#include <stdbool.h>
#include <unistd.h>
+#include <glib.h>
+
#include "common/assert.h"
#include <babeltrace2/babeltrace.h>
#include "compat/compiler.h"
case LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED:
return "LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED";
default:
- abort();
+ bt_common_abort();
}
}
trace = g_new0(struct lttng_live_trace, 1);
if (!trace) {
+ BT_COMP_LOGE_APPEND_CAUSE(self_comp,
+ "Failed to allocate live trace");
goto error;
}
trace->log_level = session->log_level;
session = g_new0(struct lttng_live_session, 1);
if (!session) {
+ BT_COMP_LOGE_APPEND_CAUSE(self_comp,
+ "Failed to allocate live session");
goto error;
}
g_ptr_array_add(lttng_live_msg_iter->sessions, session);
goto end;
error:
- BT_COMP_LOGE("Error adding session");
+ BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Error adding session");
g_free(session);
ret = -1;
end:
g_ptr_array_free(lttng_live_msg_iter->sessions, TRUE);
}
- BT_OBJECT_PUT_REF_AND_RESET(lttng_live_msg_iter->viewer_connection);
+ if (lttng_live_msg_iter->viewer_connection) {
+ live_viewer_connection_destroy(lttng_live_msg_iter->viewer_connection);
+ }
BT_ASSERT(lttng_live_msg_iter->lttng_live_comp);
BT_ASSERT(lttng_live_msg_iter->lttng_live_comp->has_msg_iter);
case LTTNG_LIVE_STREAM_ACTIVE_NO_DATA:
/* Invalid state. */
BT_COMP_LOGF("Unexpected stream state \"ACTIVE_NO_DATA\"");
- abort();
+ bt_common_abort();
case LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA:
/* Invalid state. */
BT_COMP_LOGF("Unexpected stream state \"QUIESCENT_NO_DATA\"");
- abort();
+ bt_common_abort();
case LTTNG_LIVE_STREAM_EOF:
break;
}
struct lttng_live_msg_iter *lttng_live_msg_iter,
struct lttng_live_session *session)
{
+ bt_logging_level log_level = lttng_live_msg_iter->log_level;
+ bt_self_component *self_comp = lttng_live_msg_iter->self_comp;
enum lttng_live_iterator_status status;
uint64_t trace_idx;
- int ret = 0;
if (!session->attached) {
- ret = lttng_live_attach_session(session);
- if (ret) {
+ enum lttng_live_attach_session_status attach_status =
+ lttng_live_attach_session(session);
+ if (attach_status != LTTNG_LIVE_ATTACH_SESSION_STATUS_OK) {
if (lttng_live_graph_is_canceled(lttng_live_msg_iter)) {
+ /*
+ * Clear any causes appended in
+ * `lttng_live_attach_session()` as we want to
+ * return gracefully since the graph was
+ * cancelled.
+ */
+ bt_current_thread_clear_error();
status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
} else {
status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
+ BT_COMP_LOGE_APPEND_CAUSE(self_comp,
+ "Error attaching to LTTng live session");
}
goto end;
}
status != LTTNG_LIVE_ITERATOR_STATUS_END) {
goto end;
}
- for (trace_idx = 0; trace_idx < session->traces->len; trace_idx++) {
+ trace_idx = 0;
+ while (trace_idx < session->traces->len) {
struct lttng_live_trace *trace =
g_ptr_array_index(session->traces, trace_idx);
status = lttng_live_metadata_update(trace);
- if (status != LTTNG_LIVE_ITERATOR_STATUS_OK &&
- status != LTTNG_LIVE_ITERATOR_STATUS_END) {
+ switch (status) {
+ case LTTNG_LIVE_ITERATOR_STATUS_OK:
+ trace_idx++;
+ break;
+ case LTTNG_LIVE_ITERATOR_STATUS_END:
+ /*
+ * The trace has ended. Remove it of the array an
+ * continue the iteration.
+ * We can remove the trace safely when using the
+ * g_ptr_array_remove_index_fast because it replaces
+ * the element at trace_idx with the array's last
+ * element. trace_idx is not incremented because of
+ * that.
+ */
+ (void) g_ptr_array_remove_index_fast(session->traces,
+ trace_idx);
+ break;
+ default:
goto end;
}
}
struct lttng_live_msg_iter *lttng_live_msg_iter)
{
enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
+ bt_logging_level log_level = lttng_live_msg_iter->log_level;
+ bt_self_component *self_comp = lttng_live_msg_iter->self_comp;
uint64_t session_idx = 0, nr_sessions_opened = 0;
struct lttng_live_session *session;
enum session_not_found_action sess_not_found_act =
*/
if (lttng_live_create_viewer_session(lttng_live_msg_iter)) {
ret = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
+ BT_COMP_LOGE_APPEND_CAUSE(self_comp,
+ "Error creating LTTng live viewer session");
goto end;
}
}
bt_message **message, uint64_t timestamp)
{
enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
+ bt_logging_level log_level = lttng_live_msg_iter->log_level;
+ bt_self_component *self_comp = lttng_live_msg_iter->self_comp;
bt_message *msg = NULL;
BT_ASSERT(stream_iter->trace->clock_class);
lttng_live_msg_iter->self_msg_iter,
stream_iter->trace->clock_class, timestamp);
if (!msg) {
+ BT_COMP_LOGE_APPEND_CAUSE(self_comp,
+ "Error emitting message iterator inactivity message");
goto error;
}
ret = bt_clock_snapshot_get_ns_from_origin(clock_snapshot, ts_ns);
if (ret) {
- BT_COMP_LOGE("Cannot get nanoseconds from Epoch of clock snapshot: "
+ BT_COMP_LOGE_APPEND_CAUSE(self_comp,
+ "Cannot get nanoseconds from Epoch of clock snapshot: "
"clock-snapshot-addr=%p", clock_snapshot);
goto error;
}
enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
bt_logging_level log_level = lttng_live_msg_iter->log_level;
bt_self_component *self_comp = lttng_live_msg_iter->self_comp;
- enum bt_msg_iter_status status;
+ enum ctf_msg_iter_status status;
uint64_t session_idx, trace_idx;
for (session_idx = 0; session_idx < lttng_live_msg_iter->sessions->len;
goto end;
}
- status = bt_msg_iter_get_next_message(lttng_live_stream->msg_iter,
+ status = ctf_msg_iter_get_next_message(lttng_live_stream->msg_iter,
lttng_live_msg_iter->self_msg_iter, message);
switch (status) {
- case BT_MSG_ITER_STATUS_EOF:
+ case CTF_MSG_ITER_STATUS_EOF:
ret = LTTNG_LIVE_ITERATOR_STATUS_END;
break;
- case BT_MSG_ITER_STATUS_OK:
+ case CTF_MSG_ITER_STATUS_OK:
ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
break;
- case BT_MSG_ITER_STATUS_AGAIN:
+ case CTF_MSG_ITER_STATUS_AGAIN:
/*
* Continue immediately (end of packet). The next
* get_index may return AGAIN to delay the following
*/
ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
break;
- case BT_MSG_ITER_STATUS_INVAL:
+ case CTF_MSG_ITER_STATUS_INVAL:
/* No argument provided by the user, so don't return INVAL. */
- case BT_MSG_ITER_STATUS_ERROR:
+ case CTF_MSG_ITER_STATUS_ERROR:
default:
ret = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
- BT_COMP_LOGW("CTF msg iterator return an error or failed msg_iter=%p",
- lttng_live_stream->msg_iter);
+ BT_COMP_LOGE_APPEND_CAUSE(self_comp,
+ "CTF message iterator return an error or failed: "
+ "msg_iter=%p", lttng_live_stream->msg_iter);
break;
}
{
enum lttng_live_iterator_status live_status =
LTTNG_LIVE_ITERATOR_STATUS_OK;
+ bt_logging_level log_level = lttng_live_msg_iter->log_level;
+ bt_self_component *self_comp = lttng_live_msg_iter->self_comp;
/*
* The viewer has hung up on us so we are closing the stream. The
- * `bt_msg_iter` should simply realize that it needs to close the
+ * `ctf_msg_iter` should simply realize that it needs to close the
* stream properly by emitting the necessary stream end message.
*/
- enum bt_msg_iter_status status = bt_msg_iter_get_next_message(
+ enum ctf_msg_iter_status status = ctf_msg_iter_get_next_message(
stream_iter->msg_iter, lttng_live_msg_iter->self_msg_iter,
curr_msg);
- if (status == BT_MSG_ITER_STATUS_ERROR) {
+ if (status == CTF_MSG_ITER_STATUS_ERROR) {
+ BT_COMP_LOGE_APPEND_CAUSE(self_comp,
+ "Error getting the next message from CTF message iterator");
live_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
goto end;
}
- BT_ASSERT(status == BT_MSG_ITER_STATUS_OK);
+ BT_ASSERT(status == CTF_MSG_ITER_STATUS_OK);
end:
return live_status;
* We received a message in the past. To ensure
* monotonicity, we can't send it forward.
*/
- BT_COMP_LOGE("Message's timestamp is less than "
+ BT_COMP_LOGE_APPEND_CAUSE(self_comp,
+ "Message's timestamp is less than "
"lttng-live's message iterator's last "
"returned timestamp: "
"lttng-live-msg-iter-addr=%p, ts=%" PRId64 ", "
put_messages(msgs, *count);
break;
default:
- abort();
+ bt_common_abort();
}
no_session:
lttng_live_msg_iter->viewer_connection =
live_viewer_connection_create(lttng_live->params.url->str, false,
- lttng_live_msg_iter, log_level);
+ lttng_live_msg_iter, self_comp, NULL, log_level);
if (!lttng_live_msg_iter->viewer_connection) {
goto error;
}
lttng_live->params.url->str);
break;
case SESSION_NOT_FOUND_ACTION_FAIL:
- BT_COMP_LOGE("Unable to connect to the requested live viewer "
+ BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Unable to connect to the requested live viewer "
"session. Fail the message iterator"
"initialization because of %s=\"%s\" "
"component parameter: url =\"%s\"",
lttng_live->params.url->str);
break;
default:
- abort();
+ bt_common_abort();
}
}
goto error;
} else if (validation_status == BT_PARAM_VALIDATION_STATUS_VALIDATION_ERROR) {
status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
- BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, "%s", validate_error);
+ BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, "%s",
+ validate_error);
goto error;
}
url = bt_value_string_get(url_value);
viewer_connection = live_viewer_connection_create(url, true, NULL,
- log_level);
+ NULL, self_comp_class, log_level);
if (!viewer_connection) {
+ BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class,
+ "Failed to create viewer connection");
status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
goto error;
}
status = live_viewer_connection_list_sessions(viewer_connection,
result);
if (status != BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_OK) {
+ BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class,
+ "Failed to list viewer sessions");
goto error;
}
static
bt_component_class_query_method_status lttng_live_query_support_info(
const bt_value *params, const bt_value **result,
+ bt_self_component_class *self_comp_class,
bt_logging_level log_level)
{
bt_component_class_query_method_status status =
input_type_value = bt_value_map_borrow_entry_value_const(params,
"type");
if (!input_type_value) {
- BT_COMP_LOGE("Missing expected `type` parameter.");
+ BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class,
+ "Missing expected `type` parameter.");
goto error;
}
if (!bt_value_is_string(input_type_value)) {
- BT_COMP_LOGE("`type` parameter is not a string value.");
+ BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class,
+ "`type` parameter is not a string value.");
goto error;
}
input_value = bt_value_map_borrow_entry_value_const(params, "input");
if (!input_value) {
- BT_COMP_LOGE("Missing expected `input` parameter.");
+ BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class,
+ "Missing expected `input` parameter.");
goto error;
}
if (!bt_value_is_string(input_value)) {
- BT_COMP_LOGE("`input` parameter is not a string value.");
+ BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class,
+ "`input` parameter is not a string value.");
goto error;
}
self_comp_class, log_level);
} else if (strcmp(object, "babeltrace.support-info") == 0) {
status = lttng_live_query_support_info(params, result,
- log_level);
+ self_comp_class, log_level);
} else {
BT_COMP_LOGI("Unknown query object `%s`", object);
status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_UNKNOWN_OBJECT;