*
* Babeltrace CTF LTTng-live Client Component
*
+ * Copyright 2019 Francis Deslauriers <francis.deslauriers@efficios.com>
* Copyright 2016 Jérémie Galarneau <jeremie.galarneau@efficios.com>
* Copyright 2016 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
*
#define BT_LOG_TAG "PLUGIN-CTF-LTTNG-LIVE-SRC"
#include "logging.h"
+#include <glib.h>
+#include <inttypes.h>
+#include <unistd.h>
+
+#include <babeltrace/assert-internal.h>
#include <babeltrace/babeltrace.h>
#include <babeltrace/compiler-internal.h>
#include <babeltrace/types.h>
-#include <inttypes.h>
-#include <glib.h>
-#include <babeltrace/assert-internal.h>
-#include <unistd.h>
#include <plugins-common.h>
#include "data-stream.h"
#include "metadata.h"
-#include "lttng-live-internal.h"
+#include "lttng-live.h"
-#define MAX_QUERY_SIZE (256*1024)
+#define MAX_QUERY_SIZE (256*1024)
+#define URL_PARAM "url"
+#define SESS_NOT_FOUND_ACTION_PARAM "session-not-found-action"
+#define SESS_NOT_FOUND_ACTION_CONTINUE_STR "continue"
+#define SESS_NOT_FOUND_ACTION_FAIL_STR "fail"
+#define SESS_NOT_FOUND_ACTION_END_STR "end"
#define print_dbg(fmt, ...) BT_LOGD(fmt, ## __VA_ARGS__)
-static const char *print_state(struct lttng_live_stream_iterator *s)
+static
+const char *print_live_iterator_status(enum lttng_live_iterator_status status)
+{
+ switch (status) {
+ case LTTNG_LIVE_ITERATOR_STATUS_CONTINUE:
+ return "LTTNG_LIVE_ITERATOR_STATUS_CONTINUE";
+ case LTTNG_LIVE_ITERATOR_STATUS_AGAIN:
+ return "LTTNG_LIVE_ITERATOR_STATUS_AGAIN";
+ case LTTNG_LIVE_ITERATOR_STATUS_END:
+ return "LTTNG_LIVE_ITERATOR_STATUS_END";
+ case LTTNG_LIVE_ITERATOR_STATUS_OK:
+ return "LTTNG_LIVE_ITERATOR_STATUS_OK";
+ case LTTNG_LIVE_ITERATOR_STATUS_INVAL:
+ return "LTTNG_LIVE_ITERATOR_STATUS_INVAL";
+ case LTTNG_LIVE_ITERATOR_STATUS_ERROR:
+ return "LTTNG_LIVE_ITERATOR_STATUS_ERROR";
+ case LTTNG_LIVE_ITERATOR_STATUS_NOMEM:
+ return "LTTNG_LIVE_ITERATOR_STATUS_NOMEM";
+ case LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED:
+ return "LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED";
+ default:
+ abort();
+ }
+}
+
+static
+const char *print_state(struct lttng_live_stream_iterator *s)
{
switch (s->state) {
case LTTNG_LIVE_STREAM_ACTIVE_NO_DATA:
}
}
-static
-void print_stream_state(struct lttng_live_stream_iterator *stream)
-{
- const bt_port *port;
-
- port = bt_port_from_private(stream->port);
- print_dbg("stream %s state %s last_inact_ts %" PRId64 " cur_inact_ts %" PRId64,
- bt_port_get_name(port),
- print_state(stream),
- stream->last_returned_inactivity_timestamp,
- stream->current_inactivity_timestamp);
- bt_port_put_ref(port);
-}
+#define print_stream_state(live_stream_iter) \
+ do { \
+ BT_LOGD("stream state %s last_inact_ts %" PRId64 \
+ ", curr_inact_ts %" PRId64, \
+ print_state(live_stream_iter), \
+ live_stream_iter->last_inactivity_ts, \
+ live_stream_iter->current_inactivity_ts); \
+ } while (0);
BT_HIDDEN
-bt_bool lttng_live_is_canceled(struct lttng_live_component *lttng_live)
+bool lttng_live_is_canceled(struct lttng_live_component *lttng_live)
{
- bt_component *component;
- const bt_graph *graph;
- bt_bool ret;
+ const bt_component *component;
+ bool ret;
if (!lttng_live) {
- return BT_FALSE;
- }
-
- component = bt_component_from_private(lttng_live->private_component);
- graph = bt_component_get_graph(component);
- ret = bt_graph_is_canceled(graph);
- bt_graph_put_ref(graph);
- bt_component_put_ref(component);
- return ret;
-}
-
-BT_HIDDEN
-int lttng_live_add_port(struct lttng_live_component *lttng_live,
- struct lttng_live_stream_iterator *stream_iter)
-{
- int ret;
- struct bt_private_port *private_port;
- char name[STREAM_NAME_MAX_LEN];
- bt_component_status status;
-
- ret = sprintf(name, STREAM_NAME_PREFIX "%" PRIu64, stream_iter->viewer_stream_id);
- BT_ASSERT(ret > 0);
- strcpy(stream_iter->name, name);
- if (lttng_live_is_canceled(lttng_live)) {
- return 0;
- }
- status = bt_self_component_source_add_output_port(
- lttng_live->private_component, name, stream_iter,
- &private_port);
- switch (status) {
- case BT_COMPONENT_STATUS_GRAPH_IS_CANCELED:
- return 0;
- case BT_COMPONENT_STATUS_OK:
- break;
- default:
- return -1;
- }
- bt_object_put_ref(private_port); /* weak */
- BT_LOGI("Added port %s", name);
-
- if (lttng_live->no_stream_port) {
- bt_object_get_ref(lttng_live->no_stream_port);
- ret = bt_private_port_remove_from_component(lttng_live->no_stream_port);
- bt_object_put_ref(lttng_live->no_stream_port);
- if (ret) {
- return -1;
- }
- lttng_live->no_stream_port = NULL;
- lttng_live->no_stream_iter->port = NULL;
+ ret = false;
+ goto end;
}
- stream_iter->port = private_port;
- return 0;
-}
-BT_HIDDEN
-int lttng_live_remove_port(struct lttng_live_component *lttng_live,
- struct bt_private_port *port)
-{
- bt_component *component;
- int64_t nr_ports;
- int ret;
-
- component = bt_component_from_private(lttng_live->private_component);
- nr_ports = bt_component_source_get_output_port_count(component);
- if (nr_ports < 0) {
- return -1;
- }
- BT_COMPONENT_PUT_REF_AND_RESET(component);
- if (nr_ports == 1) {
- bt_component_status status;
+ component = bt_component_source_as_component_const(
+ bt_self_component_source_as_component_source(
+ lttng_live->self_comp));
- BT_ASSERT(!lttng_live->no_stream_port);
+ ret = bt_component_graph_is_canceled(component);
- if (lttng_live_is_canceled(lttng_live)) {
- return 0;
- }
- status = bt_self_component_source_add_output_port(lttng_live->private_component,
- "no-stream", lttng_live->no_stream_iter,
- <tng_live->no_stream_port);
- switch (status) {
- case BT_COMPONENT_STATUS_GRAPH_IS_CANCELED:
- return 0;
- case BT_COMPONENT_STATUS_OK:
- break;
- default:
- return -1;
- }
- bt_object_put_ref(lttng_live->no_stream_port); /* weak */
- lttng_live->no_stream_iter->port = lttng_live->no_stream_port;
- }
- bt_object_get_ref(port);
- ret = bt_private_port_remove_from_component(port);
- bt_object_put_ref(port);
- if (ret) {
- return -1;
- }
- return 0;
+end:
+ return ret;
}
static
struct lttng_live_trace *lttng_live_find_trace(struct lttng_live_session *session,
uint64_t trace_id)
{
- struct lttng_live_trace *trace;
+ uint64_t trace_idx;
+ struct lttng_live_trace *ret_trace = NULL;
- bt_list_for_each_entry(trace, &session->traces, node) {
+ for (trace_idx = 0; trace_idx < session->traces->len; trace_idx++) {
+ struct lttng_live_trace *trace =
+ g_ptr_array_index(session->traces, trace_idx);
if (trace->id == trace_id) {
- return trace;
+ ret_trace = trace;
+ goto end;
}
}
- return NULL;
+
+end:
+ return ret_trace;
}
static
-void lttng_live_destroy_trace(bt_object *obj)
+void lttng_live_destroy_trace(struct lttng_live_trace *trace)
{
- struct lttng_live_trace *trace = container_of(obj, struct lttng_live_trace, obj);
+ BT_LOGD("Destroy lttng_live_trace");
- BT_LOGI("Destroy trace");
- BT_ASSERT(bt_list_empty(&trace->streams));
- bt_list_del(&trace->node);
+ BT_ASSERT(trace->stream_iterators);
+ g_ptr_array_free(trace->stream_iterators, TRUE);
- if (trace->trace) {
- int retval;
+ BT_TRACE_PUT_REF_AND_RESET(trace->trace);
+ BT_TRACE_CLASS_PUT_REF_AND_RESET(trace->trace_class);
- retval = bt_trace_set_is_static(trace->trace);
- BT_ASSERT(!retval);
- BT_TRACE_PUT_REF_AND_RESET(trace->trace);
- }
lttng_live_metadata_fini(trace);
- BT_OBJECT_PUT_REF_AND_RESET(trace->cc_prio_map);
g_free(trace);
}
}
trace->session = session;
trace->id = trace_id;
- BT_INIT_LIST_HEAD(&trace->streams);
+ trace->trace_class = NULL;
+ trace->trace = NULL;
+ trace->stream_iterators = g_ptr_array_new_with_free_func(
+ (GDestroyNotify) lttng_live_stream_iterator_destroy);
+ BT_ASSERT(trace->stream_iterators);
trace->new_metadata_needed = true;
- bt_list_add(&trace->node, &session->traces);
- bt_object_init(&trace->obj, lttng_live_destroy_trace);
+ g_ptr_array_add(session->traces, trace);
+
BT_LOGI("Create trace");
goto end;
error:
}
BT_HIDDEN
-struct lttng_live_trace *lttng_live_ref_trace(struct lttng_live_session *session,
- uint64_t trace_id)
+struct lttng_live_trace *lttng_live_borrow_trace(
+ struct lttng_live_session *session, uint64_t trace_id)
{
struct lttng_live_trace *trace;
trace = lttng_live_find_trace(session, trace_id);
if (trace) {
- bt_object_get_ref(trace);
- return trace;
+ goto end;
}
- return lttng_live_create_trace(session, trace_id);
-}
-
-BT_HIDDEN
-void lttng_live_unref_trace(struct lttng_live_trace *trace)
-{
- bt_object_put_ref(trace);
-}
-static
-void lttng_live_close_trace_streams(struct lttng_live_trace *trace)
-{
- struct lttng_live_stream_iterator *stream, *s;
+ /* The session is the owner of the newly created trace. */
+ trace = lttng_live_create_trace(session, trace_id);
- bt_list_for_each_entry_safe(stream, s, &trace->streams, node) {
- lttng_live_stream_iterator_destroy(stream);
- }
- lttng_live_metadata_fini(trace);
+end:
+ return trace;
}
BT_HIDDEN
-int lttng_live_add_session(struct lttng_live_component *lttng_live,
+int lttng_live_add_session(struct lttng_live_msg_iter *lttng_live_msg_iter,
uint64_t session_id, const char *hostname,
const char *session_name)
{
int ret = 0;
- struct lttng_live_session *s;
+ struct lttng_live_session *session;
- s = g_new0(struct lttng_live_session, 1);
- if (!s) {
+ session = g_new0(struct lttng_live_session, 1);
+ if (!session) {
goto error;
}
- s->id = session_id;
- BT_INIT_LIST_HEAD(&s->traces);
- s->lttng_live = lttng_live;
- s->new_streams_needed = true;
- s->hostname = g_string_new(hostname);
- s->session_name = g_string_new(session_name);
+ session->id = session_id;
+ session->traces = g_ptr_array_new_with_free_func(
+ (GDestroyNotify) lttng_live_destroy_trace);
+ BT_ASSERT(session->traces);
+ session->lttng_live_msg_iter = lttng_live_msg_iter;
+ session->new_streams_needed = true;
+ session->hostname = g_string_new(hostname);
+ BT_ASSERT(session->hostname);
+
+ session->session_name = g_string_new(session_name);
+ BT_ASSERT(session->session_name);
BT_LOGI("Reading from session: %" PRIu64 " hostname: %s session_name: %s",
- s->id, hostname, session_name);
- bt_list_add(&s->node, <tng_live->sessions);
+ session->id, hostname, session_name);
+ g_ptr_array_add(lttng_live_msg_iter->sessions, session);
goto end;
error:
BT_LOGE("Error adding session");
- g_free(s);
+ g_free(session);
ret = -1;
end:
return ret;
static
void lttng_live_destroy_session(struct lttng_live_session *session)
{
- struct lttng_live_trace *trace, *t;
+ struct lttng_live_component *live_comp;
+
+ if (!session) {
+ goto end;
+ }
- BT_LOGI("Destroy session");
+ BT_LOGD("Destroy lttng live session");
if (session->id != -1ULL) {
if (lttng_live_detach_session(session)) {
- if (!lttng_live_is_canceled(session->lttng_live)) {
+ live_comp = session->lttng_live_msg_iter->lttng_live_comp;
+ if (session->lttng_live_msg_iter &&
+ !lttng_live_is_canceled(live_comp)) {
/* Old relayd cannot detach sessions. */
- BT_LOGD("Unable to detach session %" PRIu64,
+ BT_LOGD("Unable to detach lttng live session %" PRIu64,
session->id);
}
}
session->id = -1ULL;
}
- bt_list_for_each_entry_safe(trace, t, &session->traces, node) {
- lttng_live_close_trace_streams(trace);
+
+ if (session->traces) {
+ g_ptr_array_free(session->traces, TRUE);
}
- bt_list_del(&session->node);
+
if (session->hostname) {
g_string_free(session->hostname, TRUE);
}
g_string_free(session->session_name, TRUE);
}
g_free(session);
+
+end:
+ return;
}
-BT_HIDDEN
-void lttng_live_iterator_finalize(bt_self_message_iterator *it)
+static
+void lttng_live_msg_iter_destroy(struct lttng_live_msg_iter *lttng_live_msg_iter)
{
- struct lttng_live_stream_iterator_generic *s =
- bt_self_message_iterator_get_user_data(it);
-
- switch (s->type) {
- case LIVE_STREAM_TYPE_NO_STREAM:
- {
- /* Leave no_stream_iter in place when port is removed. */
- break;
+ if (!lttng_live_msg_iter) {
+ goto end;
}
- case LIVE_STREAM_TYPE_STREAM:
- {
- struct lttng_live_stream_iterator *stream_iter =
- container_of(s, struct lttng_live_stream_iterator, p);
- lttng_live_stream_iterator_destroy(stream_iter);
- break;
- }
+ if (lttng_live_msg_iter->sessions) {
+ g_ptr_array_free(lttng_live_msg_iter->sessions, TRUE);
}
+
+ BT_OBJECT_PUT_REF_AND_RESET(lttng_live_msg_iter->viewer_connection);
+ BT_ASSERT(lttng_live_msg_iter->lttng_live_comp);
+ BT_ASSERT(lttng_live_msg_iter->lttng_live_comp->has_msg_iter);
+
+ /* All stream iterators must be destroyed at this point. */
+ BT_ASSERT(lttng_live_msg_iter->active_stream_iter == 0);
+ lttng_live_msg_iter->lttng_live_comp->has_msg_iter = false;
+
+ g_free(lttng_live_msg_iter);
+
+end:
+ return;
+}
+
+BT_HIDDEN
+void lttng_live_msg_iter_finalize(bt_self_message_iterator *self_msg_iter)
+{
+ struct lttng_live_msg_iter *lttng_live_msg_iter;
+
+ BT_ASSERT(self_msg_iter);
+
+ lttng_live_msg_iter = bt_self_message_iterator_get_data(self_msg_iter);
+ BT_ASSERT(lttng_live_msg_iter);
+ lttng_live_msg_iter_destroy(lttng_live_msg_iter);
}
static
-bt_lttng_live_iterator_status lttng_live_iterator_next_check_stream_state(
- struct lttng_live_component *lttng_live,
+enum lttng_live_iterator_status lttng_live_iterator_next_check_stream_state(
struct lttng_live_stream_iterator *lttng_live_stream)
{
switch (lttng_live_stream->state) {
case LTTNG_LIVE_STREAM_EOF:
break;
}
- return BT_LTTNG_LIVE_ITERATOR_STATUS_OK;
+ return LTTNG_LIVE_ITERATOR_STATUS_OK;
}
/*
* return EOF.
*/
static
-bt_lttng_live_iterator_status lttng_live_iterator_next_handle_one_no_data_stream(
- struct lttng_live_component *lttng_live,
+enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_no_data_stream(
+ struct lttng_live_msg_iter *lttng_live_msg_iter,
struct lttng_live_stream_iterator *lttng_live_stream)
{
- bt_lttng_live_iterator_status ret =
- BT_LTTNG_LIVE_ITERATOR_STATUS_OK;
+ enum lttng_live_iterator_status ret =
+ LTTNG_LIVE_ITERATOR_STATUS_OK;
struct packet_index index;
enum lttng_live_stream_state orig_state = lttng_live_stream->state;
if (lttng_live_stream->trace->new_metadata_needed) {
- ret = BT_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
+ ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
goto end;
}
if (lttng_live_stream->trace->session->new_streams_needed) {
- ret = BT_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
+ ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
goto end;
}
- if (lttng_live_stream->state != LTTNG_LIVE_STREAM_ACTIVE_NO_DATA
- && lttng_live_stream->state != LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA) {
+ if (lttng_live_stream->state != LTTNG_LIVE_STREAM_ACTIVE_NO_DATA &&
+ lttng_live_stream->state != LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA) {
goto end;
}
- ret = lttng_live_get_next_index(lttng_live, lttng_live_stream, &index);
- if (ret != BT_LTTNG_LIVE_ITERATOR_STATUS_OK) {
+ ret = lttng_live_get_next_index(lttng_live_msg_iter, lttng_live_stream,
+ &index);
+ if (ret != LTTNG_LIVE_ITERATOR_STATUS_OK) {
goto end;
}
BT_ASSERT(lttng_live_stream->state != LTTNG_LIVE_STREAM_EOF);
if (lttng_live_stream->state == LTTNG_LIVE_STREAM_QUIESCENT) {
- if (orig_state == LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA
- && lttng_live_stream->last_returned_inactivity_timestamp ==
- lttng_live_stream->current_inactivity_timestamp) {
- ret = BT_LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
+ uint64_t last_inact_ts = lttng_live_stream->last_inactivity_ts,
+ curr_inact_ts = lttng_live_stream->current_inactivity_ts;
+
+ if (orig_state == LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA &&
+ last_inact_ts == curr_inact_ts) {
+ ret = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
print_stream_state(lttng_live_stream);
} else {
- ret = BT_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
+ ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
}
goto end;
}
lttng_live_stream->offset = index.offset;
lttng_live_stream->len = index.packet_size / CHAR_BIT;
end:
- if (ret == BT_LTTNG_LIVE_ITERATOR_STATUS_OK) {
- ret = lttng_live_iterator_next_check_stream_state(
- lttng_live, lttng_live_stream);
+ if (ret == LTTNG_LIVE_ITERATOR_STATUS_OK) {
+ ret = lttng_live_iterator_next_check_stream_state(lttng_live_stream);
}
return ret;
}
/*
- * Creation of the message requires the ctf trace to be created
- * beforehand, but the live protocol gives us all streams (including
- * metadata) at once. So we split it in three steps: getting streams,
- * getting metadata (which creates the ctf trace), and then creating the
- * per-stream messages.
+ * Creation of the message requires the ctf trace class to be created
+ * beforehand, but the live protocol gives us all streams (including metadata)
+ * at once. So we split it in three steps: getting streams, getting metadata
+ * (which creates the ctf trace class), and then creating the per-stream
+ * messages.
*/
static
-bt_lttng_live_iterator_status lttng_live_get_session(
- struct lttng_live_component *lttng_live,
+enum lttng_live_iterator_status lttng_live_get_session(
+ struct lttng_live_msg_iter *lttng_live_msg_iter,
struct lttng_live_session *session)
{
- bt_lttng_live_iterator_status status;
- struct lttng_live_trace *trace, *t;
+ enum lttng_live_iterator_status status;
+ uint64_t trace_idx;
+ int ret = 0;
- if (lttng_live_attach_session(session)) {
- if (lttng_live_is_canceled(lttng_live)) {
- return BT_LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
- } else {
- return BT_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
+ if (!session->attached) {
+ ret = lttng_live_attach_session(session);
+ if (ret) {
+ if (lttng_live_msg_iter && lttng_live_is_canceled(
+ lttng_live_msg_iter->lttng_live_comp)) {
+ status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
+ } else {
+ status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
+ }
+ goto end;
}
}
+
status = lttng_live_get_new_streams(session);
- if (status != BT_LTTNG_LIVE_ITERATOR_STATUS_OK &&
- status != BT_LTTNG_LIVE_ITERATOR_STATUS_END) {
- return status;
+ if (status != LTTNG_LIVE_ITERATOR_STATUS_OK &&
+ status != LTTNG_LIVE_ITERATOR_STATUS_END) {
+ goto end;
}
- bt_list_for_each_entry_safe(trace, t, &session->traces, node) {
+ for (trace_idx = 0; trace_idx < session->traces->len; trace_idx++) {
+ struct lttng_live_trace *trace =
+ g_ptr_array_index(session->traces, trace_idx);
+
status = lttng_live_metadata_update(trace);
- if (status != BT_LTTNG_LIVE_ITERATOR_STATUS_OK &&
- status != BT_LTTNG_LIVE_ITERATOR_STATUS_END) {
- return status;
+ if (status != LTTNG_LIVE_ITERATOR_STATUS_OK &&
+ status != LTTNG_LIVE_ITERATOR_STATUS_END) {
+ goto end;
}
}
- return lttng_live_lazy_msg_init(session);
+ status = lttng_live_lazy_msg_init(session);
+
+end:
+ return status;
}
BT_HIDDEN
-void lttng_live_need_new_streams(struct lttng_live_component *lttng_live)
+void lttng_live_need_new_streams(struct lttng_live_msg_iter *lttng_live_msg_iter)
{
- struct lttng_live_session *session;
+ uint64_t session_idx;
- bt_list_for_each_entry(session, <tng_live->sessions, node) {
+ for (session_idx = 0; session_idx < lttng_live_msg_iter->sessions->len;
+ session_idx++) {
+ struct lttng_live_session *session =
+ g_ptr_array_index(lttng_live_msg_iter->sessions, session_idx);
session->new_streams_needed = true;
}
}
static
-void lttng_live_force_new_streams_and_metadata(struct lttng_live_component *lttng_live)
+void lttng_live_force_new_streams_and_metadata(struct lttng_live_msg_iter *lttng_live_msg_iter)
{
- struct lttng_live_session *session;
-
- bt_list_for_each_entry(session, <tng_live->sessions, node) {
- struct lttng_live_trace *trace;
+ uint64_t session_idx, trace_idx;
+ for (session_idx = 0; session_idx < lttng_live_msg_iter->sessions->len;
+ session_idx++) {
+ struct lttng_live_session *session =
+ g_ptr_array_index(lttng_live_msg_iter->sessions, session_idx);
session->new_streams_needed = true;
- bt_list_for_each_entry(trace, &session->traces, node) {
+ for (trace_idx = 0; trace_idx < session->traces->len;
+ trace_idx++) {
+ struct lttng_live_trace *trace =
+ g_ptr_array_index(session->traces, trace_idx);
trace->new_metadata_needed = true;
}
}
}
static
-bt_lttng_live_iterator_status lttng_live_iterator_next_handle_new_streams_and_metadata(
- struct lttng_live_component *lttng_live)
+enum lttng_live_iterator_status
+lttng_live_iterator_handle_new_streams_and_metadata(
+ struct lttng_live_msg_iter *lttng_live_msg_iter)
{
- bt_lttng_live_iterator_status ret =
- BT_LTTNG_LIVE_ITERATOR_STATUS_OK;
- unsigned int nr_sessions_opened = 0;
- struct lttng_live_session *session, *s;
-
- bt_list_for_each_entry_safe(session, s, <tng_live->sessions, node) {
- if (session->closed && bt_list_empty(&session->traces)) {
- lttng_live_destroy_session(session);
- }
- }
+ enum lttng_live_iterator_status ret =
+ LTTNG_LIVE_ITERATOR_STATUS_OK;
+ uint64_t session_idx = 0, nr_sessions_opened = 0;
+ struct lttng_live_session *session;
+ enum session_not_found_action sess_not_found_act =
+ lttng_live_msg_iter->lttng_live_comp->params.sess_not_found_act;
+
/*
- * Currently, when there are no sessions, we quit immediately.
- * We may want to add a component parameter to keep trying until
- * we get data in the future.
- * Also, in a remotely distant future, we could add a "new
+ * In a remotely distant future, we could add a "new
* session" flag to the protocol, which would tell us that we
* need to query for new sessions even though we have sessions
* currently ongoing.
*/
- if (bt_list_empty(<tng_live->sessions)) {
- ret = BT_LTTNG_LIVE_ITERATOR_STATUS_END;
- goto end;
+ if (lttng_live_msg_iter->sessions->len == 0) {
+ if (sess_not_found_act != SESSION_NOT_FOUND_ACTION_CONTINUE) {
+ ret = LTTNG_LIVE_ITERATOR_STATUS_END;
+ goto end;
+ } else {
+ /*
+ * Retry to create a viewer session for the requested
+ * session name.
+ */
+ if (lttng_live_create_viewer_session(lttng_live_msg_iter)) {
+ ret = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
+ goto end;
+ }
+ }
}
- bt_list_for_each_entry(session, <tng_live->sessions, node) {
- ret = lttng_live_get_session(lttng_live, session);
+
+ for (session_idx = 0; session_idx < lttng_live_msg_iter->sessions->len;
+ session_idx++) {
+ session = g_ptr_array_index(lttng_live_msg_iter->sessions,
+ session_idx);
+ ret = lttng_live_get_session(lttng_live_msg_iter, session);
switch (ret) {
- case BT_LTTNG_LIVE_ITERATOR_STATUS_OK:
+ case LTTNG_LIVE_ITERATOR_STATUS_OK:
break;
- case BT_LTTNG_LIVE_ITERATOR_STATUS_END:
- ret = BT_LTTNG_LIVE_ITERATOR_STATUS_OK;
+ case LTTNG_LIVE_ITERATOR_STATUS_END:
+ ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
break;
default:
goto end;
}
}
end:
- if (ret == BT_LTTNG_LIVE_ITERATOR_STATUS_OK && !nr_sessions_opened) {
- ret = BT_LTTNG_LIVE_ITERATOR_STATUS_END;
+ if (ret == LTTNG_LIVE_ITERATOR_STATUS_OK &&
+ sess_not_found_act != SESSION_NOT_FOUND_ACTION_CONTINUE &&
+ nr_sessions_opened == 0) {
+ ret = LTTNG_LIVE_ITERATOR_STATUS_END;
}
return ret;
}
static
-bt_lttng_live_iterator_status emit_inactivity_message(
- struct lttng_live_component *lttng_live,
- struct lttng_live_stream_iterator *lttng_live_stream,
- const bt_message **message,
- uint64_t timestamp)
+enum lttng_live_iterator_status emit_inactivity_message(
+ struct lttng_live_msg_iter *lttng_live_msg_iter,
+ struct lttng_live_stream_iterator *stream_iter,
+ bt_message **message, uint64_t timestamp)
{
- bt_lttng_live_iterator_status ret =
- BT_LTTNG_LIVE_ITERATOR_STATUS_OK;
- struct lttng_live_trace *trace;
- const bt_clock_class *clock_class = NULL;
- bt_clock_snapshot *clock_snapshot = NULL;
- const bt_message *msg = NULL;
- int retval;
+ enum lttng_live_iterator_status ret =
+ LTTNG_LIVE_ITERATOR_STATUS_OK;
+ bt_message *msg = NULL;
- trace = lttng_live_stream->trace;
- if (!trace) {
- goto error;
- }
- clock_class = bt_clock_class_priority_map_get_clock_class_by_index(trace->cc_prio_map, 0);
- if (!clock_class) {
- goto error;
- }
- clock_snapshot = bt_clock_snapshot_create(clock_class, timestamp);
- if (!clock_snapshot) {
- goto error;
- }
- msg = bt_message_inactivity_create(trace->cc_prio_map);
+ BT_ASSERT(stream_iter->trace->clock_class);
+
+ msg = bt_message_message_iterator_inactivity_create(
+ lttng_live_msg_iter->self_msg_iter,
+ stream_iter->trace->clock_class,
+ timestamp);
if (!msg) {
goto error;
}
- retval = bt_message_inactivity_set_clock_snapshot(msg, clock_snapshot);
- if (retval) {
- goto error;
- }
+
*message = msg;
end:
- bt_object_put_ref(clock_snapshot);
- bt_clock_class_put_ref(clock_class);
return ret;
error:
- ret = BT_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
+ ret = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
bt_message_put_ref(msg);
goto end;
}
static
-bt_lttng_live_iterator_status lttng_live_iterator_next_handle_one_quiescent_stream(
- struct lttng_live_component *lttng_live,
+enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_quiescent_stream(
+ struct lttng_live_msg_iter *lttng_live_msg_iter,
struct lttng_live_stream_iterator *lttng_live_stream,
- const bt_message **message)
+ bt_message **message)
{
- bt_lttng_live_iterator_status ret =
- BT_LTTNG_LIVE_ITERATOR_STATUS_OK;
- const bt_clock_class *clock_class = NULL;
- bt_clock_snapshot *clock_snapshot = NULL;
+ enum lttng_live_iterator_status ret =
+ LTTNG_LIVE_ITERATOR_STATUS_OK;
if (lttng_live_stream->state != LTTNG_LIVE_STREAM_QUIESCENT) {
- return BT_LTTNG_LIVE_ITERATOR_STATUS_OK;
+ return LTTNG_LIVE_ITERATOR_STATUS_OK;
}
- if (lttng_live_stream->current_inactivity_timestamp ==
- lttng_live_stream->last_returned_inactivity_timestamp) {
+ if (lttng_live_stream->current_inactivity_ts ==
+ lttng_live_stream->last_inactivity_ts) {
lttng_live_stream->state = LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA;
- ret = BT_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
+ ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
+ goto end;
+ }
+
+ ret = emit_inactivity_message(lttng_live_msg_iter, lttng_live_stream,
+ message, lttng_live_stream->current_inactivity_ts);
+
+ lttng_live_stream->last_inactivity_ts =
+ lttng_live_stream->current_inactivity_ts;
+end:
+ return ret;
+}
+
+static
+int live_get_msg_ts_ns(struct lttng_live_stream_iterator *stream_iter,
+ struct lttng_live_msg_iter *lttng_live_msg_iter,
+ const bt_message *msg, int64_t last_msg_ts_ns,
+ int64_t *ts_ns)
+{
+ const bt_clock_class *clock_class = NULL;
+ const bt_clock_snapshot *clock_snapshot = NULL;
+ int ret = 0;
+ bt_clock_snapshot_state cs_state = BT_CLOCK_SNAPSHOT_STATE_KNOWN;
+ bt_message_stream_activity_clock_snapshot_state sa_cs_state;
+
+ BT_ASSERT(msg);
+ BT_ASSERT(ts_ns);
+
+ BT_LOGV("Getting message's timestamp: iter-data-addr=%p, msg-addr=%p, "
+ "last-msg-ts=%" PRId64, lttng_live_msg_iter, msg,
+ last_msg_ts_ns);
+
+ switch (bt_message_get_type(msg)) {
+ case BT_MESSAGE_TYPE_EVENT:
+ clock_class =
+ bt_message_event_borrow_stream_class_default_clock_class_const(
+ msg);
+ BT_ASSERT(clock_class);
+
+ cs_state = bt_message_event_borrow_default_clock_snapshot_const(
+ msg, &clock_snapshot);
+ break;
+ case BT_MESSAGE_TYPE_PACKET_BEGINNING:
+ clock_class =
+ bt_message_packet_beginning_borrow_stream_class_default_clock_class_const(
+ msg);
+ BT_ASSERT(clock_class);
+
+ cs_state = bt_message_packet_beginning_borrow_default_clock_snapshot_const(
+ msg, &clock_snapshot);
+ break;
+ case BT_MESSAGE_TYPE_PACKET_END:
+ clock_class =
+ bt_message_packet_end_borrow_stream_class_default_clock_class_const(
+ msg);
+ BT_ASSERT(clock_class);
+
+ cs_state = bt_message_packet_end_borrow_default_clock_snapshot_const(
+ msg, &clock_snapshot);
+ break;
+ case BT_MESSAGE_TYPE_DISCARDED_EVENTS:
+ clock_class =
+ bt_message_discarded_events_borrow_stream_class_default_clock_class_const(
+ msg);
+ BT_ASSERT(clock_class);
+
+ cs_state = bt_message_discarded_events_borrow_default_beginning_clock_snapshot_const(
+ msg, &clock_snapshot);
+ break;
+ case BT_MESSAGE_TYPE_DISCARDED_PACKETS:
+ clock_class =
+ bt_message_discarded_packets_borrow_stream_class_default_clock_class_const(
+ msg);
+ BT_ASSERT(clock_class);
+
+ cs_state = bt_message_discarded_packets_borrow_default_beginning_clock_snapshot_const(
+ msg, &clock_snapshot);
+ break;
+ case BT_MESSAGE_TYPE_STREAM_ACTIVITY_BEGINNING:
+ clock_class =
+ bt_message_stream_activity_beginning_borrow_stream_class_default_clock_class_const(
+ msg);
+ BT_ASSERT(clock_class);
+
+ sa_cs_state = bt_message_stream_activity_beginning_borrow_default_clock_snapshot_const(
+ msg, &clock_snapshot);
+ if (sa_cs_state != BT_MESSAGE_STREAM_ACTIVITY_CLOCK_SNAPSHOT_STATE_KNOWN) {
+ goto no_clock_snapshot;
+ }
+
+ break;
+ case BT_MESSAGE_TYPE_STREAM_ACTIVITY_END:
+ clock_class =
+ bt_message_stream_activity_end_borrow_stream_class_default_clock_class_const(
+ msg);
+ BT_ASSERT(clock_class);
+
+ sa_cs_state = bt_message_stream_activity_end_borrow_default_clock_snapshot_const(
+ msg, &clock_snapshot);
+ if (sa_cs_state != BT_MESSAGE_STREAM_ACTIVITY_CLOCK_SNAPSHOT_STATE_KNOWN) {
+ goto no_clock_snapshot;
+ }
+
+ break;
+ case BT_MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY:
+ cs_state =
+ bt_message_message_iterator_inactivity_borrow_default_clock_snapshot_const(
+ msg, &clock_snapshot);
+ break;
+ default:
+ /* All the other messages have a higher priority */
+ BT_LOGV_STR("Message has no timestamp: using the last message timestamp.");
+ *ts_ns = last_msg_ts_ns;
goto end;
}
- ret = emit_inactivity_message(lttng_live, lttng_live_stream, message,
- (uint64_t) lttng_live_stream->current_inactivity_timestamp);
+ if (cs_state != BT_CLOCK_SNAPSHOT_STATE_KNOWN) {
+ BT_LOGE_STR("Unsupported unknown clock snapshot.");
+ ret = -1;
+ goto end;
+ }
+
+ clock_class = bt_clock_snapshot_borrow_clock_class_const(clock_snapshot);
+ BT_ASSERT(clock_class);
+
+ ret = bt_clock_snapshot_get_ns_from_origin(clock_snapshot, ts_ns);
+ if (ret) {
+ BT_LOGE("Cannot get nanoseconds from Epoch of clock snapshot: "
+ "clock-snapshot-addr=%p", clock_snapshot);
+ goto error;
+ }
+
+ goto end;
+
+no_clock_snapshot:
+ BT_LOGV_STR("Message's default clock snapshot is missing: "
+ "using the last message timestamp.");
+ *ts_ns = last_msg_ts_ns;
+ goto end;
+
+error:
+ ret = -1;
- lttng_live_stream->last_returned_inactivity_timestamp =
- lttng_live_stream->current_inactivity_timestamp;
end:
- bt_object_put_ref(clock_snapshot);
- bt_clock_class_put_ref(clock_class);
+ if (ret == 0) {
+ BT_LOGV("Found message's timestamp: "
+ "iter-data-addr=%p, msg-addr=%p, "
+ "last-msg-ts=%" PRId64 ", ts=%" PRId64,
+ lttng_live_msg_iter, msg, last_msg_ts_ns, *ts_ns);
+ }
+
return ret;
}
static
-bt_lttng_live_iterator_status lttng_live_iterator_next_handle_one_active_data_stream(
- struct lttng_live_component *lttng_live,
+enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_active_data_stream(
+ struct lttng_live_msg_iter *lttng_live_msg_iter,
struct lttng_live_stream_iterator *lttng_live_stream,
- const bt_message **message)
+ bt_message **message)
{
- bt_lttng_live_iterator_status ret =
- BT_LTTNG_LIVE_ITERATOR_STATUS_OK;
+ enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
enum bt_msg_iter_status status;
- struct lttng_live_session *session;
+ uint64_t session_idx, trace_idx;
- bt_list_for_each_entry(session, <tng_live->sessions, node) {
- struct lttng_live_trace *trace;
+ for (session_idx = 0; session_idx < lttng_live_msg_iter->sessions->len;
+ session_idx++) {
+ struct lttng_live_session *session =
+ g_ptr_array_index(lttng_live_msg_iter->sessions, session_idx);
if (session->new_streams_needed) {
- return BT_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
+ ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
+ goto end;
}
- bt_list_for_each_entry(trace, &session->traces, node) {
+ for (trace_idx = 0; trace_idx < session->traces->len;
+ trace_idx++) {
+ struct lttng_live_trace *trace =
+ g_ptr_array_index(session->traces, trace_idx);
if (trace->new_metadata_needed) {
- return BT_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
+ ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
+ goto end;
}
}
}
if (lttng_live_stream->state != LTTNG_LIVE_STREAM_ACTIVE_DATA) {
- return BT_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
- }
- if (lttng_live_stream->packet_end_msg_queue) {
- *message = lttng_live_stream->packet_end_msg_queue;
- lttng_live_stream->packet_end_msg_queue = NULL;
- status = BT_MSG_ITER_STATUS_OK;
- } else {
- status = bt_msg_iter_get_next_message(
- lttng_live_stream->msg_iter,
- lttng_live_stream->trace->cc_prio_map,
- message);
- if (status == BT_MSG_ITER_STATUS_OK) {
- /*
- * Consider empty packets as inactivity.
- */
- if (bt_message_get_type(*message) == BT_MESSAGE_TYPE_PACKET_END) {
- lttng_live_stream->packet_end_msg_queue = *message;
- *message = NULL;
- return emit_inactivity_message(lttng_live,
- lttng_live_stream, message,
- lttng_live_stream->current_packet_end_timestamp);
- }
- }
+ ret = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
+ goto end;
}
+
+ status = bt_msg_iter_get_next_message(lttng_live_stream->msg_iter,
+ lttng_live_msg_iter->self_msg_iter, message);
switch (status) {
case BT_MSG_ITER_STATUS_EOF:
- ret = BT_LTTNG_LIVE_ITERATOR_STATUS_END;
+ ret = LTTNG_LIVE_ITERATOR_STATUS_END;
break;
case BT_MSG_ITER_STATUS_OK:
- ret = BT_LTTNG_LIVE_ITERATOR_STATUS_OK;
+ ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
break;
case BT_MSG_ITER_STATUS_AGAIN:
/*
* get_index may return AGAIN to delay the following
* attempt.
*/
- ret = BT_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
+ ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
break;
case BT_MSG_ITER_STATUS_INVAL:
/* No argument provided by the user, so don't return INVAL. */
case BT_MSG_ITER_STATUS_ERROR:
default:
- ret = BT_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
+ ret = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
+ BT_LOGW("CTF msg iterator return an error or failed msg_iter=%p",
+ lttng_live_stream->msg_iter);
break;
}
+
+end:
return ret;
}
* When disconnected from relayd: try to re-connect endlessly.
*/
static
-bt_message_iterator_next_method_return lttng_live_iterator_next_stream(
- bt_self_message_iterator *iterator,
- struct lttng_live_stream_iterator *stream_iter)
+enum lttng_live_iterator_status lttng_live_iterator_next_on_stream(
+ struct lttng_live_msg_iter *lttng_live_msg_iter,
+ struct lttng_live_stream_iterator *stream_iter,
+ bt_message **curr_msg)
{
- bt_lttng_live_iterator_status status;
- bt_message_iterator_next_method_return next_return;
- struct lttng_live_component *lttng_live;
+ enum lttng_live_iterator_status live_status;
- lttng_live = stream_iter->trace->session->lttng_live;
retry:
print_stream_state(stream_iter);
- next_return.message = NULL;
- status = lttng_live_iterator_next_handle_new_streams_and_metadata(lttng_live);
- if (status != BT_LTTNG_LIVE_ITERATOR_STATUS_OK) {
+ live_status = lttng_live_iterator_handle_new_streams_and_metadata(
+ lttng_live_msg_iter);
+ if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
goto end;
}
- status = lttng_live_iterator_next_handle_one_no_data_stream(
- lttng_live, stream_iter);
- if (status != BT_LTTNG_LIVE_ITERATOR_STATUS_OK) {
+ live_status = lttng_live_iterator_next_handle_one_no_data_stream(
+ lttng_live_msg_iter, stream_iter);
+ if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
goto end;
}
- status = lttng_live_iterator_next_handle_one_quiescent_stream(
- lttng_live, stream_iter, &next_return.message);
- if (status != BT_LTTNG_LIVE_ITERATOR_STATUS_OK) {
- BT_ASSERT(next_return.message == NULL);
+ live_status = lttng_live_iterator_next_handle_one_quiescent_stream(
+ lttng_live_msg_iter, stream_iter, curr_msg);
+ if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
+ BT_ASSERT(*curr_msg == NULL);
goto end;
}
- if (next_return.message) {
+ if (*curr_msg) {
goto end;
}
- status = lttng_live_iterator_next_handle_one_active_data_stream(lttng_live,
- stream_iter, &next_return.message);
- if (status != BT_LTTNG_LIVE_ITERATOR_STATUS_OK) {
- BT_ASSERT(next_return.message == NULL);
+ live_status = lttng_live_iterator_next_handle_one_active_data_stream(
+ lttng_live_msg_iter, stream_iter, curr_msg);
+ if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
+ BT_ASSERT(*curr_msg == NULL);
}
end:
- switch (status) {
- case BT_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE:
- print_dbg("continue");
+ if (live_status == LTTNG_LIVE_ITERATOR_STATUS_CONTINUE) {
goto retry;
- case BT_LTTNG_LIVE_ITERATOR_STATUS_AGAIN:
- next_return.status = BT_MESSAGE_ITERATOR_STATUS_AGAIN;
- print_dbg("again");
- break;
- case BT_LTTNG_LIVE_ITERATOR_STATUS_END:
- next_return.status = BT_MESSAGE_ITERATOR_STATUS_END;
- print_dbg("end");
- break;
- case BT_LTTNG_LIVE_ITERATOR_STATUS_OK:
- next_return.status = BT_MESSAGE_ITERATOR_STATUS_OK;
- print_dbg("ok");
- break;
- case BT_LTTNG_LIVE_ITERATOR_STATUS_INVAL:
- next_return.status = BT_MESSAGE_ITERATOR_STATUS_INVALID;
- break;
- case BT_LTTNG_LIVE_ITERATOR_STATUS_NOMEM:
- next_return.status = BT_MESSAGE_ITERATOR_STATUS_NOMEM;
- break;
- case BT_LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED:
- next_return.status = BT_MESSAGE_ITERATOR_STATUS_UNSUPPORTED;
- break;
- case BT_LTTNG_LIVE_ITERATOR_STATUS_ERROR:
- default: /* fall-through */
- next_return.status = BT_MESSAGE_ITERATOR_STATUS_ERROR;
- break;
}
- return next_return;
+
+ return live_status;
}
static
-bt_message_iterator_next_method_return lttng_live_iterator_next_no_stream(
- bt_self_message_iterator *iterator,
- struct lttng_live_no_stream_iterator *no_stream_iter)
+enum lttng_live_iterator_status next_stream_iterator_for_trace(
+ struct lttng_live_msg_iter *lttng_live_msg_iter,
+ struct lttng_live_trace *live_trace,
+ struct lttng_live_stream_iterator **candidate_stream_iter)
{
- bt_lttng_live_iterator_status status;
- bt_message_iterator_next_method_return next_return;
- struct lttng_live_component *lttng_live;
+ struct lttng_live_stream_iterator *curr_candidate_stream_iter = NULL;
+ enum lttng_live_iterator_status stream_iter_status;;
+ int64_t curr_candidate_msg_ts = INT64_MAX;
+ uint64_t stream_iter_idx;
- lttng_live = no_stream_iter->lttng_live;
-retry:
- lttng_live_force_new_streams_and_metadata(lttng_live);
- next_return.message = NULL;
- status = lttng_live_iterator_next_handle_new_streams_and_metadata(lttng_live);
- if (status != BT_LTTNG_LIVE_ITERATOR_STATUS_OK) {
+ BT_ASSERT(live_trace);
+ BT_ASSERT(live_trace->stream_iterators);
+ /*
+ * Update the current message of every stream iterators of this trace.
+ * The current msg of every stream must have a timestamp equal or
+ * larger than the last message returned by this iterator. We must
+ * ensure monotonicity.
+ */
+ stream_iter_idx = 0;
+ while (stream_iter_idx < live_trace->stream_iterators->len) {
+ bool stream_iter_is_ended = false;
+ struct lttng_live_stream_iterator *stream_iter =
+ g_ptr_array_index(live_trace->stream_iterators,
+ stream_iter_idx);
+
+ /*
+ * Find if there is are now current message for this stream
+ * iterator get it.
+ */
+ while (!stream_iter->current_msg) {
+ bt_message *msg = NULL;
+ int64_t curr_msg_ts_ns = INT64_MAX;
+ stream_iter_status = lttng_live_iterator_next_on_stream(
+ lttng_live_msg_iter, stream_iter, &msg);
+
+ BT_LOGD("live stream iterator returned status :%s",
+ print_live_iterator_status(stream_iter_status));
+ if (stream_iter_status == LTTNG_LIVE_ITERATOR_STATUS_END) {
+ stream_iter_is_ended = true;
+ break;
+ }
+
+ if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
+ goto end;
+ }
+
+ BT_ASSERT(msg);
+
+ /*
+ * Get the timestamp in nanoseconds from origin of this
+ * messsage.
+ */
+ live_get_msg_ts_ns(stream_iter, lttng_live_msg_iter,
+ msg, lttng_live_msg_iter->last_msg_ts_ns,
+ &curr_msg_ts_ns);
+
+ /*
+ * Check if the message of the current live stream
+ * iterator occured at the exact same time or after the
+ * last message returned by this component's message
+ * iterator. If not, we return an error.
+ */
+ if (curr_msg_ts_ns >= lttng_live_msg_iter->last_msg_ts_ns) {
+ stream_iter->current_msg = msg;
+ stream_iter->current_msg_ts_ns = curr_msg_ts_ns;
+ } else {
+ /*
+ * We received a message in the past. To ensure
+ * monotonicity, we can't send it forward.
+ */
+ BT_LOGE("Message's timestamp is less than "
+ "lttng-live's message iterator's last "
+ "returned timestamp: "
+ "lttng-live-msg-iter-addr=%p, ts=%" PRId64 ", "
+ "last-msg-ts=%" PRId64,
+ lttng_live_msg_iter, curr_msg_ts_ns,
+ lttng_live_msg_iter->last_msg_ts_ns);
+ stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
+ goto end;
+ }
+ }
+
+ if (!stream_iter_is_ended &&
+ stream_iter->current_msg_ts_ns <= curr_candidate_msg_ts) {
+ /*
+ * Update the current best candidate message for the
+ * stream iterator of thise live trace to be forwarded
+ * downstream.
+ */
+ curr_candidate_msg_ts = stream_iter->current_msg_ts_ns;
+ curr_candidate_stream_iter = stream_iter;
+ }
+
+ if (stream_iter_is_ended) {
+ /*
+ * The live stream iterator is ENDed. We remove that
+ * iterator from the list and we restart the iteration
+ * at the beginning of the live stream iterator array
+ * to because the removal will shuffle the array.
+ */
+ g_ptr_array_remove_index_fast(live_trace->stream_iterators,
+ stream_iter_idx);
+ stream_iter_idx = 0;
+ } else {
+ stream_iter_idx++;
+ }
+ }
+
+ if (curr_candidate_stream_iter) {
+ *candidate_stream_iter = curr_candidate_stream_iter;
+ stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
+ } else {
+ /*
+ * The only case where we don't have a candidate for this trace
+ * is if we reached the end of all the iterators.
+ */
+ BT_ASSERT(live_trace->stream_iterators->len == 0);
+ stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_END;
+ }
+
+end:
+ return stream_iter_status;
+}
+
+static
+enum lttng_live_iterator_status next_stream_iterator_for_session(
+ struct lttng_live_msg_iter *lttng_live_msg_iter,
+ struct lttng_live_session *session,
+ struct lttng_live_stream_iterator **candidate_session_stream_iter)
+{
+ enum lttng_live_iterator_status stream_iter_status;
+ uint64_t trace_idx = 0;
+ int64_t curr_candidate_msg_ts = INT64_MAX;
+ struct lttng_live_stream_iterator *curr_candidate_stream_iter = NULL;
+
+ /*
+ * Make sure we are attached to the session and look for new streams
+ * and metadata.
+ */
+ stream_iter_status = lttng_live_get_session(lttng_live_msg_iter, session);
+ if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK &&
+ stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_CONTINUE &&
+ stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_END) {
goto end;
}
- if (no_stream_iter->port) {
- status = BT_LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
+
+ BT_ASSERT(session->traces);
+
+ /*
+ * Use while loops here rather then for loops so we can restart the
+ * iteration if an element is removed from the array during the
+ * looping.
+ */
+ while (trace_idx < session->traces->len) {
+ bool trace_is_ended = false;
+ struct lttng_live_stream_iterator *stream_iter;
+ struct lttng_live_trace *trace =
+ g_ptr_array_index(session->traces, trace_idx);
+
+ stream_iter_status = next_stream_iterator_for_trace(
+ lttng_live_msg_iter, trace, &stream_iter);
+ if (stream_iter_status == LTTNG_LIVE_ITERATOR_STATUS_END) {
+ /*
+ * All the live stream iterators for this trace are
+ * ENDed. Remove the trace from this session.
+ */
+ trace_is_ended = true;
+ } else if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
+ goto end;
+ }
+
+ if (!trace_is_ended) {
+ BT_ASSERT(stream_iter);
+
+ if (stream_iter->current_msg_ts_ns <= curr_candidate_msg_ts) {
+ curr_candidate_msg_ts = stream_iter->current_msg_ts_ns;
+ curr_candidate_stream_iter = stream_iter;
+ }
+ trace_idx++;
+ } else {
+ g_ptr_array_remove_index_fast(session->traces, trace_idx);
+ trace_idx = 0;
+ }
+ }
+ if (curr_candidate_stream_iter) {
+ *candidate_session_stream_iter = curr_candidate_stream_iter;
+ stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
} else {
- status = BT_LTTNG_LIVE_ITERATOR_STATUS_END;
+ /*
+ * The only cases where we don't have a candidate for this
+ * trace is:
+ * 1. if we reached the end of all the iterators of all the
+ * traces of this session,
+ * 2. if we never had live stream iterator in the first place.
+ *
+ * In either cases, we return END.
+ */
+ BT_ASSERT(session->traces->len == 0);
+ stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_END;
}
end:
- switch (status) {
- case BT_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE:
- goto retry;
- case BT_LTTNG_LIVE_ITERATOR_STATUS_AGAIN:
- next_return.status = BT_MESSAGE_ITERATOR_STATUS_AGAIN;
- break;
- case BT_LTTNG_LIVE_ITERATOR_STATUS_END:
- next_return.status = BT_MESSAGE_ITERATOR_STATUS_END;
- break;
- case BT_LTTNG_LIVE_ITERATOR_STATUS_INVAL:
- next_return.status = BT_MESSAGE_ITERATOR_STATUS_INVALID;
- break;
- case BT_LTTNG_LIVE_ITERATOR_STATUS_NOMEM:
- next_return.status = BT_MESSAGE_ITERATOR_STATUS_NOMEM;
- break;
- case BT_LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED:
- next_return.status = BT_MESSAGE_ITERATOR_STATUS_UNSUPPORTED;
- break;
- case BT_LTTNG_LIVE_ITERATOR_STATUS_ERROR:
- default: /* fall-through */
- next_return.status = BT_MESSAGE_ITERATOR_STATUS_ERROR;
- break;
+ return stream_iter_status;
+}
+
+static inline
+void put_messages(bt_message_array_const msgs, uint64_t count)
+{
+ uint64_t i;
+
+ for (i = 0; i < count; i++) {
+ BT_MESSAGE_PUT_REF_AND_RESET(msgs[i]);
}
- return next_return;
}
BT_HIDDEN
-bt_message_iterator_next_method_return lttng_live_iterator_next(
- bt_self_message_iterator *iterator)
+bt_self_message_iterator_status lttng_live_msg_iter_next(
+ bt_self_message_iterator *self_msg_it,
+ bt_message_array_const msgs, uint64_t capacity,
+ uint64_t *count)
{
- struct lttng_live_stream_iterator_generic *s =
- bt_self_message_iterator_get_user_data(iterator);
- bt_message_iterator_next_method_return next_return;
-
- switch (s->type) {
- case LIVE_STREAM_TYPE_NO_STREAM:
- next_return = lttng_live_iterator_next_no_stream(iterator,
- container_of(s, struct lttng_live_no_stream_iterator, p));
+ bt_self_message_iterator_status status;
+ struct lttng_live_msg_iter *lttng_live_msg_iter =
+ bt_self_message_iterator_get_data(self_msg_it);
+ struct lttng_live_component *lttng_live =
+ lttng_live_msg_iter->lttng_live_comp;
+ enum lttng_live_iterator_status stream_iter_status;
+ uint64_t session_idx;
+
+ *count = 0;
+
+ BT_ASSERT(lttng_live_msg_iter);
+
+ /*
+ * Clear all the invalid message reference that might be left over in
+ * the output array.
+ */
+ memset(msgs, 0, capacity * sizeof(*msgs));
+
+ /*
+ * If no session are exposed on the relay found at the url provided by
+ * the user, session count will be 0. In this case, we return status
+ * end to return gracefully.
+ */
+ if (lttng_live_msg_iter->sessions->len == 0) {
+ if (lttng_live->params.sess_not_found_act !=
+ SESSION_NOT_FOUND_ACTION_CONTINUE) {
+ status = BT_SELF_MESSAGE_ITERATOR_STATUS_END;
+ goto no_session;
+ } else {
+ /*
+ * The are no more active session for this session
+ * name. Retry to create a viewer session for the
+ * requested session name.
+ */
+ if (lttng_live_create_viewer_session(lttng_live_msg_iter)) {
+ status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
+ goto no_session;
+ }
+ }
+ }
+
+ if (lttng_live_msg_iter->active_stream_iter == 0) {
+ lttng_live_force_new_streams_and_metadata(lttng_live_msg_iter);
+ }
+
+ /*
+ * Here the muxing of message is done.
+ *
+ * We need to iterate over all the streams of all the traces of all the
+ * viewer sessions in order to get the message with the smallest
+ * timestamp. In this case, a session is a viewer session and there is
+ * one viewer session per consumer daemon. (UST 32bit, UST 64bit and/or
+ * kernel). Each viewer session can have multiple traces, for example,
+ * 64bit UST viewer sessions could have multiple per-pid traces.
+ *
+ * We iterate over the streams of each traces to update and see what is
+ * their next message's timestamp. From those timestamps, we select the
+ * message with the smallest timestamp as the best candidate message
+ * for that trace and do the same thing across all the sessions.
+ *
+ * We then compare the timestamp of best candidate message of all the
+ * sessions to pick the message with the smallest timestamp and we
+ * return it.
+ */
+ while (*count < capacity) {
+ struct lttng_live_stream_iterator *next_stream_iter = NULL,
+ *candidate_stream_iter = NULL;
+ int64_t next_msg_ts_ns = INT64_MAX;
+
+ BT_ASSERT(lttng_live_msg_iter->sessions);
+ session_idx = 0;
+ /*
+ * Use a while loop instead of a for loop so we can restart the
+ * iteration if we remove an element. We can safely call
+ * next_stream_iterator_for_session() multiple times on the
+ * same session as we only fetch a new message if there is no
+ * current next message for each live stream iterator.
+ * If all live stream iterator of that session already have a
+ * current next message, the function will simply exit return
+ * the same candidate live stream iterator every time.
+ */
+ while (session_idx < lttng_live_msg_iter->sessions->len) {
+ struct lttng_live_session *session =
+ g_ptr_array_index(lttng_live_msg_iter->sessions,
+ session_idx);
+
+ /* Find the best candidate message to send downstream. */
+ stream_iter_status = next_stream_iterator_for_session(
+ lttng_live_msg_iter, session,
+ &candidate_stream_iter);
+
+ /* If we receive an END status, it means that either:
+ * - Those traces never had active streams (UST with no
+ * data produced yet),
+ * - All live stream iterators have ENDed.*/
+ if (stream_iter_status == LTTNG_LIVE_ITERATOR_STATUS_END) {
+ if (session->closed && session->traces->len == 0) {
+ /*
+ * Remove the session from the list and restart the
+ * iteration at the beginning of the array since the
+ * removal shuffle the elements of the array.
+ */
+ g_ptr_array_remove_index_fast(
+ lttng_live_msg_iter->sessions,
+ session_idx);
+ session_idx = 0;
+ } else {
+ session_idx++;
+ }
+ continue;
+ }
+
+ if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
+ goto end;
+ }
+
+ if (candidate_stream_iter->current_msg_ts_ns <= next_msg_ts_ns) {
+ next_msg_ts_ns = candidate_stream_iter->current_msg_ts_ns;
+ next_stream_iter = candidate_stream_iter;
+ }
+
+ session_idx++;
+ }
+
+ if (!next_stream_iter) {
+ stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
+ goto end;
+ }
+
+ BT_ASSERT(next_stream_iter->current_msg);
+ /* Ensure monotonicity. */
+ BT_ASSERT(lttng_live_msg_iter->last_msg_ts_ns <=
+ next_stream_iter->current_msg_ts_ns);
+
+ /*
+ * Insert the next message to the message batch. This will set
+ * stream iterator current messsage to NULL so that next time
+ * we fetch the next message of that stream iterator
+ */
+ BT_MESSAGE_MOVE_REF(msgs[*count], next_stream_iter->current_msg);
+ (*count)++;
+
+ /* Update the last timestamp in nanoseconds sent downstream. */
+ lttng_live_msg_iter->last_msg_ts_ns = next_msg_ts_ns;
+ next_stream_iter->current_msg_ts_ns = INT64_MAX;
+
+ stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
+ }
+end:
+ switch (stream_iter_status) {
+ case LTTNG_LIVE_ITERATOR_STATUS_OK:
+ case LTTNG_LIVE_ITERATOR_STATUS_AGAIN:
+ if (*count > 0) {
+ /*
+ * We received a again status but we have some messages
+ * to send downstream. We send them and return OK for
+ * now. On the next call we return again if there are
+ * still no new message to send.
+ */
+ status = BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
+ } else {
+ status = BT_SELF_MESSAGE_ITERATOR_STATUS_AGAIN;
+ }
break;
- case LIVE_STREAM_TYPE_STREAM:
- next_return = lttng_live_iterator_next_stream(iterator,
- container_of(s, struct lttng_live_stream_iterator, p));
+ case LTTNG_LIVE_ITERATOR_STATUS_END:
+ status = BT_SELF_MESSAGE_ITERATOR_STATUS_END;
break;
- default:
- next_return.status = BT_MESSAGE_ITERATOR_STATUS_ERROR;
+ case LTTNG_LIVE_ITERATOR_STATUS_NOMEM:
+ status = BT_SELF_MESSAGE_ITERATOR_STATUS_NOMEM;
+ break;
+ case LTTNG_LIVE_ITERATOR_STATUS_ERROR:
+ case LTTNG_LIVE_ITERATOR_STATUS_INVAL:
+ case LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED:
+ status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
+ /* Put all existing messages on error. */
+ put_messages(msgs, *count);
break;
+ default:
+ abort();
}
- return next_return;
+
+no_session:
+ return status;
}
BT_HIDDEN
-bt_message_iterator_status lttng_live_iterator_init(
- bt_self_message_iterator *it,
- struct bt_private_port *port)
+bt_self_message_iterator_status lttng_live_msg_iter_init(
+ bt_self_message_iterator *self_msg_it,
+ bt_self_component_source *self_comp_src,
+ bt_self_component_port_output *self_port)
{
- bt_message_iterator_status ret =
- BT_MESSAGE_ITERATOR_STATUS_OK;
- struct lttng_live_stream_iterator_generic *s;
-
- BT_ASSERT(it);
-
- s = bt_private_port_get_user_data(port);
- BT_ASSERT(s);
- switch (s->type) {
- case LIVE_STREAM_TYPE_NO_STREAM:
- {
- struct lttng_live_no_stream_iterator *no_stream_iter =
- container_of(s, struct lttng_live_no_stream_iterator, p);
- ret = bt_self_message_iterator_set_user_data(it, no_stream_iter);
- if (ret) {
- goto error;
- }
- break;
+ bt_self_message_iterator_status ret =
+ BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
+ bt_self_component *self_comp =
+ bt_self_component_source_as_self_component(self_comp_src);
+ struct lttng_live_component *lttng_live;
+ struct lttng_live_msg_iter *lttng_live_msg_iter;
+
+ BT_ASSERT(self_msg_it);
+
+ lttng_live = bt_self_component_get_data(self_comp);
+
+ /* There can be only one downstream iterator at the same time. */
+ BT_ASSERT(!lttng_live->has_msg_iter);
+ lttng_live->has_msg_iter = true;
+
+ lttng_live_msg_iter = g_new0(struct lttng_live_msg_iter, 1);
+ if (!lttng_live_msg_iter) {
+ ret = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
+ goto end;
}
- case LIVE_STREAM_TYPE_STREAM:
- {
- struct lttng_live_stream_iterator *stream_iter =
- container_of(s, struct lttng_live_stream_iterator, p);
- ret = bt_self_message_iterator_set_user_data(it, stream_iter);
- if (ret) {
+
+ lttng_live_msg_iter->lttng_live_comp = lttng_live;
+ lttng_live_msg_iter->self_msg_iter = self_msg_it;
+
+ lttng_live_msg_iter->active_stream_iter = 0;
+ lttng_live_msg_iter->last_msg_ts_ns = INT64_MIN;
+ lttng_live_msg_iter->sessions = g_ptr_array_new_with_free_func(
+ (GDestroyNotify) lttng_live_destroy_session);
+ BT_ASSERT(lttng_live_msg_iter->sessions);
+
+ lttng_live_msg_iter->viewer_connection =
+ live_viewer_connection_create(lttng_live->params.url->str, false,
+ lttng_live_msg_iter);
+ if (!lttng_live_msg_iter->viewer_connection) {
+ goto error;
+ }
+
+ if (lttng_live_create_viewer_session(lttng_live_msg_iter)) {
+ goto error;
+ }
+ if (lttng_live_msg_iter->sessions->len == 0) {
+ switch (lttng_live->params.sess_not_found_act) {
+ case SESSION_NOT_FOUND_ACTION_CONTINUE:
+ BT_LOGI("Unable to connect to the requested live viewer "
+ "session. Keep trying to connect because of "
+ "%s=\"%s\" component parameter: url=\"%s\"",
+ SESS_NOT_FOUND_ACTION_PARAM,
+ SESS_NOT_FOUND_ACTION_CONTINUE_STR,
+ lttng_live->params.url->str);
+ break;
+ case SESSION_NOT_FOUND_ACTION_FAIL:
+ BT_LOGE("Unable to connect to the requested live viewer "
+ "session. Fail the message iterator"
+ "initialization because of %s=\"%s\" "
+ "component parameter: url =\"%s\"",
+ SESS_NOT_FOUND_ACTION_PARAM,
+ SESS_NOT_FOUND_ACTION_FAIL_STR,
+ lttng_live->params.url->str);
goto error;
+ case SESSION_NOT_FOUND_ACTION_END:
+ BT_LOGI("Unable to connect to the requested live viewer "
+ "session. End gracefully at the first _next() "
+ "call because of %s=\"%s\" component parameter: "
+ "url=\"%s\"", SESS_NOT_FOUND_ACTION_PARAM,
+ SESS_NOT_FOUND_ACTION_END_STR,
+ lttng_live->params.url->str);
+ break;
}
- break;
- }
- default:
- ret = BT_MESSAGE_ITERATOR_STATUS_ERROR;
- goto end;
}
+ bt_self_message_iterator_set_data(self_msg_it, lttng_live_msg_iter);
+
+ goto end;
+error:
+ ret = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
+ lttng_live_msg_iter_destroy(lttng_live_msg_iter);
end:
return ret;
-error:
- if (bt_self_message_iterator_set_user_data(it, NULL)
- != BT_MESSAGE_ITERATOR_STATUS_OK) {
- BT_LOGE("Error setting private data to NULL");
- }
- goto end;
}
static
-bt_component_class_query_method_return lttng_live_query_list_sessions(
- const bt_component_class *comp_class,
- const bt_query_executor *query_exec,
- bt_value *params)
+bt_query_status lttng_live_query_list_sessions(const bt_value *params,
+ const bt_value **result)
{
- bt_component_class_query_method_return query_ret = {
- .result = NULL,
- .status = BT_QUERY_STATUS_OK,
- };
-
- bt_value *url_value = NULL;
+ bt_query_status status = BT_QUERY_STATUS_OK;
+ const bt_value *url_value = NULL;
const char *url;
- struct bt_live_viewer_connection *viewer_connection = NULL;
+ struct live_viewer_connection *viewer_connection = NULL;
- url_value = bt_value_map_get(params, "url");
- if (!url_value || bt_value_is_null(url_value) || !bt_value_is_string(url_value)) {
- BT_LOGW("Mandatory \"url\" parameter missing");
- query_ret.status = BT_QUERY_STATUS_INVALID_PARAMS;
+ url_value = bt_value_map_borrow_entry_value_const(params, URL_PARAM);
+ if (!url_value) {
+ BT_LOGW("Mandatory `%s` parameter missing", URL_PARAM);
+ status = BT_QUERY_STATUS_INVALID_PARAMS;
goto error;
}
- if (bt_value_string_get(url_value, &url) != BT_VALUE_STATUS_OK) {
- BT_LOGW("\"url\" parameter is required to be a string value");
- query_ret.status = BT_QUERY_STATUS_INVALID_PARAMS;
+ if (!bt_value_is_string(url_value)) {
+ BT_LOGW("`%s` parameter is required to be a string value",
+ URL_PARAM);
+ status = BT_QUERY_STATUS_INVALID_PARAMS;
goto error;
}
- viewer_connection = bt_live_viewer_connection_create(url, NULL);
+ url = bt_value_string_get(url_value);
+
+ viewer_connection = live_viewer_connection_create(url, true, NULL);
if (!viewer_connection) {
goto error;
}
- query_ret.result =
- bt_live_viewer_connection_list_sessions(viewer_connection);
- if (!query_ret.result) {
+ status = live_viewer_connection_list_sessions(viewer_connection,
+ result);
+ if (status != BT_QUERY_STATUS_OK) {
goto error;
}
goto end;
error:
- BT_OBJECT_PUT_REF_AND_RESET(query_ret.result);
+ BT_VALUE_PUT_REF_AND_RESET(*result);
- if (query_ret.status >= 0) {
- query_ret.status = BT_QUERY_STATUS_ERROR;
+ if (status >= 0) {
+ status = BT_QUERY_STATUS_ERROR;
}
end:
if (viewer_connection) {
- bt_live_viewer_connection_destroy(viewer_connection);
+ live_viewer_connection_destroy(viewer_connection);
}
- BT_VALUE_PUT_REF_AND_RESET(url_value);
- return query_ret;
+ return status;
}
BT_HIDDEN
-bt_component_class_query_method_return lttng_live_query(
- const bt_component_class *comp_class,
+bt_query_status lttng_live_query(bt_self_component_class_source *comp_class,
const bt_query_executor *query_exec,
- const char *object, bt_value *params)
+ const char *object, const bt_value *params,
+ const bt_value **result)
{
- bt_component_class_query_method_return ret = {
- .result = NULL,
- .status = BT_QUERY_STATUS_OK,
- };
+ bt_query_status status = BT_QUERY_STATUS_OK;
if (strcmp(object, "sessions") == 0) {
- return lttng_live_query_list_sessions(comp_class,
- query_exec, params);
+ status = lttng_live_query_list_sessions(params, result);
+ } else {
+ BT_LOGW("Unknown query object `%s`", object);
+ status = BT_QUERY_STATUS_INVALID_OBJECT;
+ goto end;
}
- BT_LOGW("Unknown query object `%s`", object);
- ret.status = BT_QUERY_STATUS_INVALID_OBJECT;
- return ret;
+
+end:
+ return status;
}
static
void lttng_live_component_destroy_data(struct lttng_live_component *lttng_live)
{
- int ret;
- struct lttng_live_session *session, *s;
-
- bt_list_for_each_entry_safe(session, s, <tng_live->sessions, node) {
- lttng_live_destroy_session(session);
- }
- BT_OBJECT_PUT_REF_AND_RESET(lttng_live->viewer_connection);
- if (lttng_live->url) {
- g_string_free(lttng_live->url, TRUE);
- }
- if (lttng_live->no_stream_port) {
- bt_object_get_ref(lttng_live->no_stream_port);
- ret = bt_private_port_remove_from_component(lttng_live->no_stream_port);
- bt_object_put_ref(lttng_live->no_stream_port);
- BT_ASSERT(!ret);
- }
- if (lttng_live->no_stream_iter) {
- g_free(lttng_live->no_stream_iter);
+ if (lttng_live->params.url) {
+ g_string_free(lttng_live->params.url, TRUE);
}
g_free(lttng_live);
}
BT_HIDDEN
-void lttng_live_component_finalize(bt_self_component *component)
+void lttng_live_component_finalize(bt_self_component_source *component)
{
- void *data = bt_self_component_get_user_data(component);
+ void *data = bt_self_component_get_data(
+ bt_self_component_source_as_self_component(component));
if (!data) {
return;
}
static
-struct lttng_live_component *lttng_live_component_create(bt_value *params,
- bt_self_component *private_component)
+enum session_not_found_action parse_session_not_found_action_param(
+ const bt_value *no_session_param)
+{
+ enum session_not_found_action action;
+ const char *no_session_act_str;
+ no_session_act_str = bt_value_string_get(no_session_param);
+ if (strcmp(no_session_act_str, SESS_NOT_FOUND_ACTION_CONTINUE_STR) == 0) {
+ action = SESSION_NOT_FOUND_ACTION_CONTINUE;
+ } else if (strcmp(no_session_act_str, SESS_NOT_FOUND_ACTION_FAIL_STR) == 0) {
+ action = SESSION_NOT_FOUND_ACTION_FAIL;
+ } else if (strcmp(no_session_act_str, SESS_NOT_FOUND_ACTION_END_STR) == 0) {
+ action = SESSION_NOT_FOUND_ACTION_END;
+ } else {
+ action = -1;
+ }
+
+ return action;
+}
+
+struct lttng_live_component *lttng_live_component_create(const bt_value *params)
{
struct lttng_live_component *lttng_live;
- bt_value *value = NULL;
+ const bt_value *value = NULL;
const char *url;
- bt_value_status ret;
lttng_live = g_new0(struct lttng_live_component, 1);
if (!lttng_live) {
goto end;
}
- /* TODO: make this an overridable parameter. */
lttng_live->max_query_size = MAX_QUERY_SIZE;
- BT_INIT_LIST_HEAD(<tng_live->sessions);
- value = bt_value_map_get(params, "url");
- if (!value || bt_value_is_null(value) || !bt_value_is_string(value)) {
- BT_LOGW("Mandatory \"url\" parameter missing");
+ lttng_live->has_msg_iter = false;
+
+ value = bt_value_map_borrow_entry_value_const(params, URL_PARAM);
+ if (!value || !bt_value_is_string(value)) {
+ BT_LOGW("Mandatory `%s` parameter missing or not a string",
+ URL_PARAM);
goto error;
}
url = bt_value_string_get(value);
- lttng_live->url = g_string_new(url);
- if (!lttng_live->url) {
- goto error;
- }
- BT_VALUE_PUT_REF_AND_RESET(value);
- lttng_live->viewer_connection =
- bt_live_viewer_connection_create(lttng_live->url->str, lttng_live);
- if (!lttng_live->viewer_connection) {
+ lttng_live->params.url = g_string_new(url);
+ if (!lttng_live->params.url) {
goto error;
}
- if (lttng_live_create_viewer_session(lttng_live)) {
- goto error;
+
+ value = bt_value_map_borrow_entry_value_const(params,
+ SESS_NOT_FOUND_ACTION_PARAM);
+
+ if (value && bt_value_is_string(value)) {
+ lttng_live->params.sess_not_found_act =
+ parse_session_not_found_action_param(value);
+ if (lttng_live->params.sess_not_found_act == -1) {
+ BT_LOGE("Unexpected value for `%s` parameter: "
+ "value=\"%s\"", SESS_NOT_FOUND_ACTION_PARAM,
+ bt_value_string_get(value));
+ goto error;
+ }
+ } else {
+ BT_LOGW("Optional `%s` parameter is missing or "
+ "not a string value. Defaulting to %s=\"%s\".",
+ SESS_NOT_FOUND_ACTION_PARAM,
+ SESS_NOT_FOUND_ACTION_PARAM,
+ SESS_NOT_FOUND_ACTION_CONTINUE_STR);
+ lttng_live->params.sess_not_found_act =
+ SESSION_NOT_FOUND_ACTION_CONTINUE;
}
- lttng_live->private_component = private_component;
goto end;
}
BT_HIDDEN
-bt_component_status lttng_live_component_init(
- bt_self_component *private_component,
- bt_value *params, void *init_method_data)
+bt_self_component_status lttng_live_component_init(
+ bt_self_component_source *self_comp,
+ const bt_value *params, UNUSED_VAR void *init_method_data)
{
struct lttng_live_component *lttng_live;
- bt_component_status ret = BT_COMPONENT_STATUS_OK;
+ bt_self_component_status ret = BT_SELF_COMPONENT_STATUS_OK;
- /* Passes ownership of iter ref to lttng_live_component_create. */
- lttng_live = lttng_live_component_create(params, private_component);
+ lttng_live = lttng_live_component_create(params);
if (!lttng_live) {
- //TODO : we need access to the application cancel state
- //because we are not part of a graph yet.
- ret = BT_COMPONENT_STATUS_NOMEM;
+ ret = BT_SELF_COMPONENT_STATUS_NOMEM;
goto end;
}
+ lttng_live->self_comp = self_comp;
- lttng_live->no_stream_iter = g_new0(struct lttng_live_no_stream_iterator, 1);
- lttng_live->no_stream_iter->p.type = LIVE_STREAM_TYPE_NO_STREAM;
- lttng_live->no_stream_iter->lttng_live = lttng_live;
if (lttng_live_is_canceled(lttng_live)) {
goto end;
}
+
ret = bt_self_component_source_add_output_port(
- lttng_live->private_component, "no-stream",
- lttng_live->no_stream_iter,
- <tng_live->no_stream_port);
- if (ret != BT_COMPONENT_STATUS_OK) {
+ lttng_live->self_comp, "out",
+ NULL, NULL);
+ if (ret != BT_SELF_COMPONENT_STATUS_OK) {
goto end;
}
- bt_object_put_ref(lttng_live->no_stream_port); /* weak */
- lttng_live->no_stream_iter->port = lttng_live->no_stream_port;
- ret = bt_self_component_set_user_data(private_component, lttng_live);
- if (ret != BT_COMPONENT_STATUS_OK) {
- goto error;
- }
+ bt_self_component_set_data(
+ bt_self_component_source_as_self_component(self_comp),
+ lttng_live);
end:
return ret;
-error:
- (void) bt_self_component_set_user_data(private_component, NULL);
- lttng_live_component_destroy_data(lttng_live);
- return ret;
-}
-
-BT_HIDDEN
-bt_component_status lttng_live_accept_port_connection(
- bt_self_component *private_component,
- struct bt_private_port *self_private_port,
- const bt_port *other_port)
-{
- struct lttng_live_component *lttng_live =
- bt_self_component_get_user_data(private_component);
- bt_component *other_component;
- bt_component_status status = BT_COMPONENT_STATUS_OK;
- const bt_port *self_port = bt_port_from_private(self_private_port);
-
- other_component = bt_port_get_component(other_port);
- bt_component_put_ref(other_component); /* weak */
-
- if (!lttng_live->downstream_component) {
- lttng_live->downstream_component = other_component;
- goto end;
- }
-
- /*
- * Compare prior component to ensure we are connected to the
- * same downstream component as prior ports.
- */
- if (lttng_live->downstream_component != other_component) {
- BT_LOGW("Cannot connect ctf.lttng-live component port \"%s\" to component \"%s\": already connected to component \"%s\".",
- bt_port_get_name(self_port),
- bt_component_get_name(other_component),
- bt_component_get_name(lttng_live->downstream_component));
- status = BT_COMPONENT_STATUS_REFUSE_PORT_CONNECTION;
- goto end;
- }
-end:
- bt_port_put_ref(self_port);
- return status;
}
/*
+ * Copyright 2019 - Francis Deslauriers <francis.deslauriers@efficios.com>
* Copyright 2016 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
#include <babeltrace/common-internal.h>
#include <babeltrace/babeltrace.h>
-#include "lttng-live-internal.h"
+#include "lttng-live.h"
#include "viewer-connection.h"
#include "lttng-viewer-abi.h"
#include "data-stream.h"
#include "metadata.h"
-static ssize_t lttng_live_recv(struct bt_live_viewer_connection *viewer_connection,
+static
+ssize_t lttng_live_recv(struct live_viewer_connection *viewer_connection,
void *buf, size_t len)
{
ssize_t ret;
size_t copied = 0, to_copy = len;
- struct lttng_live_component *lttng_live =
- viewer_connection->lttng_live;
+ struct lttng_live_msg_iter *lttng_live_msg_iter =
+ viewer_connection->lttng_live_msg_iter;
BT_SOCKET sock = viewer_connection->control_sock;
do {
to_copy -= ret;
}
if (ret == BT_SOCKET_ERROR && bt_socket_interrupted()) {
- if (lttng_live_is_canceled(lttng_live)) {
+ if (!viewer_connection->in_query &&
+ lttng_live_is_canceled(lttng_live_msg_iter->lttng_live_comp)) {
break;
} else {
continue;
return ret;
}
-static ssize_t lttng_live_send(struct bt_live_viewer_connection *viewer_connection,
+static
+ssize_t lttng_live_send(struct live_viewer_connection *viewer_connection,
const void *buf, size_t len)
{
- struct lttng_live_component *lttng_live =
- viewer_connection->lttng_live;
+ struct lttng_live_msg_iter *lttng_live_msg_iter =
+ viewer_connection->lttng_live_msg_iter;
BT_SOCKET sock = viewer_connection->control_sock;
ssize_t ret;
for (;;) {
ret = bt_socket_send_nosigpipe(sock, buf, len);
if (ret == BT_SOCKET_ERROR && bt_socket_interrupted()) {
- if (lttng_live_is_canceled(lttng_live)) {
+ if (!viewer_connection->in_query &&
+ lttng_live_is_canceled(lttng_live_msg_iter->lttng_live_comp)) {
break;
} else {
continue;
return ret;
}
-static int parse_url(struct bt_live_viewer_connection *viewer_connection)
+static
+int parse_url(struct live_viewer_connection *viewer_connection)
{
char error_buf[256] = { 0 };
struct bt_common_lttng_live_url_parts lttng_live_url_parts = { 0 };
return ret;
}
-static int lttng_live_handshake(struct bt_live_viewer_connection *viewer_connection)
+static
+int lttng_live_handshake(struct live_viewer_connection *viewer_connection)
{
struct lttng_viewer_cmd cmd;
struct lttng_viewer_connect connect;
return -1;
}
-static int lttng_live_connect_viewer(struct bt_live_viewer_connection *viewer_connection)
+static
+int lttng_live_connect_viewer(struct live_viewer_connection *viewer_connection)
{
struct hostent *host;
struct sockaddr_in server_addr;
return -1;
}
-static void lttng_live_disconnect_viewer(struct bt_live_viewer_connection *viewer_connection)
+static
+void lttng_live_disconnect_viewer(
+ struct live_viewer_connection *viewer_connection)
{
if (viewer_connection->control_sock == BT_INVALID_SOCKET) {
return;
}
}
-static void connection_release(bt_object *obj)
+static
+void connection_release(bt_object *obj)
{
- struct bt_live_viewer_connection *conn =
- container_of(obj, struct bt_live_viewer_connection, obj);
+ struct live_viewer_connection *conn =
+ container_of(obj, struct live_viewer_connection, obj);
- bt_live_viewer_connection_destroy(conn);
+ live_viewer_connection_destroy(conn);
}
static
-bt_value_status list_update_session(bt_value *results,
+int list_update_session(bt_value *results,
const struct lttng_viewer_session *session,
bool *_found)
{
- bt_value_status ret = BT_VALUE_STATUS_OK;
+ int ret = 0;
bt_value *map = NULL;
bt_value *hostname = NULL;
bt_value *session_name = NULL;
len = bt_value_array_get_size(results);
if (len < 0) {
- ret = BT_VALUE_STATUS_ERROR;
+ BT_LOGE_STR("Error getting size of array.");
+ ret = -1;
goto end;
}
for (i = 0; i < len; i++) {
const char *hostname_str = NULL;
const char *session_name_str = NULL;
- map = bt_value_array_get(results, (size_t) i);
+ map = bt_value_array_borrow_element_by_index(results, (size_t) i);
if (!map) {
- ret = BT_VALUE_STATUS_ERROR;
+ BT_LOGE_STR("Error borrowing map.");
+ ret = -1;
goto end;
}
- hostname = bt_value_map_get(map, "target-hostname");
+ hostname = bt_value_map_borrow_entry_value(map, "target-hostname");
if (!hostname) {
- ret = BT_VALUE_STATUS_ERROR;
+ BT_LOGE_STR("Error borrowing \"target-hostname\" entry.");
+ ret = -1;
goto end;
}
- session_name = bt_value_map_get(map, "session-name");
+ session_name = bt_value_map_borrow_entry_value(map, "session-name");
if (!session_name) {
- ret = BT_VALUE_STATUS_ERROR;
+ BT_LOGE_STR("Error borrowing \"session-name\" entry.");
+ ret = -1;
goto end;
}
hostname_str = bt_value_string_get(hostname);
found = true;
- btval = bt_value_map_get(map, "stream-count");
+ btval = bt_value_map_borrow_entry_value(map, "stream-count");
if (!btval) {
- ret = BT_VALUE_STATUS_ERROR;
+ BT_LOGE_STR("Error borrowing \"stream-count\" entry.");
+ ret = -1;
goto end;
}
val = bt_value_integer_get(btval);
/* sum */
val += streams;
- ret = bt_private_integer_bool_set(btval, val);
- if (ret != BT_VALUE_STATUS_OK) {
- goto end;
- }
- BT_VALUE_PUT_REF_AND_RESET(btval);
+ bt_value_integer_set(btval, val);
- btval = bt_value_map_get(map, "client-count");
+ btval = bt_value_map_borrow_entry_value(map, "client-count");
if (!btval) {
- ret = BT_VALUE_STATUS_ERROR;
+ BT_LOGE_STR("Error borrowing \"client-count\" entry.");
+ ret = -1;
goto end;
}
val = bt_value_integer_get(btval);
/* max */
val = max_t(int64_t, clients, val);
- ret = bt_private_integer_bool_set(btval, val);
- if (ret != BT_VALUE_STATUS_OK) {
- goto end;
- }
- BT_VALUE_PUT_REF_AND_RESET(btval);
+ bt_value_integer_set(btval, val);
}
- BT_VALUE_PUT_REF_AND_RESET(hostname);
- BT_VALUE_PUT_REF_AND_RESET(session_name);
- BT_VALUE_PUT_REF_AND_RESET(map);
-
if (found) {
break;
}
}
end:
- BT_VALUE_PUT_REF_AND_RESET(btval);
- BT_VALUE_PUT_REF_AND_RESET(hostname);
- BT_VALUE_PUT_REF_AND_RESET(session_name);
- BT_VALUE_PUT_REF_AND_RESET(map);
*_found = found;
return ret;
}
static
-bt_value_status list_append_session(bt_value *results,
+int list_append_session(bt_value *results,
GString *base_url,
const struct lttng_viewer_session *session)
{
- bt_value_status ret = BT_VALUE_STATUS_OK;
+ int ret = 0;
+ bt_value_status ret_status;
bt_value *map = NULL;
GString *url = NULL;
bool found = false;
* and do max of client counts.
*/
ret = list_update_session(results, session, &found);
- if (ret != BT_VALUE_STATUS_OK || found) {
+ if (ret || found) {
goto end;
}
- map = bt_private_value_map_create();
+ map = bt_value_map_create();
if (!map) {
- ret = BT_VALUE_STATUS_ERROR;
+ BT_LOGE_STR("Error creating map value.");
+ ret = -1;
goto end;
}
if (base_url->len < 1) {
- ret = BT_VALUE_STATUS_ERROR;
+ BT_LOGE_STR("Error: base_url length smaller than 1.");
+ ret = -1;
goto end;
}
/*
g_string_append_c(url, '/');
g_string_append(url, session->session_name);
- ret = bt_private_value_map_insert_string_entry(map, "url", url->str);
- if (ret != BT_VALUE_STATUS_OK) {
+ ret_status = bt_value_map_insert_string_entry(map, "url", url->str);
+ if (ret_status != BT_VALUE_STATUS_OK) {
+ BT_LOGE_STR("Error inserting \"url\" entry.");
+ ret = -1;
goto end;
}
* key = "target-hostname",
* value = <string>,
*/
- ret = bt_private_value_map_insert_string_entry(map, "target-hostname",
+ ret_status = bt_value_map_insert_string_entry(map, "target-hostname",
session->hostname);
- if (ret != BT_VALUE_STATUS_OK) {
+ if (ret_status != BT_VALUE_STATUS_OK) {
+ BT_LOGE_STR("Error inserting \"target-hostname\" entry.");
+ ret = -1;
goto end;
}
* key = "session-name",
* value = <string>,
*/
- ret = bt_private_value_map_insert_string_entry(map, "session-name",
+ ret_status = bt_value_map_insert_string_entry(map, "session-name",
session->session_name);
- if (ret != BT_VALUE_STATUS_OK) {
+ if (ret_status != BT_VALUE_STATUS_OK) {
+ BT_LOGE_STR("Error inserting \"session-name\" entry.");
+ ret = -1;
goto end;
}
{
uint32_t live_timer = be32toh(session->live_timer);
- ret = bt_private_value_map_insert_integer_entry(map, "timer-us",
+ ret_status = bt_value_map_insert_integer_entry(map, "timer-us",
live_timer);
- if (ret != BT_VALUE_STATUS_OK) {
+ if (ret_status != BT_VALUE_STATUS_OK) {
+ BT_LOGE_STR("Error inserting \"timer-us\" entry.");
+ ret = -1;
goto end;
}
}
{
uint32_t streams = be32toh(session->streams);
- ret = bt_private_value_map_insert_integer_entry(map, "stream-count",
+ ret_status = bt_value_map_insert_integer_entry(map, "stream-count",
streams);
- if (ret != BT_VALUE_STATUS_OK) {
+ if (ret_status != BT_VALUE_STATUS_OK) {
+ BT_LOGE_STR("Error inserting \"stream-count\" entry.");
+ ret = -1;
goto end;
}
}
-
/*
* key = "client-count",
* value = <integer>,
{
uint32_t clients = be32toh(session->clients);
- ret = bt_private_value_map_insert_integer_entry(map, "client-count",
+ ret_status = bt_value_map_insert_integer_entry(map, "client-count",
clients);
- if (ret != BT_VALUE_STATUS_OK) {
+ if (ret_status != BT_VALUE_STATUS_OK) {
+ BT_LOGE_STR("Error inserting \"client-count\" entry.");
+ ret = -1;
goto end;
}
}
- ret = bt_private_value_array_append_element(results, map);
+ ret_status = bt_value_array_append_element(results, map);
+ if (ret_status != BT_VALUE_STATUS_OK) {
+ BT_LOGE_STR("Error appending map to results.");
+ ret = -1;
+ }
+
end:
if (url) {
- g_string_free(url, TRUE);
+ g_string_free(url, true);
}
BT_VALUE_PUT_REF_AND_RESET(map);
return ret;
*/
BT_HIDDEN
-bt_value *bt_live_viewer_connection_list_sessions(struct bt_live_viewer_connection *viewer_connection)
+bt_query_status live_viewer_connection_list_sessions(
+ struct live_viewer_connection *viewer_connection,
+ const bt_value **user_result)
{
- bt_value *results = NULL;
+ bt_query_status status = BT_QUERY_STATUS_OK;
+ bt_value *result = NULL;
struct lttng_viewer_cmd cmd;
struct lttng_viewer_list_sessions list;
uint32_t i, sessions_count;
goto error;
}
- results = bt_private_value_array_create();
- if (!results) {
+ result = bt_value_array_create();
+ if (!result) {
BT_LOGE("Error creating array");
+ status = BT_QUERY_STATUS_NOMEM;
goto error;
}
ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
if (ret_len == BT_SOCKET_ERROR) {
BT_LOGE("Error sending cmd: %s", bt_socket_errormsg());
+ status = BT_QUERY_STATUS_ERROR;
goto error;
}
BT_ASSERT(ret_len == sizeof(cmd));
ret_len = lttng_live_recv(viewer_connection, &list, sizeof(list));
if (ret_len == 0) {
BT_LOGI("Remote side has closed connection");
+ status = BT_QUERY_STATUS_ERROR;
goto error;
}
if (ret_len == BT_SOCKET_ERROR) {
BT_LOGE("Error receiving session list: %s", bt_socket_errormsg());
+ status = BT_QUERY_STATUS_ERROR;
goto error;
}
BT_ASSERT(ret_len == sizeof(list));
for (i = 0; i < sessions_count; i++) {
struct lttng_viewer_session lsession;
- ret_len = lttng_live_recv(viewer_connection,
- &lsession, sizeof(lsession));
+ ret_len = lttng_live_recv(viewer_connection, &lsession,
+ sizeof(lsession));
if (ret_len == 0) {
BT_LOGI("Remote side has closed connection");
+ status = BT_QUERY_STATUS_ERROR;
goto error;
}
if (ret_len == BT_SOCKET_ERROR) {
BT_LOGE("Error receiving session: %s", bt_socket_errormsg());
+ status = BT_QUERY_STATUS_ERROR;
goto error;
}
BT_ASSERT(ret_len == sizeof(lsession));
lsession.hostname[LTTNG_VIEWER_HOST_NAME_MAX - 1] = '\0';
lsession.session_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0';
- if (list_append_session(results,
- viewer_connection->url, &lsession)
- != BT_VALUE_STATUS_OK) {
+ if (list_append_session(result, viewer_connection->url,
+ &lsession)) {
+ status = BT_QUERY_STATUS_ERROR;
goto error;
}
}
+
+ *user_result = result;
goto end;
error:
- BT_VALUE_PUT_REF_AND_RESET(results);
+ BT_VALUE_PUT_REF_AND_RESET(result);
end:
- return results;
+ return status;
}
static
-int lttng_live_query_session_ids(struct lttng_live_component *lttng_live)
+int lttng_live_query_session_ids(struct lttng_live_msg_iter *lttng_live_msg_iter)
{
struct lttng_viewer_cmd cmd;
struct lttng_viewer_list_sessions list;
uint32_t i, sessions_count;
ssize_t ret_len;
uint64_t session_id;
- struct bt_live_viewer_connection *viewer_connection =
- lttng_live->viewer_connection;
+ struct live_viewer_connection *viewer_connection =
+ lttng_live_msg_iter->viewer_connection;
cmd.cmd = htobe32(LTTNG_VIEWER_LIST_SESSIONS);
cmd.data_size = htobe64((uint64_t) 0);
LTTNG_VIEWER_NAME_MAX) == 0) && (strncmp(lsession.hostname,
viewer_connection->target_hostname->str,
LTTNG_VIEWER_HOST_NAME_MAX) == 0)) {
- if (lttng_live_add_session(lttng_live, session_id,
+ if (lttng_live_add_session(lttng_live_msg_iter, session_id,
lsession.hostname,
lsession.session_name)) {
goto error;
}
BT_HIDDEN
-int lttng_live_create_viewer_session(struct lttng_live_component *lttng_live)
+int lttng_live_create_viewer_session(
+ struct lttng_live_msg_iter *lttng_live_msg_iter)
{
struct lttng_viewer_cmd cmd;
struct lttng_viewer_create_session_response resp;
ssize_t ret_len;
- struct bt_live_viewer_connection *viewer_connection =
- lttng_live->viewer_connection;
+ struct live_viewer_connection *viewer_connection =
+ lttng_live_msg_iter->viewer_connection;
cmd.cmd = htobe32(LTTNG_VIEWER_CREATE_SESSION);
cmd.data_size = htobe64((uint64_t) 0);
BT_LOGE("Error creating viewer session");
goto error;
}
- if (lttng_live_query_session_ids(lttng_live)) {
+ if (lttng_live_query_session_ids(lttng_live_msg_iter)) {
goto error;
}
{
ssize_t ret_len;
uint32_t i;
- struct lttng_live_component *lttng_live = session->lttng_live;
- struct bt_live_viewer_connection *viewer_connection =
- lttng_live->viewer_connection;
+ struct lttng_live_msg_iter *lttng_live_msg_iter =
+ session->lttng_live_msg_iter;
+ struct live_viewer_connection *viewer_connection =
+ lttng_live_msg_iter->viewer_connection;
BT_LOGD("Getting %" PRIu32 " new streams:", stream_count);
for (i = 0; i < stream_count; i++) {
struct lttng_viewer_attach_session_request rq;
struct lttng_viewer_attach_session_response rp;
ssize_t ret_len;
- struct lttng_live_component *lttng_live = session->lttng_live;
- struct bt_live_viewer_connection *viewer_connection =
- lttng_live->viewer_connection;
+ struct lttng_live_msg_iter *lttng_live_msg_iter = session->lttng_live_msg_iter;
+ struct live_viewer_connection *viewer_connection =
+ lttng_live_msg_iter->viewer_connection;
uint64_t session_id = session->id;
uint32_t streams_count;
const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
char cmd_buf[cmd_buf_len];
- if (session->attached) {
- return 0;
- }
-
cmd.cmd = htobe32(LTTNG_VIEWER_ATTACH_SESSION);
cmd.data_size = htobe64((uint64_t) sizeof(rq));
cmd.cmd_version = htobe32(0);
struct lttng_viewer_detach_session_request rq;
struct lttng_viewer_detach_session_response rp;
ssize_t ret_len;
- struct lttng_live_component *lttng_live = session->lttng_live;
- struct bt_live_viewer_connection *viewer_connection =
- lttng_live->viewer_connection;
+ struct lttng_live_msg_iter *lttng_live_msg_iter = session->lttng_live_msg_iter;
+ struct live_viewer_connection *viewer_connection =
+ lttng_live_msg_iter->viewer_connection;
uint64_t session_id = session->id;
const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
char cmd_buf[cmd_buf_len];
char *data = NULL;
ssize_t ret_len;
struct lttng_live_session *session = trace->session;
- struct lttng_live_component *lttng_live = session->lttng_live;
+ struct lttng_live_msg_iter *lttng_live_msg_iter = session->lttng_live_msg_iter;
struct lttng_live_metadata *metadata = trace->metadata;
- struct bt_live_viewer_connection *viewer_connection =
- lttng_live->viewer_connection;
+ struct live_viewer_connection *viewer_connection =
+ lttng_live_msg_iter->viewer_connection;
const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
char cmd_buf[cmd_buf_len];
}
BT_HIDDEN
-bt_lttng_live_iterator_status lttng_live_get_next_index(struct lttng_live_component *lttng_live,
+enum lttng_live_iterator_status lttng_live_get_next_index(
+ struct lttng_live_msg_iter *lttng_live_msg_iter,
struct lttng_live_stream_iterator *stream,
struct packet_index *index)
{
ssize_t ret_len;
struct lttng_viewer_index rp;
uint32_t flags, status;
- bt_lttng_live_iterator_status retstatus =
- BT_LTTNG_LIVE_ITERATOR_STATUS_OK;
- struct bt_live_viewer_connection *viewer_connection =
- lttng_live->viewer_connection;
+ enum lttng_live_iterator_status retstatus =
+ LTTNG_LIVE_ITERATOR_STATUS_OK;
+ struct live_viewer_connection *viewer_connection =
+ lttng_live_msg_iter->viewer_connection;
struct lttng_live_trace *trace = stream->trace;
const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
char cmd_buf[cmd_buf_len];
+ struct lttng_live_component *lttng_live =
+ lttng_live_msg_iter->lttng_live_comp;
cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEXT_INDEX);
cmd.data_size = htobe64((uint64_t) sizeof(rq));
memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
ret_len = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
if (ret_len == BT_SOCKET_ERROR) {
- BT_LOGE("Error sending get_next_index request: %s", bt_socket_errormsg());
+ BT_LOGE("Error sending get_next_index request: %s",
+ bt_socket_errormsg());
goto error;
}
goto error;
}
if (ret_len == BT_SOCKET_ERROR) {
- BT_LOGE("Error receiving get_next_index response: %s", bt_socket_errormsg());
+ BT_LOGE("Error receiving get_next_index response: %s",
+ bt_socket_errormsg());
goto error;
}
BT_ASSERT(ret_len == sizeof(rp));
BT_LOGD("get_next_index: inactive");
memset(index, 0, sizeof(struct packet_index));
index->ts_cycles.timestamp_end = be64toh(rp.timestamp_end);
- stream->current_inactivity_timestamp = index->ts_cycles.timestamp_end;
+ stream->current_inactivity_ts = index->ts_cycles.timestamp_end;
ctf_stream_class_id = be64toh(rp.stream_id);
if (stream->ctf_stream_class_id != -1ULL) {
BT_ASSERT(stream->ctf_stream_class_id ==
}
stream->state = LTTNG_LIVE_STREAM_ACTIVE_DATA;
- stream->current_packet_end_timestamp =
- index->ts_cycles.timestamp_end;
if (flags & LTTNG_VIEWER_FLAG_NEW_METADATA) {
BT_LOGD("get_next_index: new metadata needed");
}
if (flags & LTTNG_VIEWER_FLAG_NEW_STREAM) {
BT_LOGD("get_next_index: new streams needed");
- lttng_live_need_new_streams(lttng_live);
+ lttng_live_need_new_streams(lttng_live_msg_iter);
}
break;
}
case LTTNG_VIEWER_INDEX_RETRY:
BT_LOGD("get_next_index: retry");
memset(index, 0, sizeof(struct packet_index));
- retstatus = BT_LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
+ retstatus = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
stream->state = LTTNG_LIVE_STREAM_ACTIVE_NO_DATA;
goto end;
case LTTNG_VIEWER_INDEX_HUP:
BT_LOGD("get_next_index: stream hung up");
memset(index, 0, sizeof(struct packet_index));
index->offset = EOF;
- retstatus = BT_LTTNG_LIVE_ITERATOR_STATUS_END;
+ retstatus = LTTNG_LIVE_ITERATOR_STATUS_END;
stream->state = LTTNG_LIVE_STREAM_EOF;
break;
case LTTNG_VIEWER_INDEX_ERR:
error:
if (lttng_live_is_canceled(lttng_live)) {
- retstatus = BT_LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
+ retstatus = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
} else {
- retstatus = BT_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
+ retstatus = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
}
return retstatus;
}
BT_HIDDEN
-enum bt_msg_iter_medium_status lttng_live_get_stream_bytes(struct lttng_live_component *lttng_live,
- struct lttng_live_stream_iterator *stream, uint8_t *buf, uint64_t offset,
- uint64_t req_len, uint64_t *recv_len)
+enum bt_msg_iter_medium_status lttng_live_get_stream_bytes(
+ struct lttng_live_msg_iter *lttng_live_msg_iter,
+ struct lttng_live_stream_iterator *stream, uint8_t *buf,
+ uint64_t offset, uint64_t req_len, uint64_t *recv_len)
{
enum bt_msg_iter_medium_status retstatus = BT_MSG_ITER_MEDIUM_STATUS_OK;
struct lttng_viewer_cmd cmd;
struct lttng_viewer_trace_packet rp;
ssize_t ret_len;
uint32_t flags, status;
- struct bt_live_viewer_connection *viewer_connection =
- lttng_live->viewer_connection;
+ struct live_viewer_connection *viewer_connection =
+ lttng_live_msg_iter->viewer_connection;
struct lttng_live_trace *trace = stream->trace;
const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
char cmd_buf[cmd_buf_len];
+ struct lttng_live_component *lttng_live =
+ lttng_live_msg_iter->lttng_live_comp;
BT_LOGD("lttng_live_get_stream_bytes: offset=%" PRIu64 ", req_len=%" PRIu64,
offset, req_len);
}
if (flags & LTTNG_VIEWER_FLAG_NEW_STREAM) {
BT_LOGD("get_data_packet: new streams needed, try again later");
- lttng_live_need_new_streams(lttng_live);
+ lttng_live_need_new_streams(lttng_live_msg_iter);
}
if (flags & (LTTNG_VIEWER_FLAG_NEW_METADATA
| LTTNG_VIEWER_FLAG_NEW_STREAM)) {
* Request new streams for a session.
*/
BT_HIDDEN
-bt_lttng_live_iterator_status lttng_live_get_new_streams(
+enum lttng_live_iterator_status lttng_live_get_new_streams(
struct lttng_live_session *session)
{
- bt_lttng_live_iterator_status status =
- BT_LTTNG_LIVE_ITERATOR_STATUS_OK;
+ enum lttng_live_iterator_status status =
+ LTTNG_LIVE_ITERATOR_STATUS_OK;
struct lttng_viewer_cmd cmd;
struct lttng_viewer_new_streams_request rq;
struct lttng_viewer_new_streams_response rp;
ssize_t ret_len;
- struct lttng_live_component *lttng_live = session->lttng_live;
- struct bt_live_viewer_connection *viewer_connection =
- lttng_live->viewer_connection;
+ struct lttng_live_msg_iter *lttng_live_msg_iter =
+ session->lttng_live_msg_iter;
+ struct live_viewer_connection *viewer_connection =
+ lttng_live_msg_iter->viewer_connection;
+ struct lttng_live_component *lttng_live =
+ lttng_live_msg_iter->lttng_live_comp;
uint32_t streams_count;
const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
char cmd_buf[cmd_buf_len];
if (!session->new_streams_needed) {
- return BT_LTTNG_LIVE_ITERATOR_STATUS_OK;
+ return LTTNG_LIVE_ITERATOR_STATUS_OK;
}
cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEW_STREAMS);
memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
ret_len = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
if (ret_len == BT_SOCKET_ERROR) {
- BT_LOGE("Error sending get_new_streams request: %s", bt_socket_errormsg());
+ BT_LOGE("Error sending get_new_streams request: %s",
+ bt_socket_errormsg());
goto error;
}
case LTTNG_VIEWER_NEW_STREAMS_HUP:
session->new_streams_needed = false;
session->closed = true;
- status = BT_LTTNG_LIVE_ITERATOR_STATUS_END;
+ status = LTTNG_LIVE_ITERATOR_STATUS_END;
goto end;
case LTTNG_VIEWER_NEW_STREAMS_ERR:
BT_LOGE("get_new_streams error");
error:
if (lttng_live_is_canceled(lttng_live)) {
- status = BT_LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
+ status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
} else {
- status = BT_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
+ status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
}
return status;
}
BT_HIDDEN
-struct bt_live_viewer_connection *
- bt_live_viewer_connection_create(const char *url,
- struct lttng_live_component *lttng_live)
+struct live_viewer_connection *live_viewer_connection_create(
+ const char *url, bool in_query,
+ struct lttng_live_msg_iter *lttng_live_msg_iter)
{
- struct bt_live_viewer_connection *viewer_connection;
+ struct live_viewer_connection *viewer_connection;
- viewer_connection = g_new0(struct bt_live_viewer_connection, 1);
+ viewer_connection = g_new0(struct live_viewer_connection, 1);
if (bt_socket_init() != 0) {
goto error;
}
- bt_object_init(&viewer_connection->obj, connection_release);
+ bt_object_init_shared(&viewer_connection->obj, connection_release);
viewer_connection->control_sock = BT_INVALID_SOCKET;
viewer_connection->port = -1;
- viewer_connection->lttng_live = lttng_live;
+ viewer_connection->in_query = in_query;
+ viewer_connection->lttng_live_msg_iter = lttng_live_msg_iter;
viewer_connection->url = g_string_new(url);
if (!viewer_connection->url) {
goto error;
}
BT_HIDDEN
-void bt_live_viewer_connection_destroy(struct bt_live_viewer_connection *viewer_connection)
+void live_viewer_connection_destroy(
+ struct live_viewer_connection *viewer_connection)
{
BT_LOGD("Closing connection to url \"%s\"", viewer_connection->url->str);
lttng_live_disconnect_viewer(viewer_connection);
- g_string_free(viewer_connection->url, TRUE);
+ g_string_free(viewer_connection->url, true);
if (viewer_connection->relay_hostname) {
- g_string_free(viewer_connection->relay_hostname, TRUE);
+ g_string_free(viewer_connection->relay_hostname, true);
}
if (viewer_connection->target_hostname) {
- g_string_free(viewer_connection->target_hostname, TRUE);
+ g_string_free(viewer_connection->target_hostname, true);
}
if (viewer_connection->session_name) {
- g_string_free(viewer_connection->session_name, TRUE);
+ g_string_free(viewer_connection->session_name, true);
}
g_free(viewer_connection);