* Babeltrace CTF LTTng-live Client Component
*/
+#include <glib.h>
+#include <inttypes.h>
+#include <unistd.h>
+
+#include <babeltrace2/babeltrace.h>
+
#define BT_COMP_LOG_SELF_COMP self_comp
#define BT_LOG_OUTPUT_LEVEL log_level
#define BT_LOG_TAG "PLUGIN/SRC.CTF.LTTNG-LIVE"
#include "logging/comp-logging.h"
-#include <inttypes.h>
-#include <stdbool.h>
-#include <unistd.h>
-
-#include <glib.h>
-
#include "common/assert.h"
-#include <babeltrace2/babeltrace.h>
-#include "compat/compiler.h"
-#include <babeltrace2/types.h>
#include "plugins/common/muxing/muxing.h"
#include "plugins/common/param-validation/param-validation.h"
#include "data-stream.hpp"
-#include "metadata.hpp"
#include "lttng-live.hpp"
+#include "metadata.hpp"
#define MAX_QUERY_SIZE (256 * 1024)
#define URL_PARAM "url"
live_stream_iter->current_inactivity_ts); \
} while (0);
-BT_HIDDEN
bool lttng_live_graph_is_canceled(struct lttng_live_msg_iter *msg_iter)
{
bool ret;
BT_TRACE_CLASS_PUT_REF_AND_RESET(trace->trace_class);
lttng_live_metadata_fini(trace);
- g_free(trace);
+ delete trace;
}
static struct lttng_live_trace *lttng_live_create_trace(struct lttng_live_session *session,
uint64_t trace_id)
{
- struct lttng_live_trace *trace = NULL;
bt_logging_level log_level = session->log_level;
bt_self_component *self_comp = session->self_comp;
BT_COMP_LOGD("Creating live trace: "
"session-id=%" PRIu64 ", trace-id=%" PRIu64,
session->id, trace_id);
- trace = g_new0(struct lttng_live_trace, 1);
- if (!trace) {
- BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Failed to allocate live trace");
- goto error;
- }
+
+ lttng_live_trace *trace = new lttng_live_trace;
trace->log_level = session->log_level;
trace->self_comp = session->self_comp;
trace->session = session;
trace->metadata_stream_state = LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED;
g_ptr_array_add(session->traces, trace);
- goto end;
-error:
- g_free(trace);
- trace = NULL;
-end:
return trace;
}
-BT_HIDDEN
struct lttng_live_trace *
lttng_live_session_borrow_or_create_trace_by_id(struct lttng_live_session *session,
uint64_t trace_id)
return trace;
}
-BT_HIDDEN
int lttng_live_add_session(struct lttng_live_msg_iter *lttng_live_msg_iter, uint64_t session_id,
const char *hostname, const char *session_name)
{
- int ret = 0;
- struct lttng_live_session *session;
bt_logging_level log_level = lttng_live_msg_iter->log_level;
bt_self_component *self_comp = lttng_live_msg_iter->self_comp;
"session-id=%" PRIu64 ", hostname=\"%s\" session-name=\"%s\"",
session_id, hostname, session_name);
- session = g_new0(struct lttng_live_session, 1);
- if (!session) {
- BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Failed to allocate live session");
- goto error;
- }
-
+ lttng_live_session *session = new lttng_live_session;
session->log_level = lttng_live_msg_iter->log_level;
session->self_comp = lttng_live_msg_iter->self_comp;
session->id = session_id;
BT_ASSERT(session->session_name);
g_ptr_array_add(lttng_live_msg_iter->sessions, session);
- goto end;
-error:
- g_free(session);
- ret = -1;
-end:
- return ret;
+
+ return 0;
}
static void lttng_live_destroy_session(struct lttng_live_session *session)
if (session->hostname) {
g_string_free(session->hostname, TRUE);
}
+
if (session->session_name) {
g_string_free(session->session_name, TRUE);
}
- g_free(session);
+
+ delete session;
end:
return;
BT_ASSERT(lttng_live_msg_iter->active_stream_iter == 0);
lttng_live_msg_iter->lttng_live_comp->has_msg_iter = false;
- g_free(lttng_live_msg_iter);
+ delete lttng_live_msg_iter;
end:
return;
}
-BT_HIDDEN
void lttng_live_msg_iter_finalize(bt_self_message_iterator *self_msg_iter)
{
struct lttng_live_msg_iter *lttng_live_msg_iter;
if (orig_state == LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA && last_inact_ts == curr_inact_ts) {
/*
- * Because the stream is in the QUIESCENT_NO_DATA
- * state, we can assert that the last_inactivity_ts was
- * set and can be safely used in the `if` above.
- */
+ * Because the stream is in the QUIESCENT_NO_DATA
+ * state, we can assert that the last_inactivity_ts was
+ * set and can be safely used in the `if` above.
+ */
BT_ASSERT(lttng_live_stream->last_inactivity_ts.is_set);
ret = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
}
}
- BT_COMP_LOGD("Updating all streams and metadata for session: "
+ BT_COMP_LOGD("Updating all data streams: "
"session-id=%" PRIu64 ", session-name=\"%s\"",
session->id, session->session_name->str);
status = lttng_live_session_get_new_streams(session, lttng_live_msg_iter->self_msg_iter);
- if (status != LTTNG_LIVE_ITERATOR_STATUS_OK && status != LTTNG_LIVE_ITERATOR_STATUS_END) {
+ switch (status) {
+ case LTTNG_LIVE_ITERATOR_STATUS_OK:
+ break;
+ case LTTNG_LIVE_ITERATOR_STATUS_END:
+ /*
+ * We received a `_END` from the `_get_new_streams()` function,
+ * which means no more data will ever be received from the data
+ * streams of this session. But it's possible that the metadata
+ * is incomplete.
+ * The live protocol guarantees that we receive all the
+ * metadata needed before we receive data streams needing it.
+ * But it's possible to receive metadata NOT needed by
+ * data streams after the session was closed. For example, this
+ * could happen if a new event is registered and the session is
+ * stopped before any tracepoint for that event is actually
+ * fired.
+ */
+ BT_COMP_LOGD(
+ "Updating streams returned _END status. Override status to _OK in order fetch any remaining metadata:"
+ "session-id=%" PRIu64 ", session-name=\"%s\"",
+ session->id, session->session_name->str);
+ status = LTTNG_LIVE_ITERATOR_STATUS_OK;
+ break;
+ default:
goto end;
}
+ BT_COMP_LOGD("Updating metadata stream for session: "
+ "session-id=%" PRIu64 ", session-name=\"%s\"",
+ session->id, session->session_name->str);
+
trace_idx = 0;
while (trace_idx < session->traces->len) {
struct lttng_live_trace *trace =
switch (status) {
case LTTNG_LIVE_ITERATOR_STATUS_OK:
case LTTNG_LIVE_ITERATOR_STATUS_END:
+ /*
+ * A session returned `_END`. Other sessions may still
+ * be active so we override the status and continue
+ * looping if needed.
+ */
break;
default:
goto end;
return ret;
}
-static int live_get_msg_ts_ns(struct lttng_live_stream_iterator *stream_iter,
- struct lttng_live_msg_iter *lttng_live_msg_iter,
+static int live_get_msg_ts_ns(struct lttng_live_msg_iter *lttng_live_msg_iter,
const bt_message *msg, int64_t last_msg_ts_ns, int64_t *ts_ns)
{
- const bt_clock_class *clock_class = NULL;
const bt_clock_snapshot *clock_snapshot = NULL;
int ret = 0;
bt_logging_level log_level = lttng_live_msg_iter->log_level;
switch (bt_message_get_type(msg)) {
case BT_MESSAGE_TYPE_EVENT:
- clock_class = bt_message_event_borrow_stream_class_default_clock_class_const(msg);
- BT_ASSERT_DBG(clock_class);
-
clock_snapshot = bt_message_event_borrow_default_clock_snapshot_const(msg);
break;
case BT_MESSAGE_TYPE_PACKET_BEGINNING:
- clock_class =
- bt_message_packet_beginning_borrow_stream_class_default_clock_class_const(msg);
- BT_ASSERT(clock_class);
-
clock_snapshot = bt_message_packet_beginning_borrow_default_clock_snapshot_const(msg);
break;
case BT_MESSAGE_TYPE_PACKET_END:
- clock_class = bt_message_packet_end_borrow_stream_class_default_clock_class_const(msg);
- BT_ASSERT(clock_class);
-
clock_snapshot = bt_message_packet_end_borrow_default_clock_snapshot_const(msg);
break;
case BT_MESSAGE_TYPE_DISCARDED_EVENTS:
- clock_class =
- bt_message_discarded_events_borrow_stream_class_default_clock_class_const(msg);
- BT_ASSERT(clock_class);
-
clock_snapshot =
bt_message_discarded_events_borrow_beginning_default_clock_snapshot_const(msg);
break;
case BT_MESSAGE_TYPE_DISCARDED_PACKETS:
- clock_class =
- bt_message_discarded_packets_borrow_stream_class_default_clock_class_const(msg);
- BT_ASSERT(clock_class);
-
clock_snapshot =
bt_message_discarded_packets_borrow_beginning_default_clock_snapshot_const(msg);
break;
goto end;
}
- clock_class = bt_clock_snapshot_borrow_clock_class_const(clock_snapshot);
- BT_ASSERT_DBG(clock_class);
-
ret = bt_clock_snapshot_get_ns_from_origin(clock_snapshot, ts_ns);
if (ret) {
BT_COMP_LOGE_APPEND_CAUSE(self_comp,
* by this component. We CANNOT send it as is.
*
* The only expected scenario in which that could happen is the
- * following, everything else is a bug in this component, relay deamon,
+ * following, everything else is a bug in this component, relay daemon,
* or CTF parser.
*
* Expected scenario: The CTF message iterator emitted discarded
/*
* Get the timestamp in nanoseconds from origin of this
- * messsage.
+ * message.
*/
- live_get_msg_ts_ns(stream_iter, lttng_live_msg_iter, msg,
- lttng_live_msg_iter->last_msg_ts_ns, &curr_msg_ts_ns);
+ live_get_msg_ts_ns(lttng_live_msg_iter, msg, lttng_live_msg_iter->last_msg_ts_ns,
+ &curr_msg_ts_ns);
/*
* Check if the message of the current live stream
}
}
-BT_HIDDEN
bt_message_iterator_class_next_method_status
lttng_live_msg_iter_next(bt_self_message_iterator *self_msg_it, bt_message_array_const msgs,
uint64_t capacity, uint64_t *count)
/*
* Insert the next message to the message batch. This will set
- * stream iterator current messsage to NULL so that next time
+ * stream iterator current message to NULL so that next time
* we fetch the next message of that stream iterator
*/
BT_MESSAGE_MOVE_REF(msgs[*count], youngest_stream_iter->current_msg);
/*
* If we gathered messages, return _OK even if the graph was
* interrupted. This allows for the components downstream to at
- * least get the thoses messages. If the graph was indeed
+ * least get the those messages. If the graph was indeed
* interrupted there should not be another _next() call as the
* application will tear down the graph. This component class
* doesn't support restarting after an interruption.
lttng_live_msg_iter_create(struct lttng_live_component *lttng_live_comp,
bt_self_message_iterator *self_msg_it)
{
- bt_self_component *self_comp = lttng_live_comp->self_comp;
- bt_logging_level log_level = lttng_live_comp->log_level;
-
- struct lttng_live_msg_iter *lttng_live_msg_iter = g_new0(struct lttng_live_msg_iter, 1);
- if (!lttng_live_msg_iter) {
- BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Failed to allocate lttng_live_msg_iter");
- goto end;
- }
-
+ lttng_live_msg_iter *lttng_live_msg_iter = new struct lttng_live_msg_iter;
lttng_live_msg_iter->log_level = lttng_live_comp->log_level;
lttng_live_msg_iter->self_comp = lttng_live_comp->self_comp;
lttng_live_msg_iter->lttng_live_comp = lttng_live_comp;
g_ptr_array_new_with_free_func((GDestroyNotify) lttng_live_destroy_session);
BT_ASSERT(lttng_live_msg_iter->sessions);
-end:
return lttng_live_msg_iter;
}
-BT_HIDDEN
bt_message_iterator_class_initialize_method_status
lttng_live_msg_iter_init(bt_self_message_iterator *self_msg_it,
- bt_self_message_iterator_configuration *config,
- bt_self_component_port_output *self_port)
+ bt_self_message_iterator_configuration *, bt_self_component_port_output *)
{
bt_message_iterator_class_initialize_method_status status;
struct lttng_live_component *lttng_live;
const bt_value *input_type_value;
const bt_value *input_value;
double weight = 0;
- struct bt_common_lttng_live_url_parts parts = {0};
+ struct bt_common_lttng_live_url_parts parts = {};
/* Used by the logging macros */
__attribute__((unused)) bt_self_component *self_comp = NULL;
return status;
}
-BT_HIDDEN
bt_component_class_query_method_status lttng_live_query(bt_self_component_class_source *comp_class,
bt_private_query_executor *priv_query_exec,
const char *object, const bt_value *params,
if (!lttng_live) {
return;
}
+
if (lttng_live->params.url) {
g_string_free(lttng_live->params.url, TRUE);
}
- g_free(lttng_live);
+
+ delete lttng_live;
}
-BT_HIDDEN
void lttng_live_component_finalize(bt_self_component_source *component)
{
lttng_live_component *data = (lttng_live_component *) bt_self_component_get_data(
goto error;
}
- lttng_live = g_new0(struct lttng_live_component, 1);
- if (!lttng_live) {
- status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
- goto end;
- }
+ lttng_live = new lttng_live_component;
lttng_live->log_level = log_level;
lttng_live->self_comp = self_comp;
lttng_live->max_query_size = MAX_QUERY_SIZE;
return status;
}
-BT_HIDDEN
bt_component_class_initialize_method_status
lttng_live_component_init(bt_self_component_source *self_comp_src,
- bt_self_component_source_configuration *config, const bt_value *params,
- __attribute__((unused)) void *init_method_data)
+ bt_self_component_source_configuration *, const bt_value *params, void *)
{
struct lttng_live_component *lttng_live;
bt_component_class_initialize_method_status ret;