X-Git-Url: http://git.efficios.com/?p=babeltrace.git;a=blobdiff_plain;f=plugins%2Fctf%2Flttng-live%2Flttng-live.c;h=40a145607216b9bce6b856807abad69f272c677a;hp=9fd35b2a21d15b9a410e741fc2633dc019953794;hb=bb18709be19ebc5b1bd9264cdbd3dd20939bdd05;hpb=1e3400b8572abdb55c6f0416e7c47d13fb715037 diff --git a/plugins/ctf/lttng-live/lttng-live.c b/plugins/ctf/lttng-live/lttng-live.c index 9fd35b2a..40a14560 100644 --- a/plugins/ctf/lttng-live/lttng-live.c +++ b/plugins/ctf/lttng-live/lttng-live.c @@ -3,6 +3,7 @@ * * Babeltrace CTF LTTng-live Client Component * + * Copyright 2019 Francis Deslauriers * Copyright 2016 Jérémie Galarneau * Copyright 2016 Mathieu Desnoyers * @@ -30,24 +31,56 @@ #define BT_LOG_TAG "PLUGIN-CTF-LTTNG-LIVE-SRC" #include "logging.h" +#include +#include +#include + +#include #include #include #include -#include -#include -#include -#include #include #include "data-stream.h" #include "metadata.h" -#include "lttng-live-internal.h" +#include "lttng-live.h" -#define MAX_QUERY_SIZE (256*1024) +#define MAX_QUERY_SIZE (256*1024) +#define URL_PARAM "url" +#define SESS_NOT_FOUND_ACTION_PARAM "session-not-found-action" +#define SESS_NOT_FOUND_ACTION_CONTINUE_STR "continue" +#define SESS_NOT_FOUND_ACTION_FAIL_STR "fail" +#define SESS_NOT_FOUND_ACTION_END_STR "end" #define print_dbg(fmt, ...) BT_LOGD(fmt, ## __VA_ARGS__) -static const char *print_state(struct lttng_live_stream_iterator *s) +static +const char *print_live_iterator_status(enum lttng_live_iterator_status status) +{ + switch (status) { + case LTTNG_LIVE_ITERATOR_STATUS_CONTINUE: + return "LTTNG_LIVE_ITERATOR_STATUS_CONTINUE"; + case LTTNG_LIVE_ITERATOR_STATUS_AGAIN: + return "LTTNG_LIVE_ITERATOR_STATUS_AGAIN"; + case LTTNG_LIVE_ITERATOR_STATUS_END: + return "LTTNG_LIVE_ITERATOR_STATUS_END"; + case LTTNG_LIVE_ITERATOR_STATUS_OK: + return "LTTNG_LIVE_ITERATOR_STATUS_OK"; + case LTTNG_LIVE_ITERATOR_STATUS_INVAL: + return "LTTNG_LIVE_ITERATOR_STATUS_INVAL"; + case LTTNG_LIVE_ITERATOR_STATUS_ERROR: + return "LTTNG_LIVE_ITERATOR_STATUS_ERROR"; + case LTTNG_LIVE_ITERATOR_STATUS_NOMEM: + return "LTTNG_LIVE_ITERATOR_STATUS_NOMEM"; + case LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED: + return "LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED"; + default: + abort(); + } +} + +static +const char *print_state(struct lttng_live_stream_iterator *s) { switch (s->state) { case LTTNG_LIVE_STREAM_ACTIVE_NO_DATA: @@ -65,159 +98,68 @@ static const char *print_state(struct lttng_live_stream_iterator *s) } } -static -void print_stream_state(struct lttng_live_stream_iterator *stream) -{ - const bt_port *port; - - port = bt_port_from_private(stream->port); - print_dbg("stream %s state %s last_inact_ts %" PRId64 " cur_inact_ts %" PRId64, - bt_port_get_name(port), - print_state(stream), - stream->last_returned_inactivity_timestamp, - stream->current_inactivity_timestamp); - bt_port_put_ref(port); -} +#define print_stream_state(live_stream_iter) \ + do { \ + BT_LOGD("stream state %s last_inact_ts %" PRId64 \ + ", curr_inact_ts %" PRId64, \ + print_state(live_stream_iter), \ + live_stream_iter->last_inactivity_ts, \ + live_stream_iter->current_inactivity_ts); \ + } while (0); BT_HIDDEN -bt_bool lttng_live_is_canceled(struct lttng_live_component *lttng_live) +bool lttng_live_is_canceled(struct lttng_live_component *lttng_live) { - bt_component *component; - const bt_graph *graph; - bt_bool ret; + const bt_component *component; + bool ret; if (!lttng_live) { - return BT_FALSE; - } - - component = bt_component_from_private(lttng_live->private_component); - graph = bt_component_get_graph(component); - ret = bt_graph_is_canceled(graph); - bt_graph_put_ref(graph); - bt_component_put_ref(component); - return ret; -} - -BT_HIDDEN -int lttng_live_add_port(struct lttng_live_component *lttng_live, - struct lttng_live_stream_iterator *stream_iter) -{ - int ret; - struct bt_private_port *private_port; - char name[STREAM_NAME_MAX_LEN]; - bt_component_status status; - - ret = sprintf(name, STREAM_NAME_PREFIX "%" PRIu64, stream_iter->viewer_stream_id); - BT_ASSERT(ret > 0); - strcpy(stream_iter->name, name); - if (lttng_live_is_canceled(lttng_live)) { - return 0; - } - status = bt_self_component_source_add_output_port( - lttng_live->private_component, name, stream_iter, - &private_port); - switch (status) { - case BT_COMPONENT_STATUS_GRAPH_IS_CANCELED: - return 0; - case BT_COMPONENT_STATUS_OK: - break; - default: - return -1; - } - bt_object_put_ref(private_port); /* weak */ - BT_LOGI("Added port %s", name); - - if (lttng_live->no_stream_port) { - bt_object_get_ref(lttng_live->no_stream_port); - ret = bt_private_port_remove_from_component(lttng_live->no_stream_port); - bt_object_put_ref(lttng_live->no_stream_port); - if (ret) { - return -1; - } - lttng_live->no_stream_port = NULL; - lttng_live->no_stream_iter->port = NULL; + ret = false; + goto end; } - stream_iter->port = private_port; - return 0; -} -BT_HIDDEN -int lttng_live_remove_port(struct lttng_live_component *lttng_live, - struct bt_private_port *port) -{ - bt_component *component; - int64_t nr_ports; - int ret; - - component = bt_component_from_private(lttng_live->private_component); - nr_ports = bt_component_source_get_output_port_count(component); - if (nr_ports < 0) { - return -1; - } - BT_COMPONENT_PUT_REF_AND_RESET(component); - if (nr_ports == 1) { - bt_component_status status; + component = bt_component_source_as_component_const( + bt_self_component_source_as_component_source( + lttng_live->self_comp)); - BT_ASSERT(!lttng_live->no_stream_port); + ret = bt_component_graph_is_canceled(component); - if (lttng_live_is_canceled(lttng_live)) { - return 0; - } - status = bt_self_component_source_add_output_port(lttng_live->private_component, - "no-stream", lttng_live->no_stream_iter, - <tng_live->no_stream_port); - switch (status) { - case BT_COMPONENT_STATUS_GRAPH_IS_CANCELED: - return 0; - case BT_COMPONENT_STATUS_OK: - break; - default: - return -1; - } - bt_object_put_ref(lttng_live->no_stream_port); /* weak */ - lttng_live->no_stream_iter->port = lttng_live->no_stream_port; - } - bt_object_get_ref(port); - ret = bt_private_port_remove_from_component(port); - bt_object_put_ref(port); - if (ret) { - return -1; - } - return 0; +end: + return ret; } static struct lttng_live_trace *lttng_live_find_trace(struct lttng_live_session *session, uint64_t trace_id) { - struct lttng_live_trace *trace; + uint64_t trace_idx; + struct lttng_live_trace *ret_trace = NULL; - bt_list_for_each_entry(trace, &session->traces, node) { + for (trace_idx = 0; trace_idx < session->traces->len; trace_idx++) { + struct lttng_live_trace *trace = + g_ptr_array_index(session->traces, trace_idx); if (trace->id == trace_id) { - return trace; + ret_trace = trace; + goto end; } } - return NULL; + +end: + return ret_trace; } static -void lttng_live_destroy_trace(bt_object *obj) +void lttng_live_destroy_trace(struct lttng_live_trace *trace) { - struct lttng_live_trace *trace = container_of(obj, struct lttng_live_trace, obj); + BT_LOGD("Destroy lttng_live_trace"); - BT_LOGI("Destroy trace"); - BT_ASSERT(bt_list_empty(&trace->streams)); - bt_list_del(&trace->node); + BT_ASSERT(trace->stream_iterators); + g_ptr_array_free(trace->stream_iterators, TRUE); - if (trace->trace) { - int retval; + BT_TRACE_PUT_REF_AND_RESET(trace->trace); + BT_TRACE_CLASS_PUT_REF_AND_RESET(trace->trace_class); - retval = bt_trace_set_is_static(trace->trace); - BT_ASSERT(!retval); - BT_TRACE_PUT_REF_AND_RESET(trace->trace); - } lttng_live_metadata_fini(trace); - BT_OBJECT_PUT_REF_AND_RESET(trace->cc_prio_map); g_free(trace); } @@ -233,10 +175,14 @@ struct lttng_live_trace *lttng_live_create_trace(struct lttng_live_session *sess } trace->session = session; trace->id = trace_id; - BT_INIT_LIST_HEAD(&trace->streams); + trace->trace_class = NULL; + trace->trace = NULL; + trace->stream_iterators = g_ptr_array_new_with_free_func( + (GDestroyNotify) lttng_live_stream_iterator_destroy); + BT_ASSERT(trace->stream_iterators); trace->new_metadata_needed = true; - bt_list_add(&trace->node, &session->traces); - bt_object_init(&trace->obj, lttng_live_destroy_trace); + g_ptr_array_add(session->traces, trace); + BT_LOGI("Create trace"); goto end; error: @@ -247,63 +193,55 @@ end: } BT_HIDDEN -struct lttng_live_trace *lttng_live_ref_trace(struct lttng_live_session *session, - uint64_t trace_id) +struct lttng_live_trace *lttng_live_borrow_trace( + struct lttng_live_session *session, uint64_t trace_id) { struct lttng_live_trace *trace; trace = lttng_live_find_trace(session, trace_id); if (trace) { - bt_object_get_ref(trace); - return trace; + goto end; } - return lttng_live_create_trace(session, trace_id); -} - -BT_HIDDEN -void lttng_live_unref_trace(struct lttng_live_trace *trace) -{ - bt_object_put_ref(trace); -} -static -void lttng_live_close_trace_streams(struct lttng_live_trace *trace) -{ - struct lttng_live_stream_iterator *stream, *s; + /* The session is the owner of the newly created trace. */ + trace = lttng_live_create_trace(session, trace_id); - bt_list_for_each_entry_safe(stream, s, &trace->streams, node) { - lttng_live_stream_iterator_destroy(stream); - } - lttng_live_metadata_fini(trace); +end: + return trace; } BT_HIDDEN -int lttng_live_add_session(struct lttng_live_component *lttng_live, +int lttng_live_add_session(struct lttng_live_msg_iter *lttng_live_msg_iter, uint64_t session_id, const char *hostname, const char *session_name) { int ret = 0; - struct lttng_live_session *s; + struct lttng_live_session *session; - s = g_new0(struct lttng_live_session, 1); - if (!s) { + session = g_new0(struct lttng_live_session, 1); + if (!session) { goto error; } - s->id = session_id; - BT_INIT_LIST_HEAD(&s->traces); - s->lttng_live = lttng_live; - s->new_streams_needed = true; - s->hostname = g_string_new(hostname); - s->session_name = g_string_new(session_name); + session->id = session_id; + session->traces = g_ptr_array_new_with_free_func( + (GDestroyNotify) lttng_live_destroy_trace); + BT_ASSERT(session->traces); + session->lttng_live_msg_iter = lttng_live_msg_iter; + session->new_streams_needed = true; + session->hostname = g_string_new(hostname); + BT_ASSERT(session->hostname); + + session->session_name = g_string_new(session_name); + BT_ASSERT(session->session_name); BT_LOGI("Reading from session: %" PRIu64 " hostname: %s session_name: %s", - s->id, hostname, session_name); - bt_list_add(&s->node, <tng_live->sessions); + session->id, hostname, session_name); + g_ptr_array_add(lttng_live_msg_iter->sessions, session); goto end; error: BT_LOGE("Error adding session"); - g_free(s); + g_free(session); ret = -1; end: return ret; @@ -312,23 +250,30 @@ end: static void lttng_live_destroy_session(struct lttng_live_session *session) { - struct lttng_live_trace *trace, *t; + struct lttng_live_component *live_comp; + + if (!session) { + goto end; + } - BT_LOGI("Destroy session"); + BT_LOGD("Destroy lttng live session"); if (session->id != -1ULL) { if (lttng_live_detach_session(session)) { - if (!lttng_live_is_canceled(session->lttng_live)) { + live_comp = session->lttng_live_msg_iter->lttng_live_comp; + if (session->lttng_live_msg_iter && + !lttng_live_is_canceled(live_comp)) { /* Old relayd cannot detach sessions. */ - BT_LOGD("Unable to detach session %" PRIu64, + BT_LOGD("Unable to detach lttng live session %" PRIu64, session->id); } } session->id = -1ULL; } - bt_list_for_each_entry_safe(trace, t, &session->traces, node) { - lttng_live_close_trace_streams(trace); + + if (session->traces) { + g_ptr_array_free(session->traces, TRUE); } - bt_list_del(&session->node); + if (session->hostname) { g_string_free(session->hostname, TRUE); } @@ -336,34 +281,50 @@ void lttng_live_destroy_session(struct lttng_live_session *session) g_string_free(session->session_name, TRUE); } g_free(session); + +end: + return; } -BT_HIDDEN -void lttng_live_iterator_finalize(bt_self_message_iterator *it) +static +void lttng_live_msg_iter_destroy(struct lttng_live_msg_iter *lttng_live_msg_iter) { - struct lttng_live_stream_iterator_generic *s = - bt_self_message_iterator_get_user_data(it); - - switch (s->type) { - case LIVE_STREAM_TYPE_NO_STREAM: - { - /* Leave no_stream_iter in place when port is removed. */ - break; + if (!lttng_live_msg_iter) { + goto end; } - case LIVE_STREAM_TYPE_STREAM: - { - struct lttng_live_stream_iterator *stream_iter = - container_of(s, struct lttng_live_stream_iterator, p); - lttng_live_stream_iterator_destroy(stream_iter); - break; - } + if (lttng_live_msg_iter->sessions) { + g_ptr_array_free(lttng_live_msg_iter->sessions, TRUE); } + + BT_OBJECT_PUT_REF_AND_RESET(lttng_live_msg_iter->viewer_connection); + BT_ASSERT(lttng_live_msg_iter->lttng_live_comp); + BT_ASSERT(lttng_live_msg_iter->lttng_live_comp->has_msg_iter); + + /* All stream iterators must be destroyed at this point. */ + BT_ASSERT(lttng_live_msg_iter->active_stream_iter == 0); + lttng_live_msg_iter->lttng_live_comp->has_msg_iter = false; + + g_free(lttng_live_msg_iter); + +end: + return; +} + +BT_HIDDEN +void lttng_live_msg_iter_finalize(bt_self_message_iterator *self_msg_iter) +{ + struct lttng_live_msg_iter *lttng_live_msg_iter; + + BT_ASSERT(self_msg_iter); + + lttng_live_msg_iter = bt_self_message_iterator_get_data(self_msg_iter); + BT_ASSERT(lttng_live_msg_iter); + lttng_live_msg_iter_destroy(lttng_live_msg_iter); } static -bt_lttng_live_iterator_status lttng_live_iterator_next_check_stream_state( - struct lttng_live_component *lttng_live, +enum lttng_live_iterator_status lttng_live_iterator_next_check_stream_state( struct lttng_live_stream_iterator *lttng_live_stream) { switch (lttng_live_stream->state) { @@ -381,7 +342,7 @@ bt_lttng_live_iterator_status lttng_live_iterator_next_check_stream_state( case LTTNG_LIVE_STREAM_EOF: break; } - return BT_LTTNG_LIVE_ITERATOR_STATUS_OK; + return LTTNG_LIVE_ITERATOR_STATUS_OK; } /* @@ -393,40 +354,43 @@ bt_lttng_live_iterator_status lttng_live_iterator_next_check_stream_state( * return EOF. */ static -bt_lttng_live_iterator_status lttng_live_iterator_next_handle_one_no_data_stream( - struct lttng_live_component *lttng_live, +enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_no_data_stream( + struct lttng_live_msg_iter *lttng_live_msg_iter, struct lttng_live_stream_iterator *lttng_live_stream) { - bt_lttng_live_iterator_status ret = - BT_LTTNG_LIVE_ITERATOR_STATUS_OK; + enum lttng_live_iterator_status ret = + LTTNG_LIVE_ITERATOR_STATUS_OK; struct packet_index index; enum lttng_live_stream_state orig_state = lttng_live_stream->state; if (lttng_live_stream->trace->new_metadata_needed) { - ret = BT_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE; + ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE; goto end; } if (lttng_live_stream->trace->session->new_streams_needed) { - ret = BT_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE; + ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE; goto end; } - if (lttng_live_stream->state != LTTNG_LIVE_STREAM_ACTIVE_NO_DATA - && lttng_live_stream->state != LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA) { + if (lttng_live_stream->state != LTTNG_LIVE_STREAM_ACTIVE_NO_DATA && + lttng_live_stream->state != LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA) { goto end; } - ret = lttng_live_get_next_index(lttng_live, lttng_live_stream, &index); - if (ret != BT_LTTNG_LIVE_ITERATOR_STATUS_OK) { + ret = lttng_live_get_next_index(lttng_live_msg_iter, lttng_live_stream, + &index); + if (ret != LTTNG_LIVE_ITERATOR_STATUS_OK) { goto end; } BT_ASSERT(lttng_live_stream->state != LTTNG_LIVE_STREAM_EOF); if (lttng_live_stream->state == LTTNG_LIVE_STREAM_QUIESCENT) { - if (orig_state == LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA - && lttng_live_stream->last_returned_inactivity_timestamp == - lttng_live_stream->current_inactivity_timestamp) { - ret = BT_LTTNG_LIVE_ITERATOR_STATUS_AGAIN; + uint64_t last_inact_ts = lttng_live_stream->last_inactivity_ts, + curr_inact_ts = lttng_live_stream->current_inactivity_ts; + + if (orig_state == LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA && + last_inact_ts == curr_inact_ts) { + ret = LTTNG_LIVE_ITERATOR_STATUS_AGAIN; print_stream_state(lttng_live_stream); } else { - ret = BT_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE; + ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE; } goto end; } @@ -434,109 +398,138 @@ bt_lttng_live_iterator_status lttng_live_iterator_next_handle_one_no_data_stream lttng_live_stream->offset = index.offset; lttng_live_stream->len = index.packet_size / CHAR_BIT; end: - if (ret == BT_LTTNG_LIVE_ITERATOR_STATUS_OK) { - ret = lttng_live_iterator_next_check_stream_state( - lttng_live, lttng_live_stream); + if (ret == LTTNG_LIVE_ITERATOR_STATUS_OK) { + ret = lttng_live_iterator_next_check_stream_state(lttng_live_stream); } return ret; } /* - * Creation of the message requires the ctf trace to be created - * beforehand, but the live protocol gives us all streams (including - * metadata) at once. So we split it in three steps: getting streams, - * getting metadata (which creates the ctf trace), and then creating the - * per-stream messages. + * Creation of the message requires the ctf trace class to be created + * beforehand, but the live protocol gives us all streams (including metadata) + * at once. So we split it in three steps: getting streams, getting metadata + * (which creates the ctf trace class), and then creating the per-stream + * messages. */ static -bt_lttng_live_iterator_status lttng_live_get_session( - struct lttng_live_component *lttng_live, +enum lttng_live_iterator_status lttng_live_get_session( + struct lttng_live_msg_iter *lttng_live_msg_iter, struct lttng_live_session *session) { - bt_lttng_live_iterator_status status; - struct lttng_live_trace *trace, *t; + enum lttng_live_iterator_status status; + uint64_t trace_idx; + int ret = 0; - if (lttng_live_attach_session(session)) { - if (lttng_live_is_canceled(lttng_live)) { - return BT_LTTNG_LIVE_ITERATOR_STATUS_AGAIN; - } else { - return BT_LTTNG_LIVE_ITERATOR_STATUS_ERROR; + if (!session->attached) { + ret = lttng_live_attach_session(session); + if (ret) { + if (lttng_live_msg_iter && lttng_live_is_canceled( + lttng_live_msg_iter->lttng_live_comp)) { + status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN; + } else { + status = LTTNG_LIVE_ITERATOR_STATUS_ERROR; + } + goto end; } } + status = lttng_live_get_new_streams(session); - if (status != BT_LTTNG_LIVE_ITERATOR_STATUS_OK && - status != BT_LTTNG_LIVE_ITERATOR_STATUS_END) { - return status; + if (status != LTTNG_LIVE_ITERATOR_STATUS_OK && + status != LTTNG_LIVE_ITERATOR_STATUS_END) { + goto end; } - bt_list_for_each_entry_safe(trace, t, &session->traces, node) { + for (trace_idx = 0; trace_idx < session->traces->len; trace_idx++) { + struct lttng_live_trace *trace = + g_ptr_array_index(session->traces, trace_idx); + status = lttng_live_metadata_update(trace); - if (status != BT_LTTNG_LIVE_ITERATOR_STATUS_OK && - status != BT_LTTNG_LIVE_ITERATOR_STATUS_END) { - return status; + if (status != LTTNG_LIVE_ITERATOR_STATUS_OK && + status != LTTNG_LIVE_ITERATOR_STATUS_END) { + goto end; } } - return lttng_live_lazy_msg_init(session); + status = lttng_live_lazy_msg_init(session); + +end: + return status; } BT_HIDDEN -void lttng_live_need_new_streams(struct lttng_live_component *lttng_live) +void lttng_live_need_new_streams(struct lttng_live_msg_iter *lttng_live_msg_iter) { - struct lttng_live_session *session; + uint64_t session_idx; - bt_list_for_each_entry(session, <tng_live->sessions, node) { + for (session_idx = 0; session_idx < lttng_live_msg_iter->sessions->len; + session_idx++) { + struct lttng_live_session *session = + g_ptr_array_index(lttng_live_msg_iter->sessions, session_idx); session->new_streams_needed = true; } } static -void lttng_live_force_new_streams_and_metadata(struct lttng_live_component *lttng_live) +void lttng_live_force_new_streams_and_metadata(struct lttng_live_msg_iter *lttng_live_msg_iter) { - struct lttng_live_session *session; - - bt_list_for_each_entry(session, <tng_live->sessions, node) { - struct lttng_live_trace *trace; + uint64_t session_idx, trace_idx; + for (session_idx = 0; session_idx < lttng_live_msg_iter->sessions->len; + session_idx++) { + struct lttng_live_session *session = + g_ptr_array_index(lttng_live_msg_iter->sessions, session_idx); session->new_streams_needed = true; - bt_list_for_each_entry(trace, &session->traces, node) { + for (trace_idx = 0; trace_idx < session->traces->len; + trace_idx++) { + struct lttng_live_trace *trace = + g_ptr_array_index(session->traces, trace_idx); trace->new_metadata_needed = true; } } } static -bt_lttng_live_iterator_status lttng_live_iterator_next_handle_new_streams_and_metadata( - struct lttng_live_component *lttng_live) +enum lttng_live_iterator_status +lttng_live_iterator_handle_new_streams_and_metadata( + struct lttng_live_msg_iter *lttng_live_msg_iter) { - bt_lttng_live_iterator_status ret = - BT_LTTNG_LIVE_ITERATOR_STATUS_OK; - unsigned int nr_sessions_opened = 0; - struct lttng_live_session *session, *s; - - bt_list_for_each_entry_safe(session, s, <tng_live->sessions, node) { - if (session->closed && bt_list_empty(&session->traces)) { - lttng_live_destroy_session(session); - } - } + enum lttng_live_iterator_status ret = + LTTNG_LIVE_ITERATOR_STATUS_OK; + uint64_t session_idx = 0, nr_sessions_opened = 0; + struct lttng_live_session *session; + enum session_not_found_action sess_not_found_act = + lttng_live_msg_iter->lttng_live_comp->params.sess_not_found_act; + /* - * Currently, when there are no sessions, we quit immediately. - * We may want to add a component parameter to keep trying until - * we get data in the future. - * Also, in a remotely distant future, we could add a "new + * In a remotely distant future, we could add a "new * session" flag to the protocol, which would tell us that we * need to query for new sessions even though we have sessions * currently ongoing. */ - if (bt_list_empty(<tng_live->sessions)) { - ret = BT_LTTNG_LIVE_ITERATOR_STATUS_END; - goto end; + if (lttng_live_msg_iter->sessions->len == 0) { + if (sess_not_found_act != SESSION_NOT_FOUND_ACTION_CONTINUE) { + ret = LTTNG_LIVE_ITERATOR_STATUS_END; + goto end; + } else { + /* + * Retry to create a viewer session for the requested + * session name. + */ + if (lttng_live_create_viewer_session(lttng_live_msg_iter)) { + ret = LTTNG_LIVE_ITERATOR_STATUS_ERROR; + goto end; + } + } } - bt_list_for_each_entry(session, <tng_live->sessions, node) { - ret = lttng_live_get_session(lttng_live, session); + + for (session_idx = 0; session_idx < lttng_live_msg_iter->sessions->len; + session_idx++) { + session = g_ptr_array_index(lttng_live_msg_iter->sessions, + session_idx); + ret = lttng_live_get_session(lttng_live_msg_iter, session); switch (ret) { - case BT_LTTNG_LIVE_ITERATOR_STATUS_OK: + case LTTNG_LIVE_ITERATOR_STATUS_OK: break; - case BT_LTTNG_LIVE_ITERATOR_STATUS_END: - ret = BT_LTTNG_LIVE_ITERATOR_STATUS_OK; + case LTTNG_LIVE_ITERATOR_STATUS_END: + ret = LTTNG_LIVE_ITERATOR_STATUS_OK; break; default: goto end; @@ -546,147 +539,257 @@ bt_lttng_live_iterator_status lttng_live_iterator_next_handle_new_streams_and_me } } end: - if (ret == BT_LTTNG_LIVE_ITERATOR_STATUS_OK && !nr_sessions_opened) { - ret = BT_LTTNG_LIVE_ITERATOR_STATUS_END; + if (ret == LTTNG_LIVE_ITERATOR_STATUS_OK && + sess_not_found_act != SESSION_NOT_FOUND_ACTION_CONTINUE && + nr_sessions_opened == 0) { + ret = LTTNG_LIVE_ITERATOR_STATUS_END; } return ret; } static -bt_lttng_live_iterator_status emit_inactivity_message( - struct lttng_live_component *lttng_live, - struct lttng_live_stream_iterator *lttng_live_stream, - const bt_message **message, - uint64_t timestamp) +enum lttng_live_iterator_status emit_inactivity_message( + struct lttng_live_msg_iter *lttng_live_msg_iter, + struct lttng_live_stream_iterator *stream_iter, + bt_message **message, uint64_t timestamp) { - bt_lttng_live_iterator_status ret = - BT_LTTNG_LIVE_ITERATOR_STATUS_OK; - struct lttng_live_trace *trace; - const bt_clock_class *clock_class = NULL; - bt_clock_snapshot *clock_snapshot = NULL; - const bt_message *msg = NULL; - int retval; + enum lttng_live_iterator_status ret = + LTTNG_LIVE_ITERATOR_STATUS_OK; + bt_message *msg = NULL; - trace = lttng_live_stream->trace; - if (!trace) { - goto error; - } - clock_class = bt_clock_class_priority_map_get_clock_class_by_index(trace->cc_prio_map, 0); - if (!clock_class) { - goto error; - } - clock_snapshot = bt_clock_snapshot_create(clock_class, timestamp); - if (!clock_snapshot) { - goto error; - } - msg = bt_message_inactivity_create(trace->cc_prio_map); + BT_ASSERT(stream_iter->trace->clock_class); + + msg = bt_message_message_iterator_inactivity_create( + lttng_live_msg_iter->self_msg_iter, + stream_iter->trace->clock_class, + timestamp); if (!msg) { goto error; } - retval = bt_message_inactivity_set_clock_snapshot(msg, clock_snapshot); - if (retval) { - goto error; - } + *message = msg; end: - bt_object_put_ref(clock_snapshot); - bt_clock_class_put_ref(clock_class); return ret; error: - ret = BT_LTTNG_LIVE_ITERATOR_STATUS_ERROR; + ret = LTTNG_LIVE_ITERATOR_STATUS_ERROR; bt_message_put_ref(msg); goto end; } static -bt_lttng_live_iterator_status lttng_live_iterator_next_handle_one_quiescent_stream( - struct lttng_live_component *lttng_live, +enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_quiescent_stream( + struct lttng_live_msg_iter *lttng_live_msg_iter, struct lttng_live_stream_iterator *lttng_live_stream, - const bt_message **message) + bt_message **message) { - bt_lttng_live_iterator_status ret = - BT_LTTNG_LIVE_ITERATOR_STATUS_OK; - const bt_clock_class *clock_class = NULL; - bt_clock_snapshot *clock_snapshot = NULL; + enum lttng_live_iterator_status ret = + LTTNG_LIVE_ITERATOR_STATUS_OK; if (lttng_live_stream->state != LTTNG_LIVE_STREAM_QUIESCENT) { - return BT_LTTNG_LIVE_ITERATOR_STATUS_OK; + return LTTNG_LIVE_ITERATOR_STATUS_OK; } - if (lttng_live_stream->current_inactivity_timestamp == - lttng_live_stream->last_returned_inactivity_timestamp) { + if (lttng_live_stream->current_inactivity_ts == + lttng_live_stream->last_inactivity_ts) { lttng_live_stream->state = LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA; - ret = BT_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE; + ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE; + goto end; + } + + ret = emit_inactivity_message(lttng_live_msg_iter, lttng_live_stream, + message, lttng_live_stream->current_inactivity_ts); + + lttng_live_stream->last_inactivity_ts = + lttng_live_stream->current_inactivity_ts; +end: + return ret; +} + +static +int live_get_msg_ts_ns(struct lttng_live_stream_iterator *stream_iter, + struct lttng_live_msg_iter *lttng_live_msg_iter, + const bt_message *msg, int64_t last_msg_ts_ns, + int64_t *ts_ns) +{ + const bt_clock_class *clock_class = NULL; + const bt_clock_snapshot *clock_snapshot = NULL; + int ret = 0; + bt_clock_snapshot_state cs_state = BT_CLOCK_SNAPSHOT_STATE_KNOWN; + bt_message_stream_activity_clock_snapshot_state sa_cs_state; + + BT_ASSERT(msg); + BT_ASSERT(ts_ns); + + BT_LOGV("Getting message's timestamp: iter-data-addr=%p, msg-addr=%p, " + "last-msg-ts=%" PRId64, lttng_live_msg_iter, msg, + last_msg_ts_ns); + + switch (bt_message_get_type(msg)) { + case BT_MESSAGE_TYPE_EVENT: + clock_class = + bt_message_event_borrow_stream_class_default_clock_class_const( + msg); + BT_ASSERT(clock_class); + + cs_state = bt_message_event_borrow_default_clock_snapshot_const( + msg, &clock_snapshot); + break; + case BT_MESSAGE_TYPE_PACKET_BEGINNING: + clock_class = + bt_message_packet_beginning_borrow_stream_class_default_clock_class_const( + msg); + BT_ASSERT(clock_class); + + cs_state = bt_message_packet_beginning_borrow_default_clock_snapshot_const( + msg, &clock_snapshot); + break; + case BT_MESSAGE_TYPE_PACKET_END: + clock_class = + bt_message_packet_end_borrow_stream_class_default_clock_class_const( + msg); + BT_ASSERT(clock_class); + + cs_state = bt_message_packet_end_borrow_default_clock_snapshot_const( + msg, &clock_snapshot); + break; + case BT_MESSAGE_TYPE_DISCARDED_EVENTS: + clock_class = + bt_message_discarded_events_borrow_stream_class_default_clock_class_const( + msg); + BT_ASSERT(clock_class); + + cs_state = bt_message_discarded_events_borrow_default_beginning_clock_snapshot_const( + msg, &clock_snapshot); + break; + case BT_MESSAGE_TYPE_DISCARDED_PACKETS: + clock_class = + bt_message_discarded_packets_borrow_stream_class_default_clock_class_const( + msg); + BT_ASSERT(clock_class); + + cs_state = bt_message_discarded_packets_borrow_default_beginning_clock_snapshot_const( + msg, &clock_snapshot); + break; + case BT_MESSAGE_TYPE_STREAM_ACTIVITY_BEGINNING: + clock_class = + bt_message_stream_activity_beginning_borrow_stream_class_default_clock_class_const( + msg); + BT_ASSERT(clock_class); + + sa_cs_state = bt_message_stream_activity_beginning_borrow_default_clock_snapshot_const( + msg, &clock_snapshot); + if (sa_cs_state != BT_MESSAGE_STREAM_ACTIVITY_CLOCK_SNAPSHOT_STATE_KNOWN) { + goto no_clock_snapshot; + } + + break; + case BT_MESSAGE_TYPE_STREAM_ACTIVITY_END: + clock_class = + bt_message_stream_activity_end_borrow_stream_class_default_clock_class_const( + msg); + BT_ASSERT(clock_class); + + sa_cs_state = bt_message_stream_activity_end_borrow_default_clock_snapshot_const( + msg, &clock_snapshot); + if (sa_cs_state != BT_MESSAGE_STREAM_ACTIVITY_CLOCK_SNAPSHOT_STATE_KNOWN) { + goto no_clock_snapshot; + } + + break; + case BT_MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY: + cs_state = + bt_message_message_iterator_inactivity_borrow_default_clock_snapshot_const( + msg, &clock_snapshot); + break; + default: + /* All the other messages have a higher priority */ + BT_LOGV_STR("Message has no timestamp: using the last message timestamp."); + *ts_ns = last_msg_ts_ns; goto end; } - ret = emit_inactivity_message(lttng_live, lttng_live_stream, message, - (uint64_t) lttng_live_stream->current_inactivity_timestamp); + if (cs_state != BT_CLOCK_SNAPSHOT_STATE_KNOWN) { + BT_LOGE_STR("Unsupported unknown clock snapshot."); + ret = -1; + goto end; + } + + clock_class = bt_clock_snapshot_borrow_clock_class_const(clock_snapshot); + BT_ASSERT(clock_class); + + ret = bt_clock_snapshot_get_ns_from_origin(clock_snapshot, ts_ns); + if (ret) { + BT_LOGE("Cannot get nanoseconds from Epoch of clock snapshot: " + "clock-snapshot-addr=%p", clock_snapshot); + goto error; + } + + goto end; + +no_clock_snapshot: + BT_LOGV_STR("Message's default clock snapshot is missing: " + "using the last message timestamp."); + *ts_ns = last_msg_ts_ns; + goto end; + +error: + ret = -1; - lttng_live_stream->last_returned_inactivity_timestamp = - lttng_live_stream->current_inactivity_timestamp; end: - bt_object_put_ref(clock_snapshot); - bt_clock_class_put_ref(clock_class); + if (ret == 0) { + BT_LOGV("Found message's timestamp: " + "iter-data-addr=%p, msg-addr=%p, " + "last-msg-ts=%" PRId64 ", ts=%" PRId64, + lttng_live_msg_iter, msg, last_msg_ts_ns, *ts_ns); + } + return ret; } static -bt_lttng_live_iterator_status lttng_live_iterator_next_handle_one_active_data_stream( - struct lttng_live_component *lttng_live, +enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_active_data_stream( + struct lttng_live_msg_iter *lttng_live_msg_iter, struct lttng_live_stream_iterator *lttng_live_stream, - const bt_message **message) + bt_message **message) { - bt_lttng_live_iterator_status ret = - BT_LTTNG_LIVE_ITERATOR_STATUS_OK; + enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK; enum bt_msg_iter_status status; - struct lttng_live_session *session; + uint64_t session_idx, trace_idx; - bt_list_for_each_entry(session, <tng_live->sessions, node) { - struct lttng_live_trace *trace; + for (session_idx = 0; session_idx < lttng_live_msg_iter->sessions->len; + session_idx++) { + struct lttng_live_session *session = + g_ptr_array_index(lttng_live_msg_iter->sessions, session_idx); if (session->new_streams_needed) { - return BT_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE; + ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE; + goto end; } - bt_list_for_each_entry(trace, &session->traces, node) { + for (trace_idx = 0; trace_idx < session->traces->len; + trace_idx++) { + struct lttng_live_trace *trace = + g_ptr_array_index(session->traces, trace_idx); if (trace->new_metadata_needed) { - return BT_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE; + ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE; + goto end; } } } if (lttng_live_stream->state != LTTNG_LIVE_STREAM_ACTIVE_DATA) { - return BT_LTTNG_LIVE_ITERATOR_STATUS_ERROR; - } - if (lttng_live_stream->packet_end_msg_queue) { - *message = lttng_live_stream->packet_end_msg_queue; - lttng_live_stream->packet_end_msg_queue = NULL; - status = BT_MSG_ITER_STATUS_OK; - } else { - status = bt_msg_iter_get_next_message( - lttng_live_stream->msg_iter, - lttng_live_stream->trace->cc_prio_map, - message); - if (status == BT_MSG_ITER_STATUS_OK) { - /* - * Consider empty packets as inactivity. - */ - if (bt_message_get_type(*message) == BT_MESSAGE_TYPE_PACKET_END) { - lttng_live_stream->packet_end_msg_queue = *message; - *message = NULL; - return emit_inactivity_message(lttng_live, - lttng_live_stream, message, - lttng_live_stream->current_packet_end_timestamp); - } - } + ret = LTTNG_LIVE_ITERATOR_STATUS_ERROR; + goto end; } + + status = bt_msg_iter_get_next_message(lttng_live_stream->msg_iter, + lttng_live_msg_iter->self_msg_iter, message); switch (status) { case BT_MSG_ITER_STATUS_EOF: - ret = BT_LTTNG_LIVE_ITERATOR_STATUS_END; + ret = LTTNG_LIVE_ITERATOR_STATUS_END; break; case BT_MSG_ITER_STATUS_OK: - ret = BT_LTTNG_LIVE_ITERATOR_STATUS_OK; + ret = LTTNG_LIVE_ITERATOR_STATUS_OK; break; case BT_MSG_ITER_STATUS_AGAIN: /* @@ -694,15 +797,19 @@ bt_lttng_live_iterator_status lttng_live_iterator_next_handle_one_active_data_st * get_index may return AGAIN to delay the following * attempt. */ - ret = BT_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE; + ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE; break; case BT_MSG_ITER_STATUS_INVAL: /* No argument provided by the user, so don't return INVAL. */ case BT_MSG_ITER_STATUS_ERROR: default: - ret = BT_LTTNG_LIVE_ITERATOR_STATUS_ERROR; + ret = LTTNG_LIVE_ITERATOR_STATUS_ERROR; + BT_LOGW("CTF msg iterator return an error or failed msg_iter=%p", + lttng_live_stream->msg_iter); break; } + +end: return ret; } @@ -755,303 +862,626 @@ bt_lttng_live_iterator_status lttng_live_iterator_next_handle_one_active_data_st * When disconnected from relayd: try to re-connect endlessly. */ static -bt_message_iterator_next_method_return lttng_live_iterator_next_stream( - bt_self_message_iterator *iterator, - struct lttng_live_stream_iterator *stream_iter) +enum lttng_live_iterator_status lttng_live_iterator_next_on_stream( + struct lttng_live_msg_iter *lttng_live_msg_iter, + struct lttng_live_stream_iterator *stream_iter, + bt_message **curr_msg) { - bt_lttng_live_iterator_status status; - bt_message_iterator_next_method_return next_return; - struct lttng_live_component *lttng_live; + enum lttng_live_iterator_status live_status; - lttng_live = stream_iter->trace->session->lttng_live; retry: print_stream_state(stream_iter); - next_return.message = NULL; - status = lttng_live_iterator_next_handle_new_streams_and_metadata(lttng_live); - if (status != BT_LTTNG_LIVE_ITERATOR_STATUS_OK) { + live_status = lttng_live_iterator_handle_new_streams_and_metadata( + lttng_live_msg_iter); + if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) { goto end; } - status = lttng_live_iterator_next_handle_one_no_data_stream( - lttng_live, stream_iter); - if (status != BT_LTTNG_LIVE_ITERATOR_STATUS_OK) { + live_status = lttng_live_iterator_next_handle_one_no_data_stream( + lttng_live_msg_iter, stream_iter); + if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) { goto end; } - status = lttng_live_iterator_next_handle_one_quiescent_stream( - lttng_live, stream_iter, &next_return.message); - if (status != BT_LTTNG_LIVE_ITERATOR_STATUS_OK) { - BT_ASSERT(next_return.message == NULL); + live_status = lttng_live_iterator_next_handle_one_quiescent_stream( + lttng_live_msg_iter, stream_iter, curr_msg); + if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) { + BT_ASSERT(*curr_msg == NULL); goto end; } - if (next_return.message) { + if (*curr_msg) { goto end; } - status = lttng_live_iterator_next_handle_one_active_data_stream(lttng_live, - stream_iter, &next_return.message); - if (status != BT_LTTNG_LIVE_ITERATOR_STATUS_OK) { - BT_ASSERT(next_return.message == NULL); + live_status = lttng_live_iterator_next_handle_one_active_data_stream( + lttng_live_msg_iter, stream_iter, curr_msg); + if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) { + BT_ASSERT(*curr_msg == NULL); } end: - switch (status) { - case BT_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE: - print_dbg("continue"); + if (live_status == LTTNG_LIVE_ITERATOR_STATUS_CONTINUE) { goto retry; - case BT_LTTNG_LIVE_ITERATOR_STATUS_AGAIN: - next_return.status = BT_MESSAGE_ITERATOR_STATUS_AGAIN; - print_dbg("again"); - break; - case BT_LTTNG_LIVE_ITERATOR_STATUS_END: - next_return.status = BT_MESSAGE_ITERATOR_STATUS_END; - print_dbg("end"); - break; - case BT_LTTNG_LIVE_ITERATOR_STATUS_OK: - next_return.status = BT_MESSAGE_ITERATOR_STATUS_OK; - print_dbg("ok"); - break; - case BT_LTTNG_LIVE_ITERATOR_STATUS_INVAL: - next_return.status = BT_MESSAGE_ITERATOR_STATUS_INVALID; - break; - case BT_LTTNG_LIVE_ITERATOR_STATUS_NOMEM: - next_return.status = BT_MESSAGE_ITERATOR_STATUS_NOMEM; - break; - case BT_LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED: - next_return.status = BT_MESSAGE_ITERATOR_STATUS_UNSUPPORTED; - break; - case BT_LTTNG_LIVE_ITERATOR_STATUS_ERROR: - default: /* fall-through */ - next_return.status = BT_MESSAGE_ITERATOR_STATUS_ERROR; - break; } - return next_return; + + return live_status; } static -bt_message_iterator_next_method_return lttng_live_iterator_next_no_stream( - bt_self_message_iterator *iterator, - struct lttng_live_no_stream_iterator *no_stream_iter) +enum lttng_live_iterator_status next_stream_iterator_for_trace( + struct lttng_live_msg_iter *lttng_live_msg_iter, + struct lttng_live_trace *live_trace, + struct lttng_live_stream_iterator **candidate_stream_iter) { - bt_lttng_live_iterator_status status; - bt_message_iterator_next_method_return next_return; - struct lttng_live_component *lttng_live; + struct lttng_live_stream_iterator *curr_candidate_stream_iter = NULL; + enum lttng_live_iterator_status stream_iter_status;; + int64_t curr_candidate_msg_ts = INT64_MAX; + uint64_t stream_iter_idx; - lttng_live = no_stream_iter->lttng_live; -retry: - lttng_live_force_new_streams_and_metadata(lttng_live); - next_return.message = NULL; - status = lttng_live_iterator_next_handle_new_streams_and_metadata(lttng_live); - if (status != BT_LTTNG_LIVE_ITERATOR_STATUS_OK) { + BT_ASSERT(live_trace); + BT_ASSERT(live_trace->stream_iterators); + /* + * Update the current message of every stream iterators of this trace. + * The current msg of every stream must have a timestamp equal or + * larger than the last message returned by this iterator. We must + * ensure monotonicity. + */ + stream_iter_idx = 0; + while (stream_iter_idx < live_trace->stream_iterators->len) { + bool stream_iter_is_ended = false; + struct lttng_live_stream_iterator *stream_iter = + g_ptr_array_index(live_trace->stream_iterators, + stream_iter_idx); + + /* + * Find if there is are now current message for this stream + * iterator get it. + */ + while (!stream_iter->current_msg) { + bt_message *msg = NULL; + int64_t curr_msg_ts_ns = INT64_MAX; + stream_iter_status = lttng_live_iterator_next_on_stream( + lttng_live_msg_iter, stream_iter, &msg); + + BT_LOGD("live stream iterator returned status :%s", + print_live_iterator_status(stream_iter_status)); + if (stream_iter_status == LTTNG_LIVE_ITERATOR_STATUS_END) { + stream_iter_is_ended = true; + break; + } + + if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK) { + goto end; + } + + BT_ASSERT(msg); + + /* + * Get the timestamp in nanoseconds from origin of this + * messsage. + */ + live_get_msg_ts_ns(stream_iter, lttng_live_msg_iter, + msg, lttng_live_msg_iter->last_msg_ts_ns, + &curr_msg_ts_ns); + + /* + * Check if the message of the current live stream + * iterator occured at the exact same time or after the + * last message returned by this component's message + * iterator. If not, we return an error. + */ + if (curr_msg_ts_ns >= lttng_live_msg_iter->last_msg_ts_ns) { + stream_iter->current_msg = msg; + stream_iter->current_msg_ts_ns = curr_msg_ts_ns; + } else { + /* + * We received a message in the past. To ensure + * monotonicity, we can't send it forward. + */ + BT_LOGE("Message's timestamp is less than " + "lttng-live's message iterator's last " + "returned timestamp: " + "lttng-live-msg-iter-addr=%p, ts=%" PRId64 ", " + "last-msg-ts=%" PRId64, + lttng_live_msg_iter, curr_msg_ts_ns, + lttng_live_msg_iter->last_msg_ts_ns); + stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR; + goto end; + } + } + + if (!stream_iter_is_ended && + stream_iter->current_msg_ts_ns <= curr_candidate_msg_ts) { + /* + * Update the current best candidate message for the + * stream iterator of thise live trace to be forwarded + * downstream. + */ + curr_candidate_msg_ts = stream_iter->current_msg_ts_ns; + curr_candidate_stream_iter = stream_iter; + } + + if (stream_iter_is_ended) { + /* + * The live stream iterator is ENDed. We remove that + * iterator from the list and we restart the iteration + * at the beginning of the live stream iterator array + * to because the removal will shuffle the array. + */ + g_ptr_array_remove_index_fast(live_trace->stream_iterators, + stream_iter_idx); + stream_iter_idx = 0; + } else { + stream_iter_idx++; + } + } + + if (curr_candidate_stream_iter) { + *candidate_stream_iter = curr_candidate_stream_iter; + stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK; + } else { + /* + * The only case where we don't have a candidate for this trace + * is if we reached the end of all the iterators. + */ + BT_ASSERT(live_trace->stream_iterators->len == 0); + stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_END; + } + +end: + return stream_iter_status; +} + +static +enum lttng_live_iterator_status next_stream_iterator_for_session( + struct lttng_live_msg_iter *lttng_live_msg_iter, + struct lttng_live_session *session, + struct lttng_live_stream_iterator **candidate_session_stream_iter) +{ + enum lttng_live_iterator_status stream_iter_status; + uint64_t trace_idx = 0; + int64_t curr_candidate_msg_ts = INT64_MAX; + struct lttng_live_stream_iterator *curr_candidate_stream_iter = NULL; + + /* + * Make sure we are attached to the session and look for new streams + * and metadata. + */ + stream_iter_status = lttng_live_get_session(lttng_live_msg_iter, session); + if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK && + stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_CONTINUE && + stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_END) { goto end; } - if (no_stream_iter->port) { - status = BT_LTTNG_LIVE_ITERATOR_STATUS_AGAIN; + + BT_ASSERT(session->traces); + + /* + * Use while loops here rather then for loops so we can restart the + * iteration if an element is removed from the array during the + * looping. + */ + while (trace_idx < session->traces->len) { + bool trace_is_ended = false; + struct lttng_live_stream_iterator *stream_iter; + struct lttng_live_trace *trace = + g_ptr_array_index(session->traces, trace_idx); + + stream_iter_status = next_stream_iterator_for_trace( + lttng_live_msg_iter, trace, &stream_iter); + if (stream_iter_status == LTTNG_LIVE_ITERATOR_STATUS_END) { + /* + * All the live stream iterators for this trace are + * ENDed. Remove the trace from this session. + */ + trace_is_ended = true; + } else if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK) { + goto end; + } + + if (!trace_is_ended) { + BT_ASSERT(stream_iter); + + if (stream_iter->current_msg_ts_ns <= curr_candidate_msg_ts) { + curr_candidate_msg_ts = stream_iter->current_msg_ts_ns; + curr_candidate_stream_iter = stream_iter; + } + trace_idx++; + } else { + g_ptr_array_remove_index_fast(session->traces, trace_idx); + trace_idx = 0; + } + } + if (curr_candidate_stream_iter) { + *candidate_session_stream_iter = curr_candidate_stream_iter; + stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK; } else { - status = BT_LTTNG_LIVE_ITERATOR_STATUS_END; + /* + * The only cases where we don't have a candidate for this + * trace is: + * 1. if we reached the end of all the iterators of all the + * traces of this session, + * 2. if we never had live stream iterator in the first place. + * + * In either cases, we return END. + */ + BT_ASSERT(session->traces->len == 0); + stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_END; } end: - switch (status) { - case BT_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE: - goto retry; - case BT_LTTNG_LIVE_ITERATOR_STATUS_AGAIN: - next_return.status = BT_MESSAGE_ITERATOR_STATUS_AGAIN; - break; - case BT_LTTNG_LIVE_ITERATOR_STATUS_END: - next_return.status = BT_MESSAGE_ITERATOR_STATUS_END; - break; - case BT_LTTNG_LIVE_ITERATOR_STATUS_INVAL: - next_return.status = BT_MESSAGE_ITERATOR_STATUS_INVALID; - break; - case BT_LTTNG_LIVE_ITERATOR_STATUS_NOMEM: - next_return.status = BT_MESSAGE_ITERATOR_STATUS_NOMEM; - break; - case BT_LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED: - next_return.status = BT_MESSAGE_ITERATOR_STATUS_UNSUPPORTED; - break; - case BT_LTTNG_LIVE_ITERATOR_STATUS_ERROR: - default: /* fall-through */ - next_return.status = BT_MESSAGE_ITERATOR_STATUS_ERROR; - break; + return stream_iter_status; +} + +static inline +void put_messages(bt_message_array_const msgs, uint64_t count) +{ + uint64_t i; + + for (i = 0; i < count; i++) { + BT_MESSAGE_PUT_REF_AND_RESET(msgs[i]); } - return next_return; } BT_HIDDEN -bt_message_iterator_next_method_return lttng_live_iterator_next( - bt_self_message_iterator *iterator) +bt_self_message_iterator_status lttng_live_msg_iter_next( + bt_self_message_iterator *self_msg_it, + bt_message_array_const msgs, uint64_t capacity, + uint64_t *count) { - struct lttng_live_stream_iterator_generic *s = - bt_self_message_iterator_get_user_data(iterator); - bt_message_iterator_next_method_return next_return; - - switch (s->type) { - case LIVE_STREAM_TYPE_NO_STREAM: - next_return = lttng_live_iterator_next_no_stream(iterator, - container_of(s, struct lttng_live_no_stream_iterator, p)); + bt_self_message_iterator_status status; + struct lttng_live_msg_iter *lttng_live_msg_iter = + bt_self_message_iterator_get_data(self_msg_it); + struct lttng_live_component *lttng_live = + lttng_live_msg_iter->lttng_live_comp; + enum lttng_live_iterator_status stream_iter_status; + uint64_t session_idx; + + *count = 0; + + BT_ASSERT(lttng_live_msg_iter); + + /* + * Clear all the invalid message reference that might be left over in + * the output array. + */ + memset(msgs, 0, capacity * sizeof(*msgs)); + + /* + * If no session are exposed on the relay found at the url provided by + * the user, session count will be 0. In this case, we return status + * end to return gracefully. + */ + if (lttng_live_msg_iter->sessions->len == 0) { + if (lttng_live->params.sess_not_found_act != + SESSION_NOT_FOUND_ACTION_CONTINUE) { + status = BT_SELF_MESSAGE_ITERATOR_STATUS_END; + goto no_session; + } else { + /* + * The are no more active session for this session + * name. Retry to create a viewer session for the + * requested session name. + */ + if (lttng_live_create_viewer_session(lttng_live_msg_iter)) { + status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR; + goto no_session; + } + } + } + + if (lttng_live_msg_iter->active_stream_iter == 0) { + lttng_live_force_new_streams_and_metadata(lttng_live_msg_iter); + } + + /* + * Here the muxing of message is done. + * + * We need to iterate over all the streams of all the traces of all the + * viewer sessions in order to get the message with the smallest + * timestamp. In this case, a session is a viewer session and there is + * one viewer session per consumer daemon. (UST 32bit, UST 64bit and/or + * kernel). Each viewer session can have multiple traces, for example, + * 64bit UST viewer sessions could have multiple per-pid traces. + * + * We iterate over the streams of each traces to update and see what is + * their next message's timestamp. From those timestamps, we select the + * message with the smallest timestamp as the best candidate message + * for that trace and do the same thing across all the sessions. + * + * We then compare the timestamp of best candidate message of all the + * sessions to pick the message with the smallest timestamp and we + * return it. + */ + while (*count < capacity) { + struct lttng_live_stream_iterator *next_stream_iter = NULL, + *candidate_stream_iter = NULL; + int64_t next_msg_ts_ns = INT64_MAX; + + BT_ASSERT(lttng_live_msg_iter->sessions); + session_idx = 0; + /* + * Use a while loop instead of a for loop so we can restart the + * iteration if we remove an element. We can safely call + * next_stream_iterator_for_session() multiple times on the + * same session as we only fetch a new message if there is no + * current next message for each live stream iterator. + * If all live stream iterator of that session already have a + * current next message, the function will simply exit return + * the same candidate live stream iterator every time. + */ + while (session_idx < lttng_live_msg_iter->sessions->len) { + struct lttng_live_session *session = + g_ptr_array_index(lttng_live_msg_iter->sessions, + session_idx); + + /* Find the best candidate message to send downstream. */ + stream_iter_status = next_stream_iterator_for_session( + lttng_live_msg_iter, session, + &candidate_stream_iter); + + /* If we receive an END status, it means that either: + * - Those traces never had active streams (UST with no + * data produced yet), + * - All live stream iterators have ENDed.*/ + if (stream_iter_status == LTTNG_LIVE_ITERATOR_STATUS_END) { + if (session->closed && session->traces->len == 0) { + /* + * Remove the session from the list and restart the + * iteration at the beginning of the array since the + * removal shuffle the elements of the array. + */ + g_ptr_array_remove_index_fast( + lttng_live_msg_iter->sessions, + session_idx); + session_idx = 0; + } else { + session_idx++; + } + continue; + } + + if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK) { + goto end; + } + + if (candidate_stream_iter->current_msg_ts_ns <= next_msg_ts_ns) { + next_msg_ts_ns = candidate_stream_iter->current_msg_ts_ns; + next_stream_iter = candidate_stream_iter; + } + + session_idx++; + } + + if (!next_stream_iter) { + stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN; + goto end; + } + + BT_ASSERT(next_stream_iter->current_msg); + /* Ensure monotonicity. */ + BT_ASSERT(lttng_live_msg_iter->last_msg_ts_ns <= + next_stream_iter->current_msg_ts_ns); + + /* + * Insert the next message to the message batch. This will set + * stream iterator current messsage to NULL so that next time + * we fetch the next message of that stream iterator + */ + BT_MESSAGE_MOVE_REF(msgs[*count], next_stream_iter->current_msg); + (*count)++; + + /* Update the last timestamp in nanoseconds sent downstream. */ + lttng_live_msg_iter->last_msg_ts_ns = next_msg_ts_ns; + next_stream_iter->current_msg_ts_ns = INT64_MAX; + + stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK; + } +end: + switch (stream_iter_status) { + case LTTNG_LIVE_ITERATOR_STATUS_OK: + case LTTNG_LIVE_ITERATOR_STATUS_AGAIN: + if (*count > 0) { + /* + * We received a again status but we have some messages + * to send downstream. We send them and return OK for + * now. On the next call we return again if there are + * still no new message to send. + */ + status = BT_SELF_MESSAGE_ITERATOR_STATUS_OK; + } else { + status = BT_SELF_MESSAGE_ITERATOR_STATUS_AGAIN; + } break; - case LIVE_STREAM_TYPE_STREAM: - next_return = lttng_live_iterator_next_stream(iterator, - container_of(s, struct lttng_live_stream_iterator, p)); + case LTTNG_LIVE_ITERATOR_STATUS_END: + status = BT_SELF_MESSAGE_ITERATOR_STATUS_END; break; - default: - next_return.status = BT_MESSAGE_ITERATOR_STATUS_ERROR; + case LTTNG_LIVE_ITERATOR_STATUS_NOMEM: + status = BT_SELF_MESSAGE_ITERATOR_STATUS_NOMEM; + break; + case LTTNG_LIVE_ITERATOR_STATUS_ERROR: + case LTTNG_LIVE_ITERATOR_STATUS_INVAL: + case LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED: + status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR; + /* Put all existing messages on error. */ + put_messages(msgs, *count); break; + default: + abort(); } - return next_return; + +no_session: + return status; } BT_HIDDEN -bt_message_iterator_status lttng_live_iterator_init( - bt_self_message_iterator *it, - struct bt_private_port *port) +bt_self_message_iterator_status lttng_live_msg_iter_init( + bt_self_message_iterator *self_msg_it, + bt_self_component_source *self_comp_src, + bt_self_component_port_output *self_port) { - bt_message_iterator_status ret = - BT_MESSAGE_ITERATOR_STATUS_OK; - struct lttng_live_stream_iterator_generic *s; - - BT_ASSERT(it); - - s = bt_private_port_get_user_data(port); - BT_ASSERT(s); - switch (s->type) { - case LIVE_STREAM_TYPE_NO_STREAM: - { - struct lttng_live_no_stream_iterator *no_stream_iter = - container_of(s, struct lttng_live_no_stream_iterator, p); - ret = bt_self_message_iterator_set_user_data(it, no_stream_iter); - if (ret) { - goto error; - } - break; + bt_self_message_iterator_status ret = + BT_SELF_MESSAGE_ITERATOR_STATUS_OK; + bt_self_component *self_comp = + bt_self_component_source_as_self_component(self_comp_src); + struct lttng_live_component *lttng_live; + struct lttng_live_msg_iter *lttng_live_msg_iter; + + BT_ASSERT(self_msg_it); + + lttng_live = bt_self_component_get_data(self_comp); + + /* There can be only one downstream iterator at the same time. */ + BT_ASSERT(!lttng_live->has_msg_iter); + lttng_live->has_msg_iter = true; + + lttng_live_msg_iter = g_new0(struct lttng_live_msg_iter, 1); + if (!lttng_live_msg_iter) { + ret = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR; + goto end; } - case LIVE_STREAM_TYPE_STREAM: - { - struct lttng_live_stream_iterator *stream_iter = - container_of(s, struct lttng_live_stream_iterator, p); - ret = bt_self_message_iterator_set_user_data(it, stream_iter); - if (ret) { + + lttng_live_msg_iter->lttng_live_comp = lttng_live; + lttng_live_msg_iter->self_msg_iter = self_msg_it; + + lttng_live_msg_iter->active_stream_iter = 0; + lttng_live_msg_iter->last_msg_ts_ns = INT64_MIN; + lttng_live_msg_iter->sessions = g_ptr_array_new_with_free_func( + (GDestroyNotify) lttng_live_destroy_session); + BT_ASSERT(lttng_live_msg_iter->sessions); + + lttng_live_msg_iter->viewer_connection = + live_viewer_connection_create(lttng_live->params.url->str, false, + lttng_live_msg_iter); + if (!lttng_live_msg_iter->viewer_connection) { + goto error; + } + + if (lttng_live_create_viewer_session(lttng_live_msg_iter)) { + goto error; + } + if (lttng_live_msg_iter->sessions->len == 0) { + switch (lttng_live->params.sess_not_found_act) { + case SESSION_NOT_FOUND_ACTION_CONTINUE: + BT_LOGI("Unable to connect to the requested live viewer " + "session. Keep trying to connect because of " + "%s=\"%s\" component parameter: url=\"%s\"", + SESS_NOT_FOUND_ACTION_PARAM, + SESS_NOT_FOUND_ACTION_CONTINUE_STR, + lttng_live->params.url->str); + break; + case SESSION_NOT_FOUND_ACTION_FAIL: + BT_LOGE("Unable to connect to the requested live viewer " + "session. Fail the message iterator" + "initialization because of %s=\"%s\" " + "component parameter: url =\"%s\"", + SESS_NOT_FOUND_ACTION_PARAM, + SESS_NOT_FOUND_ACTION_FAIL_STR, + lttng_live->params.url->str); goto error; + case SESSION_NOT_FOUND_ACTION_END: + BT_LOGI("Unable to connect to the requested live viewer " + "session. End gracefully at the first _next() " + "call because of %s=\"%s\" component parameter: " + "url=\"%s\"", SESS_NOT_FOUND_ACTION_PARAM, + SESS_NOT_FOUND_ACTION_END_STR, + lttng_live->params.url->str); + break; } - break; - } - default: - ret = BT_MESSAGE_ITERATOR_STATUS_ERROR; - goto end; } + bt_self_message_iterator_set_data(self_msg_it, lttng_live_msg_iter); + + goto end; +error: + ret = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR; + lttng_live_msg_iter_destroy(lttng_live_msg_iter); end: return ret; -error: - if (bt_self_message_iterator_set_user_data(it, NULL) - != BT_MESSAGE_ITERATOR_STATUS_OK) { - BT_LOGE("Error setting private data to NULL"); - } - goto end; } static -bt_component_class_query_method_return lttng_live_query_list_sessions( - const bt_component_class *comp_class, - const bt_query_executor *query_exec, - bt_value *params) +bt_query_status lttng_live_query_list_sessions(const bt_value *params, + const bt_value **result) { - bt_component_class_query_method_return query_ret = { - .result = NULL, - .status = BT_QUERY_STATUS_OK, - }; - - bt_value *url_value = NULL; + bt_query_status status = BT_QUERY_STATUS_OK; + const bt_value *url_value = NULL; const char *url; - struct bt_live_viewer_connection *viewer_connection = NULL; + struct live_viewer_connection *viewer_connection = NULL; - url_value = bt_value_map_get(params, "url"); - if (!url_value || bt_value_is_null(url_value) || !bt_value_is_string(url_value)) { - BT_LOGW("Mandatory \"url\" parameter missing"); - query_ret.status = BT_QUERY_STATUS_INVALID_PARAMS; + url_value = bt_value_map_borrow_entry_value_const(params, URL_PARAM); + if (!url_value) { + BT_LOGW("Mandatory `%s` parameter missing", URL_PARAM); + status = BT_QUERY_STATUS_INVALID_PARAMS; goto error; } - if (bt_value_string_get(url_value, &url) != BT_VALUE_STATUS_OK) { - BT_LOGW("\"url\" parameter is required to be a string value"); - query_ret.status = BT_QUERY_STATUS_INVALID_PARAMS; + if (!bt_value_is_string(url_value)) { + BT_LOGW("`%s` parameter is required to be a string value", + URL_PARAM); + status = BT_QUERY_STATUS_INVALID_PARAMS; goto error; } - viewer_connection = bt_live_viewer_connection_create(url, NULL); + url = bt_value_string_get(url_value); + + viewer_connection = live_viewer_connection_create(url, true, NULL); if (!viewer_connection) { goto error; } - query_ret.result = - bt_live_viewer_connection_list_sessions(viewer_connection); - if (!query_ret.result) { + status = live_viewer_connection_list_sessions(viewer_connection, + result); + if (status != BT_QUERY_STATUS_OK) { goto error; } goto end; error: - BT_OBJECT_PUT_REF_AND_RESET(query_ret.result); + BT_VALUE_PUT_REF_AND_RESET(*result); - if (query_ret.status >= 0) { - query_ret.status = BT_QUERY_STATUS_ERROR; + if (status >= 0) { + status = BT_QUERY_STATUS_ERROR; } end: if (viewer_connection) { - bt_live_viewer_connection_destroy(viewer_connection); + live_viewer_connection_destroy(viewer_connection); } - BT_VALUE_PUT_REF_AND_RESET(url_value); - return query_ret; + return status; } BT_HIDDEN -bt_component_class_query_method_return lttng_live_query( - const bt_component_class *comp_class, +bt_query_status lttng_live_query(bt_self_component_class_source *comp_class, const bt_query_executor *query_exec, - const char *object, bt_value *params) + const char *object, const bt_value *params, + const bt_value **result) { - bt_component_class_query_method_return ret = { - .result = NULL, - .status = BT_QUERY_STATUS_OK, - }; + bt_query_status status = BT_QUERY_STATUS_OK; if (strcmp(object, "sessions") == 0) { - return lttng_live_query_list_sessions(comp_class, - query_exec, params); + status = lttng_live_query_list_sessions(params, result); + } else { + BT_LOGW("Unknown query object `%s`", object); + status = BT_QUERY_STATUS_INVALID_OBJECT; + goto end; } - BT_LOGW("Unknown query object `%s`", object); - ret.status = BT_QUERY_STATUS_INVALID_OBJECT; - return ret; + +end: + return status; } static void lttng_live_component_destroy_data(struct lttng_live_component *lttng_live) { - int ret; - struct lttng_live_session *session, *s; - - bt_list_for_each_entry_safe(session, s, <tng_live->sessions, node) { - lttng_live_destroy_session(session); - } - BT_OBJECT_PUT_REF_AND_RESET(lttng_live->viewer_connection); - if (lttng_live->url) { - g_string_free(lttng_live->url, TRUE); - } - if (lttng_live->no_stream_port) { - bt_object_get_ref(lttng_live->no_stream_port); - ret = bt_private_port_remove_from_component(lttng_live->no_stream_port); - bt_object_put_ref(lttng_live->no_stream_port); - BT_ASSERT(!ret); - } - if (lttng_live->no_stream_iter) { - g_free(lttng_live->no_stream_iter); + if (lttng_live->params.url) { + g_string_free(lttng_live->params.url, TRUE); } g_free(lttng_live); } BT_HIDDEN -void lttng_live_component_finalize(bt_self_component *component) +void lttng_live_component_finalize(bt_self_component_source *component) { - void *data = bt_self_component_get_user_data(component); + void *data = bt_self_component_get_data( + bt_self_component_source_as_self_component(component)); if (!data) { return; @@ -1060,41 +1490,71 @@ void lttng_live_component_finalize(bt_self_component *component) } static -struct lttng_live_component *lttng_live_component_create(bt_value *params, - bt_self_component *private_component) +enum session_not_found_action parse_session_not_found_action_param( + const bt_value *no_session_param) +{ + enum session_not_found_action action; + const char *no_session_act_str; + no_session_act_str = bt_value_string_get(no_session_param); + if (strcmp(no_session_act_str, SESS_NOT_FOUND_ACTION_CONTINUE_STR) == 0) { + action = SESSION_NOT_FOUND_ACTION_CONTINUE; + } else if (strcmp(no_session_act_str, SESS_NOT_FOUND_ACTION_FAIL_STR) == 0) { + action = SESSION_NOT_FOUND_ACTION_FAIL; + } else if (strcmp(no_session_act_str, SESS_NOT_FOUND_ACTION_END_STR) == 0) { + action = SESSION_NOT_FOUND_ACTION_END; + } else { + action = -1; + } + + return action; +} + +struct lttng_live_component *lttng_live_component_create(const bt_value *params) { struct lttng_live_component *lttng_live; - bt_value *value = NULL; + const bt_value *value = NULL; const char *url; - bt_value_status ret; lttng_live = g_new0(struct lttng_live_component, 1); if (!lttng_live) { goto end; } - /* TODO: make this an overridable parameter. */ lttng_live->max_query_size = MAX_QUERY_SIZE; - BT_INIT_LIST_HEAD(<tng_live->sessions); - value = bt_value_map_get(params, "url"); - if (!value || bt_value_is_null(value) || !bt_value_is_string(value)) { - BT_LOGW("Mandatory \"url\" parameter missing"); + lttng_live->has_msg_iter = false; + + value = bt_value_map_borrow_entry_value_const(params, URL_PARAM); + if (!value || !bt_value_is_string(value)) { + BT_LOGW("Mandatory `%s` parameter missing or not a string", + URL_PARAM); goto error; } url = bt_value_string_get(value); - lttng_live->url = g_string_new(url); - if (!lttng_live->url) { - goto error; - } - BT_VALUE_PUT_REF_AND_RESET(value); - lttng_live->viewer_connection = - bt_live_viewer_connection_create(lttng_live->url->str, lttng_live); - if (!lttng_live->viewer_connection) { + lttng_live->params.url = g_string_new(url); + if (!lttng_live->params.url) { goto error; } - if (lttng_live_create_viewer_session(lttng_live)) { - goto error; + + value = bt_value_map_borrow_entry_value_const(params, + SESS_NOT_FOUND_ACTION_PARAM); + + if (value && bt_value_is_string(value)) { + lttng_live->params.sess_not_found_act = + parse_session_not_found_action_param(value); + if (lttng_live->params.sess_not_found_act == -1) { + BT_LOGE("Unexpected value for `%s` parameter: " + "value=\"%s\"", SESS_NOT_FOUND_ACTION_PARAM, + bt_value_string_get(value)); + goto error; + } + } else { + BT_LOGW("Optional `%s` parameter is missing or " + "not a string value. Defaulting to %s=\"%s\".", + SESS_NOT_FOUND_ACTION_PARAM, + SESS_NOT_FOUND_ACTION_PARAM, + SESS_NOT_FOUND_ACTION_CONTINUE_STR); + lttng_live->params.sess_not_found_act = + SESSION_NOT_FOUND_ACTION_CONTINUE; } - lttng_live->private_component = private_component; goto end; @@ -1106,84 +1566,35 @@ end: } BT_HIDDEN -bt_component_status lttng_live_component_init( - bt_self_component *private_component, - bt_value *params, void *init_method_data) +bt_self_component_status lttng_live_component_init( + bt_self_component_source *self_comp, + const bt_value *params, UNUSED_VAR void *init_method_data) { struct lttng_live_component *lttng_live; - bt_component_status ret = BT_COMPONENT_STATUS_OK; + bt_self_component_status ret = BT_SELF_COMPONENT_STATUS_OK; - /* Passes ownership of iter ref to lttng_live_component_create. */ - lttng_live = lttng_live_component_create(params, private_component); + lttng_live = lttng_live_component_create(params); if (!lttng_live) { - //TODO : we need access to the application cancel state - //because we are not part of a graph yet. - ret = BT_COMPONENT_STATUS_NOMEM; + ret = BT_SELF_COMPONENT_STATUS_NOMEM; goto end; } + lttng_live->self_comp = self_comp; - lttng_live->no_stream_iter = g_new0(struct lttng_live_no_stream_iterator, 1); - lttng_live->no_stream_iter->p.type = LIVE_STREAM_TYPE_NO_STREAM; - lttng_live->no_stream_iter->lttng_live = lttng_live; if (lttng_live_is_canceled(lttng_live)) { goto end; } + ret = bt_self_component_source_add_output_port( - lttng_live->private_component, "no-stream", - lttng_live->no_stream_iter, - <tng_live->no_stream_port); - if (ret != BT_COMPONENT_STATUS_OK) { + lttng_live->self_comp, "out", + NULL, NULL); + if (ret != BT_SELF_COMPONENT_STATUS_OK) { goto end; } - bt_object_put_ref(lttng_live->no_stream_port); /* weak */ - lttng_live->no_stream_iter->port = lttng_live->no_stream_port; - ret = bt_self_component_set_user_data(private_component, lttng_live); - if (ret != BT_COMPONENT_STATUS_OK) { - goto error; - } + bt_self_component_set_data( + bt_self_component_source_as_self_component(self_comp), + lttng_live); end: return ret; -error: - (void) bt_self_component_set_user_data(private_component, NULL); - lttng_live_component_destroy_data(lttng_live); - return ret; -} - -BT_HIDDEN -bt_component_status lttng_live_accept_port_connection( - bt_self_component *private_component, - struct bt_private_port *self_private_port, - const bt_port *other_port) -{ - struct lttng_live_component *lttng_live = - bt_self_component_get_user_data(private_component); - bt_component *other_component; - bt_component_status status = BT_COMPONENT_STATUS_OK; - const bt_port *self_port = bt_port_from_private(self_private_port); - - other_component = bt_port_get_component(other_port); - bt_component_put_ref(other_component); /* weak */ - - if (!lttng_live->downstream_component) { - lttng_live->downstream_component = other_component; - goto end; - } - - /* - * Compare prior component to ensure we are connected to the - * same downstream component as prior ports. - */ - if (lttng_live->downstream_component != other_component) { - BT_LOGW("Cannot connect ctf.lttng-live component port \"%s\" to component \"%s\": already connected to component \"%s\".", - bt_port_get_name(self_port), - bt_component_get_name(other_component), - bt_component_get_name(lttng_live->downstream_component)); - status = BT_COMPONENT_STATUS_REFUSE_PORT_CONNECTION; - goto end; - } -end: - bt_port_put_ref(self_port); - return status; }