X-Git-Url: http://git.efficios.com/?a=blobdiff_plain;f=src%2Fplugins%2Fctf%2Flttng-live%2Flttng-live.c;h=4586d9ccc6732a6d22a4f78a4bc807d100e69d36;hb=0f1979c3efe2c5b253edb5944d2ba561347ece2e;hp=589472dcaa5db2a3faa57033118551dd36909e83;hpb=3f7d4d90b0456de9d34fac337350818ef06163bd;p=babeltrace.git diff --git a/src/plugins/ctf/lttng-live/lttng-live.c b/src/plugins/ctf/lttng-live/lttng-live.c index 589472dc..4586d9cc 100644 --- a/src/plugins/ctf/lttng-live/lttng-live.c +++ b/src/plugins/ctf/lttng-live/lttng-live.c @@ -28,8 +28,10 @@ * SOFTWARE. */ -#define BT_LOG_TAG "PLUGIN-CTF-LTTNG-LIVE-SRC" -#include "logging.h" +#define BT_COMP_LOG_SELF_COMP self_comp +#define BT_LOG_OUTPUT_LEVEL log_level +#define BT_LOG_TAG "PLUGIN/SRC.CTF.LTTNG-LIVE" +#include "logging/comp-logging.h" #include #include @@ -40,18 +42,22 @@ #include "compat/compiler.h" #include +#include "plugins/common/muxing/muxing.h" +#include "plugins/common/param-validation/param-validation.h" + #include "data-stream.h" #include "metadata.h" #include "lttng-live.h" #define MAX_QUERY_SIZE (256*1024) #define URL_PARAM "url" +#define INPUTS_PARAM "inputs" #define SESS_NOT_FOUND_ACTION_PARAM "session-not-found-action" #define SESS_NOT_FOUND_ACTION_CONTINUE_STR "continue" #define SESS_NOT_FOUND_ACTION_FAIL_STR "fail" #define SESS_NOT_FOUND_ACTION_END_STR "end" -#define print_dbg(fmt, ...) BT_LOGD(fmt, ## __VA_ARGS__) +#define print_dbg(fmt, ...) BT_COMP_LOGD(fmt, ## __VA_ARGS__) static const char *print_live_iterator_status(enum lttng_live_iterator_status status) @@ -99,7 +105,7 @@ const char *print_state(struct lttng_live_stream_iterator *s) #define print_stream_state(live_stream_iter) \ do { \ - BT_LOGD("stream state %s last_inact_ts %" PRId64 \ + BT_COMP_LOGD("stream state %s last_inact_ts %" PRId64 \ ", curr_inact_ts %" PRId64, \ print_state(live_stream_iter), \ live_stream_iter->last_inactivity_ts, \ @@ -107,21 +113,17 @@ const char *print_state(struct lttng_live_stream_iterator *s) } while (0); BT_HIDDEN -bool lttng_live_graph_is_canceled(struct lttng_live_component *lttng_live) +bool lttng_live_graph_is_canceled(struct lttng_live_msg_iter *msg_iter) { - const bt_component *component; bool ret; - if (!lttng_live) { + if (!msg_iter) { ret = false; goto end; } - component = bt_component_source_as_component_const( - bt_self_component_source_as_component_source( - lttng_live->self_comp)); - - ret = bt_component_graph_is_canceled(component); + ret = bt_self_message_iterator_is_interrupted( + msg_iter->self_msg_iter); end: return ret; @@ -150,7 +152,10 @@ end: static void lttng_live_destroy_trace(struct lttng_live_trace *trace) { - BT_LOGD("Destroy lttng_live_trace"); + bt_logging_level log_level = trace->log_level; + bt_self_component *self_comp = trace->self_comp; + + BT_COMP_LOGD("Destroy lttng_live_trace"); BT_ASSERT(trace->stream_iterators); g_ptr_array_free(trace->stream_iterators, TRUE); @@ -167,22 +172,26 @@ struct lttng_live_trace *lttng_live_create_trace(struct lttng_live_session *sess uint64_t trace_id) { struct lttng_live_trace *trace = NULL; + bt_logging_level log_level = session->log_level; + bt_self_component *self_comp = session->self_comp; trace = g_new0(struct lttng_live_trace, 1); if (!trace) { goto error; } + trace->log_level = session->log_level; + trace->self_comp = session->self_comp; trace->session = session; trace->id = trace_id; trace->trace_class = NULL; trace->trace = NULL; trace->stream_iterators = g_ptr_array_new_with_free_func( - (GDestroyNotify) lttng_live_stream_iterator_destroy); + (GDestroyNotify) lttng_live_stream_iterator_destroy); BT_ASSERT(trace->stream_iterators); trace->new_metadata_needed = true; g_ptr_array_add(session->traces, trace); - BT_LOGI("Create trace"); + BT_COMP_LOGI("Create trace"); goto end; error: g_free(trace); @@ -216,15 +225,19 @@ int lttng_live_add_session(struct lttng_live_msg_iter *lttng_live_msg_iter, { int ret = 0; struct lttng_live_session *session; + bt_logging_level log_level = lttng_live_msg_iter->log_level; + bt_self_component *self_comp = lttng_live_msg_iter->self_comp; session = g_new0(struct lttng_live_session, 1); if (!session) { goto error; } + session->log_level = lttng_live_msg_iter->log_level; + session->self_comp = lttng_live_msg_iter->self_comp; session->id = session_id; session->traces = g_ptr_array_new_with_free_func( - (GDestroyNotify) lttng_live_destroy_trace); + (GDestroyNotify) lttng_live_destroy_trace); BT_ASSERT(session->traces); session->lttng_live_msg_iter = lttng_live_msg_iter; session->new_streams_needed = true; @@ -234,12 +247,12 @@ int lttng_live_add_session(struct lttng_live_msg_iter *lttng_live_msg_iter, session->session_name = g_string_new(session_name); BT_ASSERT(session->session_name); - BT_LOGI("Reading from session: %" PRIu64 " hostname: %s session_name: %s", + BT_COMP_LOGI("Reading from session: %" PRIu64 " hostname: %s session_name: %s", session->id, hostname, session_name); g_ptr_array_add(lttng_live_msg_iter->sessions, session); goto end; error: - BT_LOGE("Error adding session"); + BT_COMP_LOGE("Error adding session"); g_free(session); ret = -1; end: @@ -249,20 +262,23 @@ end: static void lttng_live_destroy_session(struct lttng_live_session *session) { - struct lttng_live_component *live_comp; + bt_logging_level log_level; + bt_self_component *self_comp; if (!session) { goto end; } - BT_LOGD("Destroy lttng live session"); + log_level = session->log_level; + self_comp = session->self_comp; + BT_COMP_LOGD("Destroy lttng live session"); if (session->id != -1ULL) { if (lttng_live_detach_session(session)) { - live_comp = session->lttng_live_msg_iter->lttng_live_comp; if (session->lttng_live_msg_iter && - !lttng_live_graph_is_canceled(live_comp)) { + !lttng_live_graph_is_canceled( + session->lttng_live_msg_iter)) { /* Old relayd cannot detach sessions. */ - BT_LOGD("Unable to detach lttng live session %" PRIu64, + BT_COMP_LOGD("Unable to detach lttng live session %" PRIu64, session->id); } } @@ -326,17 +342,20 @@ static enum lttng_live_iterator_status lttng_live_iterator_next_check_stream_state( struct lttng_live_stream_iterator *lttng_live_stream) { + bt_logging_level log_level = lttng_live_stream->log_level; + bt_self_component *self_comp = lttng_live_stream->self_comp; + switch (lttng_live_stream->state) { case LTTNG_LIVE_STREAM_QUIESCENT: case LTTNG_LIVE_STREAM_ACTIVE_DATA: break; case LTTNG_LIVE_STREAM_ACTIVE_NO_DATA: /* Invalid state. */ - BT_LOGF("Unexpected stream state \"ACTIVE_NO_DATA\""); + BT_COMP_LOGF("Unexpected stream state \"ACTIVE_NO_DATA\""); abort(); case LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA: /* Invalid state. */ - BT_LOGF("Unexpected stream state \"QUIESCENT_NO_DATA\""); + BT_COMP_LOGF("Unexpected stream state \"QUIESCENT_NO_DATA\""); abort(); case LTTNG_LIVE_STREAM_EOF: break; @@ -357,10 +376,11 @@ enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_no_data_stre struct lttng_live_msg_iter *lttng_live_msg_iter, struct lttng_live_stream_iterator *lttng_live_stream) { - enum lttng_live_iterator_status ret = - LTTNG_LIVE_ITERATOR_STATUS_OK; - struct packet_index index; + bt_logging_level log_level = lttng_live_msg_iter->log_level; + bt_self_component *self_comp = lttng_live_msg_iter->self_comp; + enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK; enum lttng_live_stream_state orig_state = lttng_live_stream->state; + struct packet_index index; if (lttng_live_stream->trace->new_metadata_needed) { ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE; @@ -423,7 +443,7 @@ enum lttng_live_iterator_status lttng_live_get_session( ret = lttng_live_attach_session(session); if (ret) { if (lttng_live_msg_iter && lttng_live_graph_is_canceled( - lttng_live_msg_iter->lttng_live_comp)) { + lttng_live_msg_iter)) { status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN; } else { status = LTTNG_LIVE_ITERATOR_STATUS_ERROR; @@ -490,8 +510,7 @@ enum lttng_live_iterator_status lttng_live_iterator_handle_new_streams_and_metadata( struct lttng_live_msg_iter *lttng_live_msg_iter) { - enum lttng_live_iterator_status ret = - LTTNG_LIVE_ITERATOR_STATUS_OK; + enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK; uint64_t session_idx = 0, nr_sessions_opened = 0; struct lttng_live_session *session; enum session_not_found_action sess_not_found_act = @@ -552,16 +571,14 @@ enum lttng_live_iterator_status emit_inactivity_message( struct lttng_live_stream_iterator *stream_iter, bt_message **message, uint64_t timestamp) { - enum lttng_live_iterator_status ret = - LTTNG_LIVE_ITERATOR_STATUS_OK; + enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK; bt_message *msg = NULL; BT_ASSERT(stream_iter->trace->clock_class); msg = bt_message_message_iterator_inactivity_create( - lttng_live_msg_iter->self_msg_iter, - stream_iter->trace->clock_class, - timestamp); + lttng_live_msg_iter->self_msg_iter, + stream_iter->trace->clock_class, timestamp); if (!msg) { goto error; } @@ -582,8 +599,7 @@ enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_quiescent_st struct lttng_live_stream_iterator *lttng_live_stream, bt_message **message) { - enum lttng_live_iterator_status ret = - LTTNG_LIVE_ITERATOR_STATUS_OK; + enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK; if (lttng_live_stream->state != LTTNG_LIVE_STREAM_QUIESCENT) { return LTTNG_LIVE_ITERATOR_STATUS_OK; @@ -597,10 +613,10 @@ enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_quiescent_st } ret = emit_inactivity_message(lttng_live_msg_iter, lttng_live_stream, - message, lttng_live_stream->current_inactivity_ts); + message, lttng_live_stream->current_inactivity_ts); lttng_live_stream->last_inactivity_ts = - lttng_live_stream->current_inactivity_ts; + lttng_live_stream->current_inactivity_ts; end: return ret; } @@ -614,19 +630,19 @@ int live_get_msg_ts_ns(struct lttng_live_stream_iterator *stream_iter, const bt_clock_class *clock_class = NULL; const bt_clock_snapshot *clock_snapshot = NULL; int ret = 0; - bt_message_stream_activity_clock_snapshot_state sa_cs_state; + bt_logging_level log_level = lttng_live_msg_iter->log_level; + bt_self_component *self_comp = lttng_live_msg_iter->self_comp; BT_ASSERT(msg); BT_ASSERT(ts_ns); - BT_LOGD("Getting message's timestamp: iter-data-addr=%p, msg-addr=%p, " + BT_COMP_LOGD("Getting message's timestamp: iter-data-addr=%p, msg-addr=%p, " "last-msg-ts=%" PRId64, lttng_live_msg_iter, msg, last_msg_ts_ns); switch (bt_message_get_type(msg)) { case BT_MESSAGE_TYPE_EVENT: - clock_class = - bt_message_event_borrow_stream_class_default_clock_class_const( + clock_class = bt_message_event_borrow_stream_class_default_clock_class_const( msg); BT_ASSERT(clock_class); @@ -634,8 +650,7 @@ int live_get_msg_ts_ns(struct lttng_live_stream_iterator *stream_iter, msg); break; case BT_MESSAGE_TYPE_PACKET_BEGINNING: - clock_class = - bt_message_packet_beginning_borrow_stream_class_default_clock_class_const( + clock_class = bt_message_packet_beginning_borrow_stream_class_default_clock_class_const( msg); BT_ASSERT(clock_class); @@ -643,8 +658,7 @@ int live_get_msg_ts_ns(struct lttng_live_stream_iterator *stream_iter, msg); break; case BT_MESSAGE_TYPE_PACKET_END: - clock_class = - bt_message_packet_end_borrow_stream_class_default_clock_class_const( + clock_class = bt_message_packet_end_borrow_stream_class_default_clock_class_const( msg); BT_ASSERT(clock_class); @@ -652,8 +666,7 @@ int live_get_msg_ts_ns(struct lttng_live_stream_iterator *stream_iter, msg); break; case BT_MESSAGE_TYPE_DISCARDED_EVENTS: - clock_class = - bt_message_discarded_events_borrow_stream_class_default_clock_class_const( + clock_class = bt_message_discarded_events_borrow_stream_class_default_clock_class_const( msg); BT_ASSERT(clock_class); @@ -661,48 +674,20 @@ int live_get_msg_ts_ns(struct lttng_live_stream_iterator *stream_iter, msg); break; case BT_MESSAGE_TYPE_DISCARDED_PACKETS: - clock_class = - bt_message_discarded_packets_borrow_stream_class_default_clock_class_const( + clock_class = bt_message_discarded_packets_borrow_stream_class_default_clock_class_const( msg); BT_ASSERT(clock_class); clock_snapshot = bt_message_discarded_packets_borrow_beginning_default_clock_snapshot_const( msg); break; - case BT_MESSAGE_TYPE_STREAM_ACTIVITY_BEGINNING: - clock_class = - bt_message_stream_activity_beginning_borrow_stream_class_default_clock_class_const( - msg); - BT_ASSERT(clock_class); - - sa_cs_state = bt_message_stream_activity_beginning_borrow_default_clock_snapshot_const( - msg, &clock_snapshot); - if (sa_cs_state != BT_MESSAGE_STREAM_ACTIVITY_CLOCK_SNAPSHOT_STATE_KNOWN) { - goto no_clock_snapshot; - } - - break; - case BT_MESSAGE_TYPE_STREAM_ACTIVITY_END: - clock_class = - bt_message_stream_activity_end_borrow_stream_class_default_clock_class_const( - msg); - BT_ASSERT(clock_class); - - sa_cs_state = bt_message_stream_activity_end_borrow_default_clock_snapshot_const( - msg, &clock_snapshot); - if (sa_cs_state != BT_MESSAGE_STREAM_ACTIVITY_CLOCK_SNAPSHOT_STATE_KNOWN) { - goto no_clock_snapshot; - } - - break; case BT_MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY: - clock_snapshot = - bt_message_message_iterator_inactivity_borrow_default_clock_snapshot_const( - msg); + clock_snapshot = bt_message_message_iterator_inactivity_borrow_default_clock_snapshot_const( + msg); break; default: /* All the other messages have a higher priority */ - BT_LOGD_STR("Message has no timestamp: using the last message timestamp."); + BT_COMP_LOGD_STR("Message has no timestamp: using the last message timestamp."); *ts_ns = last_msg_ts_ns; goto end; } @@ -712,25 +697,19 @@ int live_get_msg_ts_ns(struct lttng_live_stream_iterator *stream_iter, ret = bt_clock_snapshot_get_ns_from_origin(clock_snapshot, ts_ns); if (ret) { - BT_LOGE("Cannot get nanoseconds from Epoch of clock snapshot: " + BT_COMP_LOGE("Cannot get nanoseconds from Epoch of clock snapshot: " "clock-snapshot-addr=%p", clock_snapshot); goto error; } goto end; -no_clock_snapshot: - BT_LOGD_STR("Message's default clock snapshot is missing: " - "using the last message timestamp."); - *ts_ns = last_msg_ts_ns; - goto end; - error: ret = -1; end: if (ret == 0) { - BT_LOGD("Found message's timestamp: " + BT_COMP_LOGD("Found message's timestamp: " "iter-data-addr=%p, msg-addr=%p, " "last-msg-ts=%" PRId64 ", ts=%" PRId64, lttng_live_msg_iter, msg, last_msg_ts_ns, *ts_ns); @@ -746,6 +725,8 @@ enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_active_data_ bt_message **message) { enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK; + bt_logging_level log_level = lttng_live_msg_iter->log_level; + bt_self_component *self_comp = lttng_live_msg_iter->self_comp; enum bt_msg_iter_status status; uint64_t session_idx, trace_idx; @@ -796,7 +777,7 @@ enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_active_data_ case BT_MSG_ITER_STATUS_ERROR: default: ret = LTTNG_LIVE_ITERATOR_STATUS_ERROR; - BT_LOGW("CTF msg iterator return an error or failed msg_iter=%p", + BT_COMP_LOGW("CTF msg iterator return an error or failed msg_iter=%p", lttng_live_stream->msg_iter); break; } @@ -805,6 +786,34 @@ end: return ret; } +static +enum lttng_live_iterator_status lttng_live_iterator_close_stream( + struct lttng_live_msg_iter *lttng_live_msg_iter, + struct lttng_live_stream_iterator *stream_iter, + bt_message **curr_msg) +{ + enum lttng_live_iterator_status live_status = + LTTNG_LIVE_ITERATOR_STATUS_OK; + /* + * The viewer has hung up on us so we are closing the stream. The + * `bt_msg_iter` should simply realize that it needs to close the + * stream properly by emitting the necessary stream end message. + */ + enum bt_msg_iter_status status = bt_msg_iter_get_next_message( + stream_iter->msg_iter, lttng_live_msg_iter->self_msg_iter, + curr_msg); + + if (status == BT_MSG_ITER_STATUS_ERROR) { + live_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR; + goto end; + } + + BT_ASSERT(status == BT_MSG_ITER_STATUS_OK); + +end: + return live_status; +} + /* * helper function: * handle_no_data_streams() @@ -854,38 +863,60 @@ end: * When disconnected from relayd: try to re-connect endlessly. */ static -enum lttng_live_iterator_status lttng_live_iterator_next_on_stream( +enum lttng_live_iterator_status lttng_live_iterator_next_msg_on_stream( struct lttng_live_msg_iter *lttng_live_msg_iter, struct lttng_live_stream_iterator *stream_iter, bt_message **curr_msg) { + bt_logging_level log_level = lttng_live_msg_iter->log_level; + bt_self_component *self_comp = lttng_live_msg_iter->self_comp; enum lttng_live_iterator_status live_status; + if (stream_iter->has_stream_hung_up) { + /* + * The stream has hung up and the stream was properly closed + * during the last call to the current function. Return _END + * status now so that this stream iterator is removed for the + * stream iterator list. + */ + live_status = LTTNG_LIVE_ITERATOR_STATUS_END; + goto end; + } + retry: print_stream_state(stream_iter); live_status = lttng_live_iterator_handle_new_streams_and_metadata( - lttng_live_msg_iter); + lttng_live_msg_iter); if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) { goto end; } live_status = lttng_live_iterator_next_handle_one_no_data_stream( - lttng_live_msg_iter, stream_iter); + lttng_live_msg_iter, stream_iter); + if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) { + if (live_status == LTTNG_LIVE_ITERATOR_STATUS_END) { + /* + * We overwrite `live_status` since `curr_msg` is + * likely set to a valid message in this function. + */ + live_status = lttng_live_iterator_close_stream( + lttng_live_msg_iter, stream_iter, curr_msg); + } goto end; } live_status = lttng_live_iterator_next_handle_one_quiescent_stream( - lttng_live_msg_iter, stream_iter, curr_msg); + lttng_live_msg_iter, stream_iter, curr_msg); if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) { - BT_ASSERT(*curr_msg == NULL); + BT_ASSERT(!*curr_msg); goto end; } if (*curr_msg) { goto end; } live_status = lttng_live_iterator_next_handle_one_active_data_stream( - lttng_live_msg_iter, stream_iter, curr_msg); + lttng_live_msg_iter, stream_iter, curr_msg); if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) { - BT_ASSERT(*curr_msg == NULL); + BT_ASSERT(!*curr_msg); } end: @@ -900,11 +931,13 @@ static enum lttng_live_iterator_status next_stream_iterator_for_trace( struct lttng_live_msg_iter *lttng_live_msg_iter, struct lttng_live_trace *live_trace, - struct lttng_live_stream_iterator **candidate_stream_iter) + struct lttng_live_stream_iterator **youngest_trace_stream_iter) { - struct lttng_live_stream_iterator *curr_candidate_stream_iter = NULL; + struct lttng_live_stream_iterator *youngest_candidate_stream_iter = NULL; + bt_logging_level log_level = lttng_live_msg_iter->log_level; + bt_self_component *self_comp = lttng_live_msg_iter->self_comp; enum lttng_live_iterator_status stream_iter_status;; - int64_t curr_candidate_msg_ts = INT64_MAX; + int64_t youngest_candidate_msg_ts = INT64_MAX; uint64_t stream_iter_idx; BT_ASSERT(live_trace); @@ -920,7 +953,17 @@ enum lttng_live_iterator_status next_stream_iterator_for_trace( bool stream_iter_is_ended = false; struct lttng_live_stream_iterator *stream_iter = g_ptr_array_index(live_trace->stream_iterators, - stream_iter_idx); + stream_iter_idx); + + /* + * Since we may remove elements from the GPtrArray as we + * iterate over it, it's possible to see the same element more + * than once. + */ + if (stream_iter == youngest_candidate_stream_iter) { + stream_iter_idx++; + continue; + } /* * Find if there is are now current message for this stream @@ -929,11 +972,11 @@ enum lttng_live_iterator_status next_stream_iterator_for_trace( while (!stream_iter->current_msg) { bt_message *msg = NULL; int64_t curr_msg_ts_ns = INT64_MAX; - stream_iter_status = lttng_live_iterator_next_on_stream( - lttng_live_msg_iter, stream_iter, &msg); + stream_iter_status = lttng_live_iterator_next_msg_on_stream( + lttng_live_msg_iter, stream_iter, &msg); - BT_LOGD("live stream iterator returned status :%s", - print_live_iterator_status(stream_iter_status)); + BT_COMP_LOGD("live stream iterator returned status :%s", + print_live_iterator_status(stream_iter_status)); if (stream_iter_status == LTTNG_LIVE_ITERATOR_STATUS_END) { stream_iter_is_ended = true; break; @@ -967,7 +1010,7 @@ enum lttng_live_iterator_status next_stream_iterator_for_trace( * We received a message in the past. To ensure * monotonicity, we can't send it forward. */ - BT_LOGE("Message's timestamp is less than " + BT_COMP_LOGE("Message's timestamp is less than " "lttng-live's message iterator's last " "returned timestamp: " "lttng-live-msg-iter-addr=%p, ts=%" PRId64 ", " @@ -979,18 +1022,50 @@ enum lttng_live_iterator_status next_stream_iterator_for_trace( } } - if (!stream_iter_is_ended && - stream_iter->current_msg_ts_ns <= curr_candidate_msg_ts) { - /* - * Update the current best candidate message for the - * stream iterator of thise live trace to be forwarded - * downstream. - */ - curr_candidate_msg_ts = stream_iter->current_msg_ts_ns; - curr_candidate_stream_iter = stream_iter; - } + BT_ASSERT(stream_iter != youngest_candidate_stream_iter); + + if (!stream_iter_is_ended) { + if (G_UNLIKELY(youngest_candidate_stream_iter == NULL) || + stream_iter->current_msg_ts_ns < youngest_candidate_msg_ts) { + /* + * Update the current best candidate message + * for the stream iterator of this live trace + * to be forwarded downstream. + */ + youngest_candidate_msg_ts = stream_iter->current_msg_ts_ns; + youngest_candidate_stream_iter = stream_iter; + } else if (stream_iter->current_msg_ts_ns == youngest_candidate_msg_ts) { + /* + * Order the messages in an arbitrary but + * deterministic way. + */ + BT_ASSERT(stream_iter != youngest_candidate_stream_iter); + int ret = common_muxing_compare_messages( + stream_iter->current_msg, + youngest_candidate_stream_iter->current_msg); + if (ret < 0) { + /* + * The `youngest_candidate_stream_iter->current_msg` + * should go first. Update the next + * iterator and the current timestamp. + */ + youngest_candidate_msg_ts = stream_iter->current_msg_ts_ns; + youngest_candidate_stream_iter = stream_iter; + } else if (ret == 0) { + /* + * Unable to pick which one should go + * first. + */ + BT_COMP_LOGW("Cannot deterministically pick next live stream message iterator because they have identical next messages: " + "stream-iter-addr=%p" + "stream-iter-addr=%p", + stream_iter, + youngest_candidate_stream_iter); + } + } - if (stream_iter_is_ended) { + stream_iter_idx++; + } else { /* * The live stream iterator is ENDed. We remove that * iterator from the list and we restart the iteration @@ -1000,13 +1075,11 @@ enum lttng_live_iterator_status next_stream_iterator_for_trace( g_ptr_array_remove_index_fast(live_trace->stream_iterators, stream_iter_idx); stream_iter_idx = 0; - } else { - stream_iter_idx++; } } - if (curr_candidate_stream_iter) { - *candidate_stream_iter = curr_candidate_stream_iter; + if (youngest_candidate_stream_iter) { + *youngest_trace_stream_iter = youngest_candidate_stream_iter; stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK; } else { /* @@ -1025,12 +1098,14 @@ static enum lttng_live_iterator_status next_stream_iterator_for_session( struct lttng_live_msg_iter *lttng_live_msg_iter, struct lttng_live_session *session, - struct lttng_live_stream_iterator **candidate_session_stream_iter) + struct lttng_live_stream_iterator **youngest_session_stream_iter) { + bt_self_component *self_comp = lttng_live_msg_iter->self_comp; + bt_logging_level log_level = lttng_live_msg_iter->log_level; enum lttng_live_iterator_status stream_iter_status; uint64_t trace_idx = 0; - int64_t curr_candidate_msg_ts = INT64_MAX; - struct lttng_live_stream_iterator *curr_candidate_stream_iter = NULL; + int64_t youngest_candidate_msg_ts = INT64_MAX; + struct lttng_live_stream_iterator *youngest_candidate_stream_iter = NULL; /* * Make sure we are attached to the session and look for new streams @@ -1071,9 +1146,32 @@ enum lttng_live_iterator_status next_stream_iterator_for_session( if (!trace_is_ended) { BT_ASSERT(stream_iter); - if (stream_iter->current_msg_ts_ns <= curr_candidate_msg_ts) { - curr_candidate_msg_ts = stream_iter->current_msg_ts_ns; - curr_candidate_stream_iter = stream_iter; + if (G_UNLIKELY(youngest_candidate_stream_iter == NULL) || + stream_iter->current_msg_ts_ns < youngest_candidate_msg_ts) { + youngest_candidate_msg_ts = stream_iter->current_msg_ts_ns; + youngest_candidate_stream_iter = stream_iter; + } else if (stream_iter->current_msg_ts_ns == youngest_candidate_msg_ts) { + /* + * Order the messages in an arbitrary but + * deterministic way. + */ + int ret = common_muxing_compare_messages( + stream_iter->current_msg, + youngest_candidate_stream_iter->current_msg); + if (ret < 0) { + /* + * The `youngest_candidate_stream_iter->current_msg` + * should go first. Update the next iterator + * and the current timestamp. + */ + youngest_candidate_msg_ts = stream_iter->current_msg_ts_ns; + youngest_candidate_stream_iter = stream_iter; + } else if (ret == 0) { + /* Unable to pick which one should go first. */ + BT_COMP_LOGW("Cannot deterministically pick next live stream message iterator because they have identical next messages: " + "stream-iter-addr=%p" "stream-iter-addr=%p", + stream_iter, youngest_candidate_stream_iter); + } } trace_idx++; } else { @@ -1081,8 +1179,8 @@ enum lttng_live_iterator_status next_stream_iterator_for_session( trace_idx = 0; } } - if (curr_candidate_stream_iter) { - *candidate_session_stream_iter = curr_candidate_stream_iter; + if (youngest_candidate_stream_iter) { + *youngest_session_stream_iter = youngest_candidate_stream_iter; stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK; } else { /* @@ -1112,16 +1210,18 @@ void put_messages(bt_message_array_const msgs, uint64_t count) } BT_HIDDEN -bt_self_message_iterator_status lttng_live_msg_iter_next( +bt_component_class_message_iterator_next_method_status lttng_live_msg_iter_next( bt_self_message_iterator *self_msg_it, bt_message_array_const msgs, uint64_t capacity, uint64_t *count) { - bt_self_message_iterator_status status; + bt_component_class_message_iterator_next_method_status status; struct lttng_live_msg_iter *lttng_live_msg_iter = bt_self_message_iterator_get_data(self_msg_it); struct lttng_live_component *lttng_live = lttng_live_msg_iter->lttng_live_comp; + bt_self_component *self_comp = lttng_live_msg_iter->self_comp; + bt_logging_level log_level = lttng_live_msg_iter->log_level; enum lttng_live_iterator_status stream_iter_status; uint64_t session_idx; @@ -1143,7 +1243,7 @@ bt_self_message_iterator_status lttng_live_msg_iter_next( if (lttng_live_msg_iter->sessions->len == 0) { if (lttng_live->params.sess_not_found_act != SESSION_NOT_FOUND_ACTION_CONTINUE) { - status = BT_SELF_MESSAGE_ITERATOR_STATUS_END; + status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_END; goto no_session; } else { /* @@ -1152,7 +1252,7 @@ bt_self_message_iterator_status lttng_live_msg_iter_next( * requested session name. */ if (lttng_live_create_viewer_session(lttng_live_msg_iter)) { - status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR; + status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_ERROR; goto no_session; } } @@ -1182,9 +1282,9 @@ bt_self_message_iterator_status lttng_live_msg_iter_next( * return it. */ while (*count < capacity) { - struct lttng_live_stream_iterator *next_stream_iter = NULL, - *candidate_stream_iter = NULL; - int64_t next_msg_ts_ns = INT64_MAX; + struct lttng_live_stream_iterator *youngest_stream_iter = NULL, + *candidate_stream_iter = NULL; + int64_t youngest_msg_ts_ns = INT64_MAX; BT_ASSERT(lttng_live_msg_iter->sessions); session_idx = 0; @@ -1233,35 +1333,65 @@ bt_self_message_iterator_status lttng_live_msg_iter_next( goto end; } - if (candidate_stream_iter->current_msg_ts_ns <= next_msg_ts_ns) { - next_msg_ts_ns = candidate_stream_iter->current_msg_ts_ns; - next_stream_iter = candidate_stream_iter; + if (G_UNLIKELY(youngest_stream_iter == NULL) || + candidate_stream_iter->current_msg_ts_ns < youngest_msg_ts_ns) { + youngest_msg_ts_ns = candidate_stream_iter->current_msg_ts_ns; + youngest_stream_iter = candidate_stream_iter; + } else if (candidate_stream_iter->current_msg_ts_ns == youngest_msg_ts_ns) { + /* + * The currently selected message to be sent + * downstream next has the exact same timestamp + * that of the current candidate message. We + * must break the tie in a predictable manner. + */ + BT_COMP_LOGD_STR("Two of the next message candidates have the same timestamps, pick one deterministically."); + /* + * Order the messages in an arbitrary but + * deterministic way. + */ + int ret = common_muxing_compare_messages( + candidate_stream_iter->current_msg, + youngest_stream_iter->current_msg); + if (ret < 0) { + /* + * The `candidate_stream_iter->current_msg` + * should go first. Update the next + * iterator and the current timestamp. + */ + youngest_msg_ts_ns = candidate_stream_iter->current_msg_ts_ns; + youngest_stream_iter = candidate_stream_iter; + } else if (ret == 0) { + /* Unable to pick which one should go first. */ + BT_COMP_LOGW("Cannot deterministically pick next live stream message iterator because they have identical next messages: " + "next-stream-iter-addr=%p" "candidate-stream-iter-addr=%p", + youngest_stream_iter, candidate_stream_iter); + } } session_idx++; } - if (!next_stream_iter) { + if (!youngest_stream_iter) { stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN; goto end; } - BT_ASSERT(next_stream_iter->current_msg); + BT_ASSERT(youngest_stream_iter->current_msg); /* Ensure monotonicity. */ BT_ASSERT(lttng_live_msg_iter->last_msg_ts_ns <= - next_stream_iter->current_msg_ts_ns); + youngest_stream_iter->current_msg_ts_ns); /* * Insert the next message to the message batch. This will set * stream iterator current messsage to NULL so that next time * we fetch the next message of that stream iterator */ - BT_MESSAGE_MOVE_REF(msgs[*count], next_stream_iter->current_msg); + BT_MESSAGE_MOVE_REF(msgs[*count], youngest_stream_iter->current_msg); (*count)++; /* Update the last timestamp in nanoseconds sent downstream. */ - lttng_live_msg_iter->last_msg_ts_ns = next_msg_ts_ns; - next_stream_iter->current_msg_ts_ns = INT64_MAX; + lttng_live_msg_iter->last_msg_ts_ns = youngest_msg_ts_ns; + youngest_stream_iter->current_msg_ts_ns = INT64_MAX; stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK; } @@ -1276,21 +1406,21 @@ end: * now. On the next call we return again if there are * still no new message to send. */ - status = BT_SELF_MESSAGE_ITERATOR_STATUS_OK; + status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK; } else { - status = BT_SELF_MESSAGE_ITERATOR_STATUS_AGAIN; + status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_AGAIN; } break; case LTTNG_LIVE_ITERATOR_STATUS_END: - status = BT_SELF_MESSAGE_ITERATOR_STATUS_END; + status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_END; break; case LTTNG_LIVE_ITERATOR_STATUS_NOMEM: - status = BT_SELF_MESSAGE_ITERATOR_STATUS_NOMEM; + status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_MEMORY_ERROR; break; case LTTNG_LIVE_ITERATOR_STATUS_ERROR: case LTTNG_LIVE_ITERATOR_STATUS_INVAL: case LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED: - status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR; + status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_ERROR; /* Put all existing messages on error. */ put_messages(msgs, *count); break; @@ -1303,21 +1433,25 @@ no_session: } BT_HIDDEN -bt_self_message_iterator_status lttng_live_msg_iter_init( +bt_component_class_message_iterator_initialize_method_status lttng_live_msg_iter_init( bt_self_message_iterator *self_msg_it, + bt_self_message_iterator_configuration *config, bt_self_component_source *self_comp_src, bt_self_component_port_output *self_port) { - bt_self_message_iterator_status ret = - BT_SELF_MESSAGE_ITERATOR_STATUS_OK; + bt_component_class_message_iterator_initialize_method_status ret = + BT_COMPONENT_CLASS_MESSAGE_ITERATOR_INITIALIZE_METHOD_STATUS_OK; bt_self_component *self_comp = - bt_self_component_source_as_self_component(self_comp_src); + bt_self_component_source_as_self_component(self_comp_src); struct lttng_live_component *lttng_live; struct lttng_live_msg_iter *lttng_live_msg_iter; + bt_logging_level log_level; BT_ASSERT(self_msg_it); lttng_live = bt_self_component_get_data(self_comp); + log_level = lttng_live->log_level; + self_comp = lttng_live->self_comp; /* There can be only one downstream iterator at the same time. */ BT_ASSERT(!lttng_live->has_msg_iter); @@ -1325,10 +1459,12 @@ bt_self_message_iterator_status lttng_live_msg_iter_init( lttng_live_msg_iter = g_new0(struct lttng_live_msg_iter, 1); if (!lttng_live_msg_iter) { - ret = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR; + ret = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_INITIALIZE_METHOD_STATUS_ERROR; goto end; } + lttng_live_msg_iter->log_level = lttng_live->log_level; + lttng_live_msg_iter->self_comp = lttng_live->self_comp; lttng_live_msg_iter->lttng_live_comp = lttng_live; lttng_live_msg_iter->self_msg_iter = self_msg_it; @@ -1340,7 +1476,7 @@ bt_self_message_iterator_status lttng_live_msg_iter_init( lttng_live_msg_iter->viewer_connection = live_viewer_connection_create(lttng_live->params.url->str, false, - lttng_live_msg_iter); + lttng_live_msg_iter, log_level); if (!lttng_live_msg_iter->viewer_connection) { goto error; } @@ -1351,7 +1487,7 @@ bt_self_message_iterator_status lttng_live_msg_iter_init( if (lttng_live_msg_iter->sessions->len == 0) { switch (lttng_live->params.sess_not_found_act) { case SESSION_NOT_FOUND_ACTION_CONTINUE: - BT_LOGI("Unable to connect to the requested live viewer " + BT_COMP_LOGI("Unable to connect to the requested live viewer " "session. Keep trying to connect because of " "%s=\"%s\" component parameter: url=\"%s\"", SESS_NOT_FOUND_ACTION_PARAM, @@ -1359,7 +1495,7 @@ bt_self_message_iterator_status lttng_live_msg_iter_init( lttng_live->params.url->str); break; case SESSION_NOT_FOUND_ACTION_FAIL: - BT_LOGE("Unable to connect to the requested live viewer " + BT_COMP_LOGE("Unable to connect to the requested live viewer " "session. Fail the message iterator" "initialization because of %s=\"%s\" " "component parameter: url =\"%s\"", @@ -1368,13 +1504,15 @@ bt_self_message_iterator_status lttng_live_msg_iter_init( lttng_live->params.url->str); goto error; case SESSION_NOT_FOUND_ACTION_END: - BT_LOGI("Unable to connect to the requested live viewer " + BT_COMP_LOGI("Unable to connect to the requested live viewer " "session. End gracefully at the first _next() " "call because of %s=\"%s\" component parameter: " "url=\"%s\"", SESS_NOT_FOUND_ACTION_PARAM, SESS_NOT_FOUND_ACTION_END_STR, lttng_live->params.url->str); break; + default: + abort(); } } @@ -1382,45 +1520,54 @@ bt_self_message_iterator_status lttng_live_msg_iter_init( goto end; error: - ret = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR; + ret = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_INITIALIZE_METHOD_STATUS_ERROR; lttng_live_msg_iter_destroy(lttng_live_msg_iter); end: return ret; } +static struct bt_param_validation_map_value_entry_descr list_sessions_params[] = { + { URL_PARAM, BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_MANDATORY, { .type = BT_VALUE_TYPE_STRING } }, + BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_END +}; + static -bt_query_status lttng_live_query_list_sessions(const bt_value *params, - const bt_value **result) +bt_component_class_query_method_status lttng_live_query_list_sessions( + const bt_value *params, const bt_value **result, + bt_self_component_class *self_comp_class, + bt_logging_level log_level) { - bt_query_status status = BT_QUERY_STATUS_OK; + bt_component_class_query_method_status status; const bt_value *url_value = NULL; const char *url; struct live_viewer_connection *viewer_connection = NULL; + enum bt_param_validation_status validation_status; + gchar *validate_error = NULL; - url_value = bt_value_map_borrow_entry_value_const(params, URL_PARAM); - if (!url_value) { - BT_LOGW("Mandatory `%s` parameter missing", URL_PARAM); - status = BT_QUERY_STATUS_INVALID_PARAMS; + validation_status = bt_param_validation_validate(params, + list_sessions_params, &validate_error); + if (validation_status == BT_PARAM_VALIDATION_STATUS_MEMORY_ERROR) { + status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_MEMORY_ERROR; goto error; - } - - if (!bt_value_is_string(url_value)) { - BT_LOGW("`%s` parameter is required to be a string value", - URL_PARAM); - status = BT_QUERY_STATUS_INVALID_PARAMS; + } else if (validation_status == BT_PARAM_VALIDATION_STATUS_VALIDATION_ERROR) { + status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR; + BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, "%s", validate_error); goto error; } + url_value = bt_value_map_borrow_entry_value_const(params, URL_PARAM); url = bt_value_string_get(url_value); - viewer_connection = live_viewer_connection_create(url, true, NULL); + viewer_connection = live_viewer_connection_create(url, true, NULL, + log_level); if (!viewer_connection) { + status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR; goto error; } status = live_viewer_connection_list_sessions(viewer_connection, - result); - if (status != BT_QUERY_STATUS_OK) { + result); + if (status != BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_OK) { goto error; } @@ -1430,29 +1577,120 @@ error: BT_VALUE_PUT_REF_AND_RESET(*result); if (status >= 0) { - status = BT_QUERY_STATUS_ERROR; + status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR; } end: if (viewer_connection) { live_viewer_connection_destroy(viewer_connection); } + + g_free(validate_error); + + return status; +} + +static +bt_component_class_query_method_status lttng_live_query_support_info( + const bt_value *params, const bt_value **result, + bt_logging_level log_level) +{ + bt_component_class_query_method_status status = + BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_OK; + const bt_value *input_type_value; + const bt_value *input_value; + double weight = 0; + struct bt_common_lttng_live_url_parts parts = { 0 }; + + /* Used by the logging macros */ + __attribute__((unused)) bt_self_component *self_comp = NULL; + + *result = NULL; + input_type_value = bt_value_map_borrow_entry_value_const(params, + "type"); + if (!input_type_value) { + BT_COMP_LOGE("Missing expected `type` parameter."); + goto error; + } + + if (!bt_value_is_string(input_type_value)) { + BT_COMP_LOGE("`type` parameter is not a string value."); + goto error; + } + + if (strcmp(bt_value_string_get(input_type_value), "string") != 0) { + /* We don't handle file system paths */ + goto create_result; + } + + input_value = bt_value_map_borrow_entry_value_const(params, "input"); + if (!input_value) { + BT_COMP_LOGE("Missing expected `input` parameter."); + goto error; + } + + if (!bt_value_is_string(input_value)) { + BT_COMP_LOGE("`input` parameter is not a string value."); + goto error; + } + + parts = bt_common_parse_lttng_live_url(bt_value_string_get(input_value), + NULL, 0); + if (parts.session_name) { + /* + * Looks pretty much like an LTTng live URL: we got the + * session name part, which forms a complete URL. + */ + weight = .75; + } + +create_result: + *result = bt_value_real_create_init(weight); + if (!*result) { + status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_MEMORY_ERROR; + goto error; + } + + goto end; + +error: + if (status >= 0) { + status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR; + } + + BT_ASSERT(!*result); + +end: + bt_common_destroy_lttng_live_url_parts(&parts); return status; } BT_HIDDEN -bt_query_status lttng_live_query(bt_self_component_class_source *comp_class, - const bt_query_executor *query_exec, +bt_component_class_query_method_status lttng_live_query( + bt_self_component_class_source *comp_class, + bt_private_query_executor *priv_query_exec, const char *object, const bt_value *params, + __attribute__((unused)) void *method_data, const bt_value **result) { - bt_query_status status = BT_QUERY_STATUS_OK; + bt_component_class_query_method_status status = + BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_OK; + bt_self_component *self_comp = NULL; + bt_self_component_class *self_comp_class = + bt_self_component_class_source_as_self_component_class(comp_class); + bt_logging_level log_level = bt_query_executor_get_logging_level( + bt_private_query_executor_as_query_executor_const( + priv_query_exec)); if (strcmp(object, "sessions") == 0) { - status = lttng_live_query_list_sessions(params, result); + status = lttng_live_query_list_sessions(params, result, + self_comp_class, log_level); + } else if (strcmp(object, "babeltrace.support-info") == 0) { + status = lttng_live_query_support_info(params, result, + log_level); } else { - BT_LOGW("Unknown query object `%s`", object); - status = BT_QUERY_STATUS_INVALID_OBJECT; + BT_COMP_LOGI("Unknown query object `%s`", object); + status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_UNKNOWN_OBJECT; goto end; } @@ -1476,7 +1714,7 @@ BT_HIDDEN void lttng_live_component_finalize(bt_self_component_source *component) { void *data = bt_self_component_get_data( - bt_self_component_source_as_self_component(component)); + bt_self_component_source_as_self_component(component)); if (!data) { return; @@ -1489,107 +1727,146 @@ enum session_not_found_action parse_session_not_found_action_param( const bt_value *no_session_param) { enum session_not_found_action action; - const char *no_session_act_str; - no_session_act_str = bt_value_string_get(no_session_param); + const char *no_session_act_str = bt_value_string_get(no_session_param); + if (strcmp(no_session_act_str, SESS_NOT_FOUND_ACTION_CONTINUE_STR) == 0) { action = SESSION_NOT_FOUND_ACTION_CONTINUE; } else if (strcmp(no_session_act_str, SESS_NOT_FOUND_ACTION_FAIL_STR) == 0) { action = SESSION_NOT_FOUND_ACTION_FAIL; - } else if (strcmp(no_session_act_str, SESS_NOT_FOUND_ACTION_END_STR) == 0) { - action = SESSION_NOT_FOUND_ACTION_END; } else { - action = -1; + BT_ASSERT(strcmp(no_session_act_str, SESS_NOT_FOUND_ACTION_END_STR) == 0); + action = SESSION_NOT_FOUND_ACTION_END; } return action; } -struct lttng_live_component *lttng_live_component_create(const bt_value *params) +static struct bt_param_validation_value_descr inputs_elem_descr = { + .type = BT_VALUE_TYPE_STRING, +}; + +static const char *sess_not_found_action_choices[] = { + SESS_NOT_FOUND_ACTION_CONTINUE_STR, + SESS_NOT_FOUND_ACTION_FAIL_STR, + SESS_NOT_FOUND_ACTION_END_STR, +}; + +static struct bt_param_validation_map_value_entry_descr params_descr[] = { + { INPUTS_PARAM, BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_MANDATORY, { BT_VALUE_TYPE_ARRAY, .array = { + .min_length = 1, + .max_length = 1, + .element_type = &inputs_elem_descr, + } } }, + { SESS_NOT_FOUND_ACTION_PARAM, BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL, { BT_VALUE_TYPE_STRING, .string = { + .choices = sess_not_found_action_choices, + } } }, + BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_END +}; + +static +bt_component_class_initialize_method_status lttng_live_component_create( + const bt_value *params, + bt_logging_level log_level, + bt_self_component *self_comp, + struct lttng_live_component **component) { - struct lttng_live_component *lttng_live; - const bt_value *value = NULL; + struct lttng_live_component *lttng_live = NULL; + const bt_value *inputs_value; + const bt_value *url_value; + const bt_value *value; const char *url; + enum bt_param_validation_status validation_status; + gchar *validation_error = NULL; + bt_component_class_initialize_method_status status; + + validation_status = bt_param_validation_validate(params, params_descr, + &validation_error); + if (validation_status == BT_PARAM_VALIDATION_STATUS_MEMORY_ERROR) { + status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR; + goto error; + } else if (validation_status == BT_PARAM_VALIDATION_STATUS_VALIDATION_ERROR) { + BT_COMP_LOGE_APPEND_CAUSE(self_comp, "%s", validation_error); + status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR; + goto error; + } lttng_live = g_new0(struct lttng_live_component, 1); if (!lttng_live) { + status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR; goto end; } + lttng_live->log_level = log_level; + lttng_live->self_comp = self_comp; lttng_live->max_query_size = MAX_QUERY_SIZE; lttng_live->has_msg_iter = false; - value = bt_value_map_borrow_entry_value_const(params, URL_PARAM); - if (!value || !bt_value_is_string(value)) { - BT_LOGW("Mandatory `%s` parameter missing or not a string", - URL_PARAM); - goto error; - } - url = bt_value_string_get(value); + inputs_value = + bt_value_map_borrow_entry_value_const(params, INPUTS_PARAM); + url_value = + bt_value_array_borrow_element_by_index_const(inputs_value, 0); + url = bt_value_string_get(url_value); + lttng_live->params.url = g_string_new(url); if (!lttng_live->params.url) { + status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR; goto error; } value = bt_value_map_borrow_entry_value_const(params, SESS_NOT_FOUND_ACTION_PARAM); - - if (value && bt_value_is_string(value)) { + if (value) { lttng_live->params.sess_not_found_act = parse_session_not_found_action_param(value); - if (lttng_live->params.sess_not_found_act == -1) { - BT_LOGE("Unexpected value for `%s` parameter: " - "value=\"%s\"", SESS_NOT_FOUND_ACTION_PARAM, - bt_value_string_get(value)); - goto error; - } } else { - BT_LOGW("Optional `%s` parameter is missing or " - "not a string value. Defaulting to %s=\"%s\".", - SESS_NOT_FOUND_ACTION_PARAM, + BT_COMP_LOGI("Optional `%s` parameter is missing: " + "defaulting to `%s`.", SESS_NOT_FOUND_ACTION_PARAM, SESS_NOT_FOUND_ACTION_CONTINUE_STR); lttng_live->params.sess_not_found_act = SESSION_NOT_FOUND_ACTION_CONTINUE; } + status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK; goto end; error: lttng_live_component_destroy_data(lttng_live); lttng_live = NULL; end: - return lttng_live; + g_free(validation_error); + + *component = lttng_live; + return status; } BT_HIDDEN -bt_self_component_status lttng_live_component_init( - bt_self_component_source *self_comp, - const bt_value *params, __attribute__((unused)) void *init_method_data) +bt_component_class_initialize_method_status lttng_live_component_init( + bt_self_component_source *self_comp_src, + bt_self_component_source_configuration *config, + const bt_value *params, + __attribute__((unused)) void *init_method_data) { struct lttng_live_component *lttng_live; - bt_self_component_status ret = BT_SELF_COMPONENT_STATUS_OK; - - lttng_live = lttng_live_component_create(params); - if (!lttng_live) { - ret = BT_SELF_COMPONENT_STATUS_NOMEM; - goto error; - } - lttng_live->self_comp = self_comp; + bt_component_class_initialize_method_status ret; + bt_self_component *self_comp = + bt_self_component_source_as_self_component(self_comp_src); + bt_logging_level log_level = bt_component_get_logging_level( + bt_self_component_as_component(self_comp)); + bt_self_component_add_port_status add_port_status; - if (lttng_live_graph_is_canceled(lttng_live)) { - ret = BT_SELF_COMPONENT_STATUS_END; + ret = lttng_live_component_create(params, log_level, self_comp, <tng_live); + if (ret != BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK) { goto error; } - ret = bt_self_component_source_add_output_port( - lttng_live->self_comp, "out", - NULL, NULL); - if (ret != BT_SELF_COMPONENT_STATUS_OK) { - goto error; + add_port_status = bt_self_component_source_add_output_port( + self_comp_src, "out", NULL, NULL); + if (add_port_status != BT_SELF_COMPONENT_ADD_PORT_STATUS_OK) { + ret = (int) add_port_status; + goto end; } - bt_self_component_set_data( - bt_self_component_source_as_self_component(self_comp), - lttng_live); + bt_self_component_set_data(self_comp, lttng_live); goto end; error: