#include <unistd.h>
#include "common/assert.h"
-#include "cpp-common/bt2c/fmt.hpp"
+#include "cpp-common/bt2/wrap.hpp"
+#include "cpp-common/bt2c/fmt.hpp" /* IWYU pragma: keep */
#include "cpp-common/bt2c/glib-up.hpp"
#include "cpp-common/bt2c/vector.hpp"
#include "cpp-common/bt2s/make-unique.hpp"
bool lttng_live_graph_is_canceled(struct lttng_live_msg_iter *msg_iter)
{
- bool ret;
-
if (!msg_iter) {
- ret = false;
- goto end;
+ return false;
}
- ret = bt_self_message_iterator_is_interrupted(msg_iter->self_msg_iter);
-
-end:
- return ret;
+ return bt_self_message_iterator_is_interrupted(msg_iter->selfMsgIter.libObjPtr());
}
static struct lttng_live_trace *
lttng_live_session_borrow_trace_by_id(struct lttng_live_session *session, uint64_t trace_id)
{
- uint64_t trace_idx;
- struct lttng_live_trace *ret_trace = NULL;
-
- for (trace_idx = 0; trace_idx < session->traces->len; trace_idx++) {
- struct lttng_live_trace *trace =
- (lttng_live_trace *) g_ptr_array_index(session->traces, trace_idx);
+ for (lttng_live_trace::UP& trace : session->traces) {
if (trace->id == trace_id) {
- ret_trace = trace;
- goto end;
+ return trace.get();
}
}
-end:
- return ret_trace;
-}
-
-static void lttng_live_destroy_trace(struct lttng_live_trace *trace)
-{
- BT_CPPLOGD_SPEC(trace->logger, "Destroying live trace: trace-id={}", trace->id);
-
- delete trace;
+ return nullptr;
}
static struct lttng_live_trace *lttng_live_create_trace(struct lttng_live_session *session,
BT_CPPLOGD_SPEC(session->logger, "Creating live trace: session-id={}, trace-id={}", session->id,
trace_id);
- lttng_live_trace *trace = new lttng_live_trace {session->logger};
+ auto trace = bt2s::make_unique<lttng_live_trace>(session->logger);
+
trace->session = session;
trace->id = trace_id;
trace->metadata_stream_state = LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED;
- g_ptr_array_add(session->traces, trace);
- return trace;
+ const auto ret = trace.get();
+ session->traces.emplace_back(std::move(trace));
+ return ret;
}
struct lttng_live_trace *
lttng_live_session_borrow_or_create_trace_by_id(struct lttng_live_session *session,
uint64_t trace_id)
{
- struct lttng_live_trace *trace;
-
- trace = lttng_live_session_borrow_trace_by_id(session, trace_id);
- if (trace) {
- goto end;
+ if (lttng_live_trace *trace = lttng_live_session_borrow_trace_by_id(session, trace_id)) {
+ return trace;
}
/* The session is the owner of the newly created trace. */
- trace = lttng_live_create_trace(session, trace_id);
-
-end:
- return trace;
+ return lttng_live_create_trace(session, trace_id);
}
int lttng_live_add_session(struct lttng_live_msg_iter *lttng_live_msg_iter, uint64_t session_id,
"session-id={}, hostname=\"{}\", session-name=\"{}\"",
session_id, hostname, session_name);
- lttng_live_session *session = new lttng_live_session {lttng_live_msg_iter->logger};
- session->self_comp = lttng_live_msg_iter->self_comp;
+ auto session = bt2s::make_unique<lttng_live_session>(lttng_live_msg_iter->logger,
+ lttng_live_msg_iter->selfComp);
+
session->id = session_id;
- session->traces = g_ptr_array_new_with_free_func((GDestroyNotify) lttng_live_destroy_trace);
- BT_ASSERT(session->traces);
session->lttng_live_msg_iter = lttng_live_msg_iter;
session->new_streams_needed = true;
- session->hostname = g_string_new(hostname);
- BT_ASSERT(session->hostname);
-
- session->session_name = g_string_new(session_name);
- BT_ASSERT(session->session_name);
+ session->hostname = hostname;
+ session->session_name = session_name;
- g_ptr_array_add(lttng_live_msg_iter->sessions, session);
+ lttng_live_msg_iter->sessions.emplace_back(std::move(session));
return 0;
}
-static void lttng_live_destroy_session(struct lttng_live_session *session)
+lttng_live_session::~lttng_live_session()
{
- if (!session) {
- goto end;
- }
+ BT_CPPLOGD_SPEC(this->logger, "Destroying live session: session-id={}, session-name=\"{}\"",
+ this->id, this->session_name);
- BT_CPPLOGD_SPEC(session->logger,
- "Destroying live session: "
- "session-id={}, session-name=\"{}\"",
- session->id, session->session_name->str);
- if (session->id != -1ULL) {
- if (lttng_live_session_detach(session)) {
- if (!lttng_live_graph_is_canceled(session->lttng_live_msg_iter)) {
+ if (this->id != -1ULL) {
+ if (lttng_live_session_detach(this)) {
+ if (!lttng_live_graph_is_canceled(this->lttng_live_msg_iter)) {
/* Old relayd cannot detach sessions. */
- BT_CPPLOGD_SPEC(session->logger, "Unable to detach lttng live session {}",
- session->id);
+ BT_CPPLOGD_SPEC(this->logger, "Unable to detach lttng live session {}", this->id);
}
}
- session->id = -1ULL;
- }
-
- if (session->traces) {
- g_ptr_array_free(session->traces, TRUE);
- }
-
- if (session->hostname) {
- g_string_free(session->hostname, TRUE);
- }
- if (session->session_name) {
- g_string_free(session->session_name, TRUE);
+ this->id = -1ULL;
}
-
- delete session;
-
-end:
- return;
}
-static void lttng_live_msg_iter_destroy(struct lttng_live_msg_iter *lttng_live_msg_iter)
+lttng_live_msg_iter::~lttng_live_msg_iter()
{
- if (!lttng_live_msg_iter) {
- goto end;
- }
+ this->sessions.clear();
- if (lttng_live_msg_iter->sessions) {
- g_ptr_array_free(lttng_live_msg_iter->sessions, TRUE);
- }
-
- if (lttng_live_msg_iter->viewer_connection) {
- live_viewer_connection_destroy(lttng_live_msg_iter->viewer_connection);
- }
- BT_ASSERT(lttng_live_msg_iter->lttng_live_comp);
- BT_ASSERT(lttng_live_msg_iter->lttng_live_comp->has_msg_iter);
+ BT_ASSERT(this->lttng_live_comp);
+ BT_ASSERT(this->lttng_live_comp->has_msg_iter);
/* All stream iterators must be destroyed at this point. */
- BT_ASSERT(lttng_live_msg_iter->active_stream_iter == 0);
- lttng_live_msg_iter->lttng_live_comp->has_msg_iter = false;
-
- delete lttng_live_msg_iter;
-
-end:
- return;
+ BT_ASSERT(this->active_stream_iter == 0);
+ this->lttng_live_comp->has_msg_iter = false;
}
void lttng_live_msg_iter_finalize(bt_self_message_iterator *self_msg_iter)
{
- struct lttng_live_msg_iter *lttng_live_msg_iter;
-
- BT_ASSERT(self_msg_iter);
-
- lttng_live_msg_iter =
- (struct lttng_live_msg_iter *) bt_self_message_iterator_get_data(self_msg_iter);
- BT_ASSERT(lttng_live_msg_iter);
- lttng_live_msg_iter_destroy(lttng_live_msg_iter);
+ lttng_live_msg_iter::UP {
+ static_cast<lttng_live_msg_iter *>(bt_self_message_iterator_get_data(self_msg_iter))};
}
static enum lttng_live_iterator_status
goto end;
}
- lttng_live_stream->base_offset = index.offset;
- lttng_live_stream->offset = index.offset;
- lttng_live_stream->len = index.packet_size / CHAR_BIT;
+ lttng_live_stream->curPktInfo.emplace(lttng_live_stream_iterator::CurPktInfo {
+ bt2c::DataLen::fromBytes(index.offset),
+ bt2c::DataLen::fromBits(index.packet_size),
+ });
BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
"Setting live stream reading info: stream-name=\"{}\", "
- "viewer-stream-id={}, stream-base-offset={}, stream-offset={}, stream-len={}",
+ "viewer-stream-id={}, stream-offset-in-relay={}, stream-len-bytes={}",
lttng_live_stream->name, lttng_live_stream->viewer_stream_id,
- lttng_live_stream->base_offset, lttng_live_stream->offset,
- lttng_live_stream->len);
+ lttng_live_stream->curPktInfo->offsetInRelay.bytes(),
+ lttng_live_stream->curPktInfo->len.bytes());
end:
if (ret == LTTNG_LIVE_ITERATOR_STATUS_OK) {
struct lttng_live_session *session)
{
enum lttng_live_iterator_status status;
- uint64_t trace_idx;
if (!session->attached) {
BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger, "Attach to session: session-id={}",
session->id);
- enum lttng_live_viewer_status attach_status =
- lttng_live_session_attach(session, lttng_live_msg_iter->self_msg_iter);
+ lttng_live_viewer_status attach_status = lttng_live_session_attach(session);
if (attach_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
if (lttng_live_graph_is_canceled(lttng_live_msg_iter)) {
/*
* cancelled.
*/
bt_current_thread_clear_error();
- status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
+ return LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
} else {
- status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
"Error attaching to LTTng live session");
+ return LTTNG_LIVE_ITERATOR_STATUS_ERROR;
}
- goto end;
}
}
BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
- "Updating all data streams: "
- "session-id={}, session-name=\"{}\"",
- session->id, session->session_name->str);
+ "Updating all data streams: session-id={}, session-name=\"{}\"", session->id,
+ session->session_name);
- status = lttng_live_session_get_new_streams(session, lttng_live_msg_iter->self_msg_iter);
+ status = lttng_live_session_get_new_streams(session);
switch (status) {
case LTTNG_LIVE_ITERATOR_STATUS_OK:
break;
lttng_live_msg_iter->logger,
"Updating streams returned _END status. Override status to _OK in order fetch any remaining metadata:"
"session-id={}, session-name=\"{}\"",
- session->id, session->session_name->str);
- status = LTTNG_LIVE_ITERATOR_STATUS_OK;
- break;
+ session->id, session->session_name);
+ return LTTNG_LIVE_ITERATOR_STATUS_OK;
default:
- goto end;
+ return status;
}
BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
- "Updating metadata stream for session: "
- "session-id={}, session-name=\"{}\"",
- session->id, session->session_name->str);
-
- trace_idx = 0;
- while (trace_idx < session->traces->len) {
- struct lttng_live_trace *trace =
- (lttng_live_trace *) g_ptr_array_index(session->traces, trace_idx);
+ "Updating metadata stream for session: session-id={}, session-name=\"{}\"",
+ session->id, session->session_name);
- status = lttng_live_metadata_update(trace);
+ for (lttng_live_trace::UP& trace : session->traces) {
+ status = lttng_live_metadata_update(trace.get());
switch (status) {
case LTTNG_LIVE_ITERATOR_STATUS_END:
+ break;
case LTTNG_LIVE_ITERATOR_STATUS_OK:
- trace_idx++;
break;
case LTTNG_LIVE_ITERATOR_STATUS_CONTINUE:
case LTTNG_LIVE_ITERATOR_STATUS_AGAIN:
- goto end;
+ return status;
default:
- BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
- "Error updating trace metadata: "
- "stream-iter-status={}, trace-id={}",
- status, trace->id);
- goto end;
+ BT_CPPLOGE_APPEND_CAUSE_SPEC(
+ lttng_live_msg_iter->logger,
+ "Error updating trace metadata: stream-iter-status={}, trace-id={}", status,
+ trace->id);
+ return status;
}
}
* Now that we have the metadata we can initialize the downstream
* iterator.
*/
- status = lttng_live_lazy_msg_init(session, lttng_live_msg_iter->self_msg_iter);
-
-end:
- return status;
+ return lttng_live_lazy_msg_init(session, lttng_live_msg_iter->selfMsgIter);
}
static void
lttng_live_force_new_streams_and_metadata(struct lttng_live_msg_iter *lttng_live_msg_iter)
{
- uint64_t session_idx, trace_idx;
-
- for (session_idx = 0; session_idx < lttng_live_msg_iter->sessions->len; session_idx++) {
- struct lttng_live_session *session =
- (lttng_live_session *) g_ptr_array_index(lttng_live_msg_iter->sessions, session_idx);
+ for (const auto& session : lttng_live_msg_iter->sessions) {
BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
"Force marking session as needing new streams: "
"session-id={}",
session->id);
session->new_streams_needed = true;
- for (trace_idx = 0; trace_idx < session->traces->len; trace_idx++) {
- struct lttng_live_trace *trace =
- (lttng_live_trace *) g_ptr_array_index(session->traces, trace_idx);
+ for (lttng_live_trace::UP& trace : session->traces) {
BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
"Force marking trace metadata state as needing an update: "
"session-id={}, trace-id={}",
static enum lttng_live_iterator_status
lttng_live_iterator_handle_new_streams_and_metadata(struct lttng_live_msg_iter *lttng_live_msg_iter)
{
- enum lttng_live_iterator_status status;
enum lttng_live_viewer_status viewer_status;
- uint64_t session_idx = 0, nr_sessions_opened = 0;
- struct lttng_live_session *session;
+ uint64_t nr_sessions_opened = 0;
enum session_not_found_action sess_not_found_act =
lttng_live_msg_iter->lttng_live_comp->params.sess_not_found_act;
* need to query for new sessions even though we have sessions
* currently ongoing.
*/
- if (lttng_live_msg_iter->sessions->len == 0) {
+ if (lttng_live_msg_iter->sessions.empty()) {
if (sess_not_found_act != SESSION_NOT_FOUND_ACTION_CONTINUE) {
BT_CPPLOGD_SPEC(
lttng_live_msg_iter->logger,
"No session found. Exiting in accordance with the `session-not-found-action` parameter");
- status = LTTNG_LIVE_ITERATOR_STATUS_END;
- goto end;
+ return LTTNG_LIVE_ITERATOR_STATUS_END;
} else {
BT_CPPLOGD_SPEC(
lttng_live_msg_iter->logger,
viewer_status = lttng_live_create_viewer_session(lttng_live_msg_iter);
if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
- status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
"Error creating LTTng live viewer session");
+ return LTTNG_LIVE_ITERATOR_STATUS_ERROR;
} else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
- status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
+ return LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
} else {
bt_common_abort();
}
- goto end;
}
}
}
- for (session_idx = 0; session_idx < lttng_live_msg_iter->sessions->len; session_idx++) {
- session =
- (lttng_live_session *) g_ptr_array_index(lttng_live_msg_iter->sessions, session_idx);
- status = lttng_live_get_session(lttng_live_msg_iter, session);
+ for (const auto& session : lttng_live_msg_iter->sessions) {
+ const auto status = lttng_live_get_session(lttng_live_msg_iter, session.get());
switch (status) {
case LTTNG_LIVE_ITERATOR_STATUS_OK:
case LTTNG_LIVE_ITERATOR_STATUS_END:
*/
break;
default:
- goto end;
+ return status;
}
if (!session->closed) {
nr_sessions_opened++;
}
if (sess_not_found_act != SESSION_NOT_FOUND_ACTION_CONTINUE && nr_sessions_opened == 0) {
- status = LTTNG_LIVE_ITERATOR_STATUS_END;
+ return LTTNG_LIVE_ITERATOR_STATUS_END;
} else {
- status = LTTNG_LIVE_ITERATOR_STATUS_OK;
+ return LTTNG_LIVE_ITERATOR_STATUS_OK;
}
-
-end:
- return status;
}
static enum lttng_live_iterator_status
timestamp);
const auto msg = bt_message_message_iterator_inactivity_create(
- lttng_live_msg_iter->self_msg_iter, stream_iter->trace->clock_class, timestamp);
+ lttng_live_msg_iter->selfMsgIter.libObjPtr(), stream_iter->trace->clock_class->libObjPtr(),
+ timestamp);
if (!msg) {
BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
struct lttng_live_msg_iter *lttng_live_msg_iter,
struct lttng_live_stream_iterator *lttng_live_stream, bt2::ConstMessage::Shared& message)
{
- enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
-
if (lttng_live_stream->state != LTTNG_LIVE_STREAM_QUIESCENT) {
return LTTNG_LIVE_ITERATOR_STATUS_OK;
}
lttng_live_stream_iterator_set_state(lttng_live_stream,
LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA);
- ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
- goto end;
+ return LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
}
- ret = emit_inactivity_message(lttng_live_msg_iter, lttng_live_stream, message,
- lttng_live_stream->current_inactivity_ts);
+ const auto status = emit_inactivity_message(lttng_live_msg_iter, lttng_live_stream, message,
+ lttng_live_stream->current_inactivity_ts);
+
+ if (status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
+ return status;
+ }
lttng_live_stream->last_inactivity_ts.value = lttng_live_stream->current_inactivity_ts;
lttng_live_stream->last_inactivity_ts.is_set = true;
-end:
- return ret;
+
+ return LTTNG_LIVE_ITERATOR_STATUS_OK;
}
static int live_get_msg_ts_ns(struct lttng_live_msg_iter *lttng_live_msg_iter,
- const bt_message *msg, int64_t last_msg_ts_ns, int64_t *ts_ns)
+ bt2::ConstMessage msg, int64_t last_msg_ts_ns, int64_t *ts_ns)
{
- const bt_clock_snapshot *clock_snapshot = NULL;
- int ret = 0;
+ bt2::OptionalBorrowedObject<bt2::ConstClockSnapshot> clockSnapshot;
- BT_ASSERT_DBG(msg);
BT_ASSERT_DBG(ts_ns);
BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
- "Getting message's timestamp: iter-data-addr={}, msg-addr={}, "
- "last-msg-ts={}",
- fmt::ptr(lttng_live_msg_iter), fmt::ptr(msg), last_msg_ts_ns);
+ "Getting message's timestamp: iter-data-addr={}, msg-addr={}, last-msg-ts={}",
+ fmt::ptr(lttng_live_msg_iter), fmt::ptr(msg.libObjPtr()), last_msg_ts_ns);
- switch (bt_message_get_type(msg)) {
- case BT_MESSAGE_TYPE_EVENT:
- clock_snapshot = bt_message_event_borrow_default_clock_snapshot_const(msg);
+ switch (msg.type()) {
+ case bt2::MessageType::Event:
+ clockSnapshot = msg.asEvent().defaultClockSnapshot();
break;
- case BT_MESSAGE_TYPE_PACKET_BEGINNING:
- clock_snapshot = bt_message_packet_beginning_borrow_default_clock_snapshot_const(msg);
+ case bt2::MessageType::PacketBeginning:
+
+ clockSnapshot = msg.asPacketBeginning().defaultClockSnapshot();
break;
- case BT_MESSAGE_TYPE_PACKET_END:
- clock_snapshot = bt_message_packet_end_borrow_default_clock_snapshot_const(msg);
+ case bt2::MessageType::PacketEnd:
+ clockSnapshot = msg.asPacketEnd().defaultClockSnapshot();
break;
- case BT_MESSAGE_TYPE_DISCARDED_EVENTS:
- clock_snapshot =
- bt_message_discarded_events_borrow_beginning_default_clock_snapshot_const(msg);
+ case bt2::MessageType::DiscardedEvents:
+ clockSnapshot = msg.asDiscardedEvents().beginningDefaultClockSnapshot();
break;
- case BT_MESSAGE_TYPE_DISCARDED_PACKETS:
- clock_snapshot =
- bt_message_discarded_packets_borrow_beginning_default_clock_snapshot_const(msg);
+ case bt2::MessageType::DiscardedPackets:
+ clockSnapshot = msg.asDiscardedPackets().beginningDefaultClockSnapshot();
break;
- case BT_MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY:
- clock_snapshot = bt_message_message_iterator_inactivity_borrow_clock_snapshot_const(msg);
+ case bt2::MessageType::MessageIteratorInactivity:
+ clockSnapshot = msg.asMessageIteratorInactivity().clockSnapshot();
break;
default:
/* All the other messages have a higher priority */
- BT_CPPLOGD_STR_SPEC(lttng_live_msg_iter->logger,
- "Message has no timestamp: using the last message timestamp.");
+ BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
+ "Message has no timestamp, using the last message timestamp: "
+ "iter-data-addr={}, msg-addr={}, last-msg-ts={}, ts={}",
+ fmt::ptr(lttng_live_msg_iter), fmt::ptr(msg.libObjPtr()), last_msg_ts_ns,
+ *ts_ns);
*ts_ns = last_msg_ts_ns;
- goto end;
- }
-
- ret = bt_clock_snapshot_get_ns_from_origin(clock_snapshot, ts_ns);
- if (ret) {
- BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
- "Cannot get nanoseconds from Epoch of clock snapshot: "
- "clock-snapshot-addr={}",
- fmt::ptr(clock_snapshot));
- goto error;
+ return 0;
}
- goto end;
-
-error:
- ret = -1;
+ *ts_ns = clockSnapshot->nsFromOrigin();
-end:
- if (ret == 0) {
- BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
- "Found message's timestamp: iter-data-addr={}, msg-addr={}, "
- "last-msg-ts={}, ts={}",
- fmt::ptr(lttng_live_msg_iter), fmt::ptr(msg), last_msg_ts_ns, *ts_ns);
- }
+ BT_CPPLOGD_SPEC(
+ lttng_live_msg_iter->logger,
+ "Found message's timestamp: iter-data-addr={}, msg-addr={}, last-msg-ts={}, ts={}",
+ fmt::ptr(lttng_live_msg_iter), fmt::ptr(msg.libObjPtr()), last_msg_ts_ns, *ts_ns);
- return ret;
+ return 0;
}
static enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_active_data_stream(
struct lttng_live_msg_iter *lttng_live_msg_iter,
struct lttng_live_stream_iterator *lttng_live_stream, bt2::ConstMessage::Shared& message)
{
- enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
- enum ctf_msg_iter_status status;
- uint64_t session_idx, trace_idx;
-
- for (session_idx = 0; session_idx < lttng_live_msg_iter->sessions->len; session_idx++) {
- struct lttng_live_session *session =
- (lttng_live_session *) g_ptr_array_index(lttng_live_msg_iter->sessions, session_idx);
-
+ for (const auto& session : lttng_live_msg_iter->sessions) {
if (session->new_streams_needed) {
BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
- "Need an update for streams: "
- "session-id={}",
- session->id);
- ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
- goto end;
+ "Need an update for streams: session-id={}", session->id);
+ return LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
}
- for (trace_idx = 0; trace_idx < session->traces->len; trace_idx++) {
- struct lttng_live_trace *trace =
- (lttng_live_trace *) g_ptr_array_index(session->traces, trace_idx);
+
+ for (const auto& trace : session->traces) {
if (trace->metadata_stream_state == LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED) {
BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
- "Need an update for metadata stream: "
- "session-id={}, trace-id={}",
+ "Need an update for metadata stream: session-id={}, trace-id={}",
session->id, trace->id);
- ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
- goto end;
+ return LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
}
}
}
if (lttng_live_stream->state != LTTNG_LIVE_STREAM_ACTIVE_DATA) {
- ret = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
- "Invalid state of live stream iterator"
- "stream-iter-status={}",
+ "Invalid state of live stream iterator: stream-iter-status={}",
lttng_live_stream->state);
- goto end;
+ return LTTNG_LIVE_ITERATOR_STATUS_ERROR;
}
- const bt_message *msg;
- status = ctf_msg_iter_get_next_message(lttng_live_stream->msg_iter.get(), &msg);
- switch (status) {
- case CTF_MSG_ITER_STATUS_EOF:
- ret = LTTNG_LIVE_ITERATOR_STATUS_END;
- break;
- case CTF_MSG_ITER_STATUS_OK:
- message = bt2::ConstMessage::Shared::createWithoutRef(msg);
- ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
- break;
- case CTF_MSG_ITER_STATUS_AGAIN:
- /*
- * Continue immediately (end of packet). The next
- * get_index may return AGAIN to delay the following
- * attempt.
- */
- ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
- break;
- case CTF_MSG_ITER_STATUS_ERROR:
- default:
- ret = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
- BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
- "CTF message iterator failed to get next message: "
- "msg-iter={}, msg-iter-status={}",
- fmt::ptr(lttng_live_stream->msg_iter), status);
- break;
+ if (!lttng_live_stream->msg_iter) {
+ /* The first time we're called for this stream, the MsgIter is not instantiated. */
+ enum lttng_live_iterator_status ret =
+ lttng_live_stream_iterator_create_msg_iter(lttng_live_stream);
+ if (ret != LTTNG_LIVE_ITERATOR_STATUS_OK) {
+ return ret;
+ }
}
-end:
- return ret;
+ try {
+ message = lttng_live_stream->msg_iter->next();
+ if (message) {
+ return LTTNG_LIVE_ITERATOR_STATUS_OK;
+ } else {
+ return LTTNG_LIVE_ITERATOR_STATUS_END;
+ }
+ } catch (const bt2c::TryAgain&) {
+ return LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
+ } catch (const bt2::Error&) {
+ BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
+ "CTF message iterator failed to get next message: msg-iter={}",
+ fmt::ptr(&*lttng_live_stream->msg_iter));
+ return LTTNG_LIVE_ITERATOR_STATUS_ERROR;
+ }
}
static enum lttng_live_iterator_status
struct lttng_live_stream_iterator *stream_iter,
bt2::ConstMessage::Shared& curr_msg)
{
- enum lttng_live_iterator_status live_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
-
BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
"Closing live stream iterator: stream-name=\"{}\", "
"viewer-stream-id={}",
* `ctf_msg_iter` should simply realize that it needs to close the
* stream properly by emitting the necessary stream end message.
*/
- const bt_message *msg;
- enum ctf_msg_iter_status status =
- ctf_msg_iter_get_next_message(stream_iter->msg_iter.get(), &msg);
+ try {
+ if (!stream_iter->msg_iter) {
+ BT_CPPLOGI_SPEC(lttng_live_msg_iter->logger,
+ "Reached the end of the live stream iterator.");
+ return LTTNG_LIVE_ITERATOR_STATUS_END;
+ }
+
+ curr_msg = stream_iter->msg_iter->next();
+ if (!curr_msg) {
+ BT_CPPLOGI_SPEC(lttng_live_msg_iter->logger,
+ "Reached the end of the live stream iterator.");
+ return LTTNG_LIVE_ITERATOR_STATUS_END;
+ }
- if (status == CTF_MSG_ITER_STATUS_ERROR) {
+ return LTTNG_LIVE_ITERATOR_STATUS_OK;
+ } catch (const bt2::Error&) {
BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
"Error getting the next message from CTF message iterator");
- live_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
- goto end;
- } else if (status == CTF_MSG_ITER_STATUS_EOF) {
- BT_CPPLOGI_SPEC(lttng_live_msg_iter->logger,
- "Reached the end of the live stream iterator.");
- live_status = LTTNG_LIVE_ITERATOR_STATUS_END;
- goto end;
+ return LTTNG_LIVE_ITERATOR_STATUS_ERROR;
}
-
- BT_ASSERT(status == CTF_MSG_ITER_STATUS_OK);
-
- curr_msg = bt2::ConstMessage::Shared::createWithoutRef(msg);
-
-end:
- return live_status;
}
/*
}
static enum lttng_live_iterator_status
-adjust_discarded_packets_message(bt_self_message_iterator *iter, const bt_stream *stream,
- const bt_message *msg_in, bt2::ConstMessage::Shared& msg_out,
- uint64_t new_begin_ts)
+adjust_discarded_packets_message(bt_self_message_iterator *iter, bt2::Stream stream,
+ bt2::ConstDiscardedPacketsMessage msgIn,
+ bt2::ConstMessage::Shared& msgOut, uint64_t new_begin_ts)
{
- enum bt_property_availability availability;
- const bt_clock_snapshot *clock_snapshot;
- uint64_t end_ts;
- uint64_t count;
-
- clock_snapshot = bt_message_discarded_packets_borrow_end_default_clock_snapshot_const(msg_in);
- end_ts = bt_clock_snapshot_get_value(clock_snapshot);
-
- availability = bt_message_discarded_packets_get_count(msg_in, &count);
- BT_ASSERT_DBG(availability == BT_PROPERTY_AVAILABILITY_AVAILABLE);
+ BT_ASSERT_DBG(msgIn.count());
const auto msg = bt_message_discarded_packets_create_with_default_clock_snapshots(
- iter, stream, new_begin_ts, end_ts);
+ iter, stream.libObjPtr(), new_begin_ts, msgIn.endDefaultClockSnapshot().value());
if (!msg) {
return LTTNG_LIVE_ITERATOR_STATUS_NOMEM;
}
- bt_message_discarded_packets_set_count(msg, count);
- msg_out = bt2::ConstMessage::Shared::createWithoutRef(msg);
+ bt_message_discarded_packets_set_count(msg, *msgIn.count());
+ msgOut = bt2::Message::Shared::createWithoutRef(msg);
+
return LTTNG_LIVE_ITERATOR_STATUS_OK;
}
static enum lttng_live_iterator_status
-adjust_discarded_events_message(bt_self_message_iterator *iter, const bt_stream *stream,
- const bt_message *msg_in, bt2::ConstMessage::Shared& msg_out,
- uint64_t new_begin_ts)
+adjust_discarded_events_message(bt_self_message_iterator *iter, const bt2::Stream stream,
+ bt2::ConstDiscardedEventsMessage msgIn,
+ bt2::ConstMessage::Shared& msgOut, uint64_t new_begin_ts)
{
- enum bt_property_availability availability;
- const bt_clock_snapshot *clock_snapshot;
- uint64_t end_ts;
- uint64_t count;
-
- clock_snapshot = bt_message_discarded_events_borrow_end_default_clock_snapshot_const(msg_in);
- end_ts = bt_clock_snapshot_get_value(clock_snapshot);
-
- availability = bt_message_discarded_events_get_count(msg_in, &count);
- BT_ASSERT_DBG(availability == BT_PROPERTY_AVAILABILITY_AVAILABLE);
+ BT_ASSERT_DBG(msgIn.count());
const auto msg = bt_message_discarded_events_create_with_default_clock_snapshots(
- iter, stream, new_begin_ts, end_ts);
+ iter, stream.libObjPtr(), new_begin_ts, msgIn.endDefaultClockSnapshot().value());
if (!msg) {
return LTTNG_LIVE_ITERATOR_STATUS_NOMEM;
}
- bt_message_discarded_events_set_count(msg, count);
- msg_out = bt2::ConstMessage::Shared::createWithoutRef(msg);
+ bt_message_discarded_events_set_count(msg, *msgIn.count());
+ msgOut = bt2::Message::Shared::createWithoutRef(msg);
+
return LTTNG_LIVE_ITERATOR_STATUS_OK;
}
struct lttng_live_stream_iterator *stream_iter, int64_t late_msg_ts_ns,
const bt2::ConstMessage& late_msg)
{
- const bt_clock_class *clock_class;
- const bt_stream_class *stream_class;
- enum bt_clock_class_cycles_to_ns_from_origin_status ts_ns_status;
- int64_t last_inactivity_ts_ns;
- enum lttng_live_iterator_status stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
- enum lttng_live_iterator_status adjust_status;
- bt2::ConstMessage::Shared adjusted_message;
-
/*
* The timestamp of the current message is before the last message sent
* by this component. We CANNOT send it as is.
"Invalid live stream state: "
"have a late message when no inactivity message "
"was ever sent for that stream.");
- stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
- goto end;
+ return LTTNG_LIVE_ITERATOR_STATUS_ERROR;
}
if (!is_discarded_packet_or_event_message(late_msg)) {
"have a late message that is not a packet discarded or "
"event discarded message: late-msg-type={}",
late_msg.type());
- stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
- goto end;
+ return LTTNG_LIVE_ITERATOR_STATUS_ERROR;
}
- stream_class = bt_stream_borrow_class_const(stream_iter->stream->libObjPtr());
- clock_class = bt_stream_class_borrow_default_clock_class_const(stream_class);
-
- ts_ns_status = bt_clock_class_cycles_to_ns_from_origin(
- clock_class, stream_iter->last_inactivity_ts.value, &last_inactivity_ts_ns);
- if (ts_ns_status != BT_CLOCK_CLASS_CYCLES_TO_NS_FROM_ORIGIN_STATUS_OK) {
- BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
- "Error converting last "
- "inactivity message timestamp to nanoseconds");
- stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
- goto end;
- }
+ const auto streamClass = stream_iter->stream->cls();
+ const auto clockClass = streamClass.defaultClockClass();
+ int64_t last_inactivity_ts_ns =
+ clockClass->cyclesToNsFromOrigin(stream_iter->last_inactivity_ts.value);
if (last_inactivity_ts_ns <= late_msg_ts_ns) {
BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
"Invalid live stream state: "
"inactivity timespan: last-inactivity-ts-ns={}, "
"late-msg-ts-ns={}",
last_inactivity_ts_ns, late_msg_ts_ns);
- stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
- goto end;
+ return LTTNG_LIVE_ITERATOR_STATUS_ERROR;
}
/*
* adjust its timestamp to ensure monotonicity.
*/
BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
- "Adjusting the timestamp of late message: late-msg-type={}, "
- "msg-new-ts-ns={}",
+ "Adjusting the timestamp of late message: late-msg-type={}, msg-new-ts-ns={}",
late_msg.type(), stream_iter->last_inactivity_ts.value);
- switch (late_msg.type()) {
- case bt2::MessageType::DiscardedEvents:
- adjust_status = adjust_discarded_events_message(
- lttng_live_msg_iter->self_msg_iter, stream_iter->stream->libObjPtr(),
- late_msg.libObjPtr(), adjusted_message, stream_iter->last_inactivity_ts.value);
- break;
- case bt2::MessageType::DiscardedPackets:
- adjust_status = adjust_discarded_packets_message(
- lttng_live_msg_iter->self_msg_iter, stream_iter->stream->libObjPtr(),
- late_msg.libObjPtr(), adjusted_message, stream_iter->last_inactivity_ts.value);
- break;
- default:
- bt_common_abort();
- }
+
+ bt2::ConstMessage::Shared adjustedMessage;
+
+ const auto adjust_status = bt2c::call([&] {
+ switch (late_msg.type()) {
+ case bt2::MessageType::DiscardedEvents:
+ return adjust_discarded_events_message(lttng_live_msg_iter->selfMsgIter.libObjPtr(),
+ *stream_iter->stream,
+ late_msg.asDiscardedEvents(), adjustedMessage,
+ stream_iter->last_inactivity_ts.value);
+
+ case bt2::MessageType::DiscardedPackets:
+ return adjust_discarded_packets_message(lttng_live_msg_iter->selfMsgIter.libObjPtr(),
+ *stream_iter->stream,
+ late_msg.asDiscardedPackets(), adjustedMessage,
+ stream_iter->last_inactivity_ts.value);
+
+ default:
+ bt_common_abort();
+ }
+ });
if (adjust_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
- stream_iter_status = adjust_status;
- goto end;
+ return adjust_status;
}
- BT_ASSERT_DBG(adjusted_message);
- stream_iter->current_msg = adjusted_message;
+ BT_ASSERT_DBG(adjustedMessage);
+ stream_iter->current_msg = std::move(adjustedMessage);
stream_iter->current_msg_ts_ns = last_inactivity_ts_ns;
-end:
- return stream_iter_status;
+ return LTTNG_LIVE_ITERATOR_STATUS_OK;
}
static enum lttng_live_iterator_status
struct lttng_live_stream_iterator **youngest_trace_stream_iter)
{
struct lttng_live_stream_iterator *youngest_candidate_stream_iter = NULL;
- enum lttng_live_iterator_status stream_iter_status;
int64_t youngest_candidate_msg_ts = INT64_MAX;
uint64_t stream_iter_idx;
bt2::ConstMessage::Shared msg;
int64_t curr_msg_ts_ns = INT64_MAX;
- stream_iter_status =
+ const auto stream_iter_status =
lttng_live_iterator_next_msg_on_stream(lttng_live_msg_iter, stream_iter, msg);
if (stream_iter_status == LTTNG_LIVE_ITERATOR_STATUS_END) {
}
if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
- goto end;
+ return stream_iter_status;
}
BT_ASSERT_DBG(msg);
* Get the timestamp in nanoseconds from origin of this
* message.
*/
- live_get_msg_ts_ns(lttng_live_msg_iter, msg->libObjPtr(),
- lttng_live_msg_iter->last_msg_ts_ns, &curr_msg_ts_ns);
+ live_get_msg_ts_ns(lttng_live_msg_iter, *msg, lttng_live_msg_iter->last_msg_ts_ns,
+ &curr_msg_ts_ns);
/*
* Check if the message of the current live stream
* We received a message from the past. This
* may be fixable but it can also be an error.
*/
- stream_iter_status =
- handle_late_message(lttng_live_msg_iter, stream_iter, curr_msg_ts_ns, *msg);
- if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
+ if (handle_late_message(lttng_live_msg_iter, stream_iter, curr_msg_ts_ns, *msg) !=
+ LTTNG_LIVE_ITERATOR_STATUS_OK) {
BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
"Late message could not be handled correctly: "
"lttng-live-msg-iter-addr={}, "
fmt::ptr(lttng_live_msg_iter), stream_iter->name,
curr_msg_ts_ns,
lttng_live_msg_iter->last_msg_ts_ns);
- stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
- goto end;
+ return LTTNG_LIVE_ITERATOR_STATUS_ERROR;
}
}
}
if (youngest_candidate_stream_iter) {
*youngest_trace_stream_iter = youngest_candidate_stream_iter;
- stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
+ return LTTNG_LIVE_ITERATOR_STATUS_OK;
} else {
/*
* The only case where we don't have a candidate for this trace
* is if we reached the end of all the iterators.
*/
BT_ASSERT(live_trace->stream_iterators.empty());
- stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_END;
+ return LTTNG_LIVE_ITERATOR_STATUS_END;
}
-
-end:
- return stream_iter_status;
}
static enum lttng_live_iterator_status
if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK &&
stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_CONTINUE &&
stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_END) {
- goto end;
+ return stream_iter_status;
}
- BT_ASSERT_DBG(session->traces);
-
- while (trace_idx < session->traces->len) {
+ while (trace_idx < session->traces.size()) {
bool trace_is_ended = false;
struct lttng_live_stream_iterator *stream_iter;
- struct lttng_live_trace *trace =
- (lttng_live_trace *) g_ptr_array_index(session->traces, trace_idx);
+ lttng_live_trace *trace = session->traces[trace_idx].get();
stream_iter_status =
next_stream_iterator_for_trace(lttng_live_msg_iter, trace, &stream_iter);
*/
trace_is_ended = true;
} else if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
- goto end;
+ return stream_iter_status;
}
if (!trace_is_ended) {
} else {
/*
* trace_idx is not incremented since
- * g_ptr_array_remove_index_fast replaces the
+ * vectorFastRemove replaces the
* element at trace_idx with the array's last element.
*/
- g_ptr_array_remove_index_fast(session->traces, trace_idx);
+ bt2c::vectorFastRemove(session->traces, trace_idx);
}
}
if (youngest_candidate_stream_iter) {
*youngest_session_stream_iter = youngest_candidate_stream_iter;
- stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
+ return LTTNG_LIVE_ITERATOR_STATUS_OK;
} else {
/*
* The only cases where we don't have a candidate for this
*
* In either cases, we return END.
*/
- BT_ASSERT(session->traces->len == 0);
- stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_END;
+ BT_ASSERT(session->traces.empty());
+ return LTTNG_LIVE_ITERATOR_STATUS_END;
}
-end:
- return stream_iter_status;
}
static inline void put_messages(bt_message_array_const msgs, uint64_t count)
(struct lttng_live_msg_iter *) bt_self_message_iterator_get_data(self_msg_it);
struct lttng_live_component *lttng_live = lttng_live_msg_iter->lttng_live_comp;
enum lttng_live_iterator_status stream_iter_status;
- uint64_t session_idx;
*count = 0;
* is to prevent other graph users from using this live
* iterator in an messed up internal state.
*/
- status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_ERROR;
BT_CPPLOGE_APPEND_CAUSE_SPEC(
lttng_live_msg_iter->logger,
"Message iterator was interrupted during a previous call to the `next()` and currently does not support continuing after such event.");
- goto end;
+ return BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_ERROR;
}
/*
* the user, session count will be 0. In this case, we return status
* end to return gracefully.
*/
- if (lttng_live_msg_iter->sessions->len == 0) {
+ if (lttng_live_msg_iter->sessions.empty()) {
if (lttng_live->params.sess_not_found_act != SESSION_NOT_FOUND_ACTION_CONTINUE) {
- status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_END;
- goto end;
+ return BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_END;
} else {
/*
* The are no more active session for this session
viewer_status = lttng_live_create_viewer_session(lttng_live_msg_iter);
if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
- status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_ERROR;
BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
"Error creating LTTng live viewer session");
+ return BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_ERROR;
} else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
- status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_AGAIN;
+ return BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_AGAIN;
} else {
bt_common_abort();
}
- goto end;
}
}
}
*candidate_stream_iter = NULL;
int64_t youngest_msg_ts_ns = INT64_MAX;
- BT_ASSERT_DBG(lttng_live_msg_iter->sessions);
- session_idx = 0;
- while (session_idx < lttng_live_msg_iter->sessions->len) {
- struct lttng_live_session *session = (lttng_live_session *) g_ptr_array_index(
- lttng_live_msg_iter->sessions, session_idx);
+ uint64_t session_idx = 0;
+ while (session_idx < lttng_live_msg_iter->sessions.size()) {
+ lttng_live_session *session = lttng_live_msg_iter->sessions[session_idx].get();
/* Find the best candidate message to send downstream. */
stream_iter_status = next_stream_iterator_for_session(lttng_live_msg_iter, session,
* - All live stream iterators have ENDed.
*/
if (stream_iter_status == LTTNG_LIVE_ITERATOR_STATUS_END) {
- if (session->closed && session->traces->len == 0) {
+ if (session->closed && session->traces.empty()) {
/*
* Remove the session from the list.
* session_idx is not modified since
* replaces the the removed element with
* the array's last element.
*/
- g_ptr_array_remove_index_fast(lttng_live_msg_iter->sessions, session_idx);
+ bt2c::vectorFastRemove(lttng_live_msg_iter->sessions, session_idx);
} else {
session_idx++;
}
* that of the current candidate message. We
* must break the tie in a predictable manner.
*/
- BT_CPPLOGD_STR_SPEC(
+ BT_CPPLOGD_SPEC(
lttng_live_msg_iter->logger,
"Two of the next message candidates have the same timestamps, pick one deterministically.");
/*
bt_common_abort();
}
-end:
return status;
} catch (const std::bad_alloc&) {
return BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_MEMORY_ERROR;
}
}
-static struct lttng_live_msg_iter *
+static lttng_live_msg_iter::UP
lttng_live_msg_iter_create(struct lttng_live_component *lttng_live_comp,
- bt_self_message_iterator *self_msg_it)
+ const bt2::SelfMessageIterator selfMsgIter)
{
- lttng_live_msg_iter *msg_iter = new lttng_live_msg_iter {lttng_live_comp->logger};
- msg_iter->self_comp = lttng_live_comp->self_comp;
- msg_iter->lttng_live_comp = lttng_live_comp;
- msg_iter->self_msg_iter = self_msg_it;
+ auto msg_iter = bt2s::make_unique<struct lttng_live_msg_iter>(
+ lttng_live_comp->logger, lttng_live_comp->selfComp, selfMsgIter);
+ msg_iter->lttng_live_comp = lttng_live_comp;
msg_iter->active_stream_iter = 0;
msg_iter->last_msg_ts_ns = INT64_MIN;
msg_iter->was_interrupted = false;
- msg_iter->sessions =
- g_ptr_array_new_with_free_func((GDestroyNotify) lttng_live_destroy_session);
- BT_ASSERT(msg_iter->sessions);
-
return msg_iter;
}
bt_self_message_iterator_configuration *, bt_self_component_port_output *)
{
try {
- bt_message_iterator_class_initialize_method_status status;
struct lttng_live_component *lttng_live;
- struct lttng_live_msg_iter *lttng_live_msg_iter;
enum lttng_live_viewer_status viewer_status;
bt_self_component *self_comp = bt_self_message_iterator_borrow_component(self_msg_it);
BT_ASSERT(!lttng_live->has_msg_iter);
lttng_live->has_msg_iter = true;
- lttng_live_msg_iter = lttng_live_msg_iter_create(lttng_live, self_msg_it);
+ auto lttng_live_msg_iter = lttng_live_msg_iter_create(lttng_live, bt2::wrap(self_msg_it));
if (!lttng_live_msg_iter) {
BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live->logger,
"Failed to create lttng_live_msg_iter");
- status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
- goto error;
+ return BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
}
viewer_status = live_viewer_connection_create(
- lttng_live->params.url.c_str(), false, lttng_live_msg_iter, lttng_live_msg_iter->logger,
- <tng_live_msg_iter->viewer_connection);
+ lttng_live->params.url.c_str(), false, lttng_live_msg_iter.get(),
+ lttng_live_msg_iter->logger, lttng_live_msg_iter->viewer_connection);
if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
"Interrupted while creating viewer connection");
}
- status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
- goto error;
+ return BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
}
- viewer_status = lttng_live_create_viewer_session(lttng_live_msg_iter);
+ viewer_status = lttng_live_create_viewer_session(lttng_live_msg_iter.get());
if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
"Interrupted when creating viewer session");
}
- status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
- goto error;
+ return BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
}
- if (lttng_live_msg_iter->sessions->len == 0) {
+ if (lttng_live_msg_iter->sessions.empty()) {
switch (lttng_live->params.sess_not_found_act) {
case SESSION_NOT_FOUND_ACTION_CONTINUE:
BT_CPPLOGI_SPEC(
"component parameter: url =\"{}\"",
SESS_NOT_FOUND_ACTION_PARAM, SESS_NOT_FOUND_ACTION_FAIL_STR,
lttng_live->params.url);
- status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
- goto error;
+ return BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
case SESSION_NOT_FOUND_ACTION_END:
BT_CPPLOGI_SPEC(lttng_live_msg_iter->logger,
"Unable to connect to the requested live viewer session. "
}
}
- bt_self_message_iterator_set_data(self_msg_it, lttng_live_msg_iter);
- status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_OK;
- goto end;
-
-error:
- lttng_live_msg_iter_destroy(lttng_live_msg_iter);
-end:
- return status;
+ bt_self_message_iterator_set_data(self_msg_it, lttng_live_msg_iter.release());
+ return BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_OK;
} catch (const std::bad_alloc&) {
return BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
} catch (const bt2::Error&) {
bt_param_validation_value_descr::makeString()},
BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_END};
-static bt_component_class_query_method_status
-lttng_live_query_list_sessions(const bt_value *params, const bt_value **result,
- const bt2c::Logger& logger)
+static bt2::Value::Shared lttng_live_query_list_sessions(const bt2::ConstMapValue params,
+ const bt2c::Logger& logger)
{
- bt_component_class_query_method_status status;
- const bt_value *url_value = NULL;
const char *url;
- struct live_viewer_connection *viewer_connection = NULL;
+ live_viewer_connection::UP viewer_connection;
enum lttng_live_viewer_status viewer_status;
enum bt_param_validation_status validation_status;
gchar *validate_error = NULL;
- validation_status = bt_param_validation_validate(params, list_sessions_params, &validate_error);
+ validation_status =
+ bt_param_validation_validate(params.libObjPtr(), list_sessions_params, &validate_error);
if (validation_status == BT_PARAM_VALIDATION_STATUS_MEMORY_ERROR) {
- status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_MEMORY_ERROR;
- goto error;
+ throw bt2c::MemoryError {};
} else if (validation_status == BT_PARAM_VALIDATION_STATUS_VALIDATION_ERROR) {
- status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
- BT_CPPLOGE_APPEND_CAUSE_SPEC(logger, "{}", validate_error);
- goto error;
+ bt2c::GCharUP errorFreer {validate_error};
+
+ BT_CPPLOGE_APPEND_CAUSE_AND_THROW_SPEC(logger, bt2::Error, "{}", validate_error);
}
- url_value = bt_value_map_borrow_entry_value_const(params, URL_PARAM);
- url = bt_value_string_get(url_value);
+ url = params[URL_PARAM]->asString().value();
- viewer_status = live_viewer_connection_create(url, true, NULL, logger, &viewer_connection);
+ viewer_status = live_viewer_connection_create(url, true, NULL, logger, viewer_connection);
if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
- BT_CPPLOGE_APPEND_CAUSE_SPEC(logger, "Failed to create viewer connection");
- status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
+ BT_CPPLOGE_APPEND_CAUSE_AND_THROW_SPEC(logger, bt2::Error,
+ "Failed to create viewer connection");
} else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
- status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_AGAIN;
+ throw bt2c::TryAgain {};
} else {
bt_common_abort();
}
- goto error;
- }
-
- status = live_viewer_connection_list_sessions(viewer_connection, result);
- if (status != BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_OK) {
- BT_CPPLOGE_APPEND_CAUSE_SPEC(logger, "Failed to list viewer sessions");
- goto error;
}
- goto end;
-
-error:
- BT_VALUE_PUT_REF_AND_RESET(*result);
-
- if (status >= 0) {
- status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
- }
-
-end:
- if (viewer_connection) {
- live_viewer_connection_destroy(viewer_connection);
- }
-
- g_free(validate_error);
-
- return status;
+ return live_viewer_connection_list_sessions(viewer_connection.get());
}
-static bt_component_class_query_method_status
-lttng_live_query_support_info(const bt_value *params, const bt_value **result,
- const bt2c::Logger& logger)
+static bt2::Value::Shared lttng_live_query_support_info(const bt2::ConstMapValue params,
+ const bt2c::Logger& logger)
{
- bt_component_class_query_method_status status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_OK;
- const bt_value *input_type_value;
- const bt_value *input_value;
- double weight = 0;
struct bt_common_lttng_live_url_parts parts = {};
+ bt_common_lttng_live_url_parts_deleter partsDeleter {parts};
- /* Used by the logging macros */
- __attribute__((unused)) bt_self_component *self_comp = NULL;
-
- *result = NULL;
- input_type_value = bt_value_map_borrow_entry_value_const(params, "type");
- if (!input_type_value) {
- BT_CPPLOGE_APPEND_CAUSE_SPEC(logger, "Missing expected `type` parameter.");
- goto error;
+ const auto typeValue = params["type"];
+ if (!typeValue) {
+ BT_CPPLOGE_APPEND_CAUSE_AND_THROW_SPEC(logger, bt2::Error,
+ "Missing expected `type` parameter.");
}
- if (!bt_value_is_string(input_type_value)) {
- BT_CPPLOGE_APPEND_CAUSE_SPEC(logger, "`type` parameter is not a string value.");
- goto error;
+ if (!typeValue->isString()) {
+ BT_CPPLOGE_APPEND_CAUSE_AND_THROW_SPEC(logger, bt2::Error,
+ "`type` parameter is not a string value.");
}
- if (strcmp(bt_value_string_get(input_type_value), "string") != 0) {
+ if (strcmp(typeValue->asString().value(), "string") != 0) {
/* We don't handle file system paths */
- goto create_result;
+ return bt2::RealValue::create();
}
- input_value = bt_value_map_borrow_entry_value_const(params, "input");
- if (!input_value) {
- BT_CPPLOGE_APPEND_CAUSE_SPEC(logger, "Missing expected `input` parameter.");
- goto error;
+ const auto inputValue = params["input"];
+ if (!inputValue) {
+ BT_CPPLOGE_APPEND_CAUSE_AND_THROW_SPEC(logger, bt2::Error,
+ "Missing expected `input` parameter.");
}
- if (!bt_value_is_string(input_value)) {
- BT_CPPLOGE_APPEND_CAUSE_SPEC(logger, "`input` parameter is not a string value.");
- goto error;
+ if (!inputValue->isString()) {
+ BT_CPPLOGE_APPEND_CAUSE_AND_THROW_SPEC(logger, bt2::Error,
+ "`input` parameter is not a string value.");
}
- parts = bt_common_parse_lttng_live_url(bt_value_string_get(input_value), NULL, 0);
+ parts = bt_common_parse_lttng_live_url(inputValue->asString().value(), NULL, 0);
if (parts.session_name) {
/*
* Looks pretty much like an LTTng live URL: we got the
* session name part, which forms a complete URL.
*/
- weight = .75;
+ return bt2::RealValue::create(.75);
}
-create_result:
- *result = bt_value_real_create_init(weight);
- if (!*result) {
- status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_MEMORY_ERROR;
- goto error;
- }
-
- goto end;
-
-error:
- if (status >= 0) {
- status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
- }
-
- BT_ASSERT(!*result);
-
-end:
- bt_common_destroy_lttng_live_url_parts(&parts);
- return status;
+ return bt2::RealValue::create();
}
bt_component_class_query_method_status lttng_live_query(bt_self_component_class_source *comp_class,
const bt_value **result)
{
try {
- bt_component_class_query_method_status status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_OK;
bt2c::Logger logger {bt2::SelfComponentClass {comp_class},
bt2::PrivateQueryExecutor {priv_query_exec},
"PLUGIN/SRC.CTF.LTTNG-LIVE/QUERY"};
+ const bt2::ConstMapValue paramsObj(params);
+ bt2::Value::Shared resultObj;
if (strcmp(object, "sessions") == 0) {
- status = lttng_live_query_list_sessions(params, result, logger);
+ resultObj = lttng_live_query_list_sessions(paramsObj, logger);
} else if (strcmp(object, "babeltrace.support-info") == 0) {
- status = lttng_live_query_support_info(params, result, logger);
+ resultObj = lttng_live_query_support_info(paramsObj, logger);
} else {
BT_CPPLOGI_SPEC(logger, "Unknown query object `{}`", object);
- status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_UNKNOWN_OBJECT;
- goto end;
+ return BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_UNKNOWN_OBJECT;
}
-end:
- return status;
+ *result = resultObj.release().libObjPtr();
+
+ return BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_OK;
+ } catch (const bt2c::TryAgain&) {
+ return BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_MEMORY_ERROR;
} catch (const std::bad_alloc&) {
return BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_MEMORY_ERROR;
} catch (const bt2::Error&) {
const bt_value *value;
enum bt_param_validation_status validation_status;
gchar *validation_error = NULL;
- bt2c::Logger logger {bt2::SelfSourceComponent {self_comp}, "PLUGIN/SRC.CTF.LTTNG-LIVE/COMP"};
+ bt2c::Logger logger {bt2::wrap(self_comp), "PLUGIN/SRC.CTF.LTTNG-LIVE/COMP"};
validation_status = bt_param_validation_validate(params, params_descr, &validation_error);
if (validation_status == BT_PARAM_VALIDATION_STATUS_MEMORY_ERROR) {
return BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
}
- auto lttng_live = bt2s::make_unique<lttng_live_component>(std::move(logger));
- lttng_live->self_comp = bt_self_component_source_as_self_component(self_comp);
+ auto lttng_live =
+ bt2s::make_unique<lttng_live_component>(std::move(logger), bt2::wrap(self_comp));
+
lttng_live->max_query_size = MAX_QUERY_SIZE;
lttng_live->has_msg_iter = false;
try {
lttng_live_component::UP lttng_live;
bt_component_class_initialize_method_status ret;
- bt_self_component *self_comp = bt_self_component_source_as_self_component(self_comp_src);
bt_self_component_add_port_status add_port_status;
ret = lttng_live_component_create(params, self_comp_src, lttng_live);
return ret;
}
- bt_self_component_set_data(self_comp, lttng_live.release());
+ bt_self_component_set_data(bt_self_component_source_as_self_component(self_comp_src),
+ lttng_live.release());
return BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK;
} catch (const std::bad_alloc&) {