#include "common/assert.h"
#include "cpp-common/bt2c/fmt.hpp"
#include "cpp-common/bt2c/glib-up.hpp"
+#include "cpp-common/bt2s/make-unique.hpp"
#include "cpp-common/vendor/fmt/format.h"
#include "plugins/common/muxing/muxing.h"
BT_ASSERT(trace->stream_iterators);
g_ptr_array_free(trace->stream_iterators, TRUE);
- lttng_live_metadata_fini(trace);
delete trace;
}
if (lttng_live_stream->trace->metadata_stream_state ==
LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED) {
- BT_CPPLOGD_SPEC(
- lttng_live_msg_iter->logger,
- "Need to get an update for the metadata stream before proceeding further with this stream: "
- "stream-name=\"{}\"",
- lttng_live_stream->name->str);
+ BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
+ "Need to get an update for the metadata stream before proceeding "
+ "further with this stream: stream-name=\"{}\"",
+ lttng_live_stream->name);
ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
goto end;
}
if (lttng_live_stream->trace->session->new_streams_needed) {
- BT_CPPLOGD_SPEC(
- lttng_live_msg_iter->logger,
- "Need to get an update of all streams before proceeding further with this stream: "
- "stream-name=\"{}\"",
- lttng_live_stream->name->str);
+ BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
+ "Need to get an update of all streams before proceeding further "
+ "with this stream: stream-name=\"{}\"",
+ lttng_live_stream->name);
ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
goto end;
}
BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
"Setting live stream reading info: stream-name=\"{}\", "
"viewer-stream-id={}, stream-base-offset={}, stream-offset={}, stream-len={}",
- lttng_live_stream->name->str, lttng_live_stream->viewer_stream_id,
+ lttng_live_stream->name, lttng_live_stream->viewer_stream_id,
lttng_live_stream->base_offset, lttng_live_stream->offset,
lttng_live_stream->len);
goto end;
}
- status = ctf_msg_iter_get_next_message(lttng_live_stream->msg_iter, message);
+ status = ctf_msg_iter_get_next_message(lttng_live_stream->msg_iter.get(), message);
switch (status) {
case CTF_MSG_ITER_STATUS_EOF:
ret = LTTNG_LIVE_ITERATOR_STATUS_END;
BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
"Closing live stream iterator: stream-name=\"{}\", "
"viewer-stream-id={}",
- stream_iter->name->str, stream_iter->viewer_stream_id);
+ stream_iter->name, stream_iter->viewer_stream_id);
/*
* The viewer has hung up on us so we are closing the stream. The
* stream properly by emitting the necessary stream end message.
*/
enum ctf_msg_iter_status status =
- ctf_msg_iter_get_next_message(stream_iter->msg_iter, curr_msg);
+ ctf_msg_iter_get_next_message(stream_iter->msg_iter.get(), curr_msg);
if (status == CTF_MSG_ITER_STATUS_ERROR) {
BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
"Advancing live stream iterator until next message if possible: "
"stream-name=\"{}\", viewer-stream-id={}",
- stream_iter->name->str, stream_iter->viewer_stream_id);
+ stream_iter->name, stream_iter->viewer_stream_id);
if (stream_iter->has_stream_hung_up) {
/*
BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
"Returning from advancing live stream iterator: status={}, "
"stream-name=\"{}\", viewer-stream-id={}",
- live_status, stream_iter->name->str, stream_iter->viewer_stream_id);
+ live_status, stream_iter->name, stream_iter->viewer_stream_id);
return live_status;
}
BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
"Handling late message on live stream iterator: "
"stream-name=\"{}\", viewer-stream-id={}",
- stream_iter->name->str, stream_iter->viewer_stream_id);
+ stream_iter->name, stream_iter->viewer_stream_id);
if (!stream_iter->last_inactivity_ts.is_set) {
BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
goto end;
}
- stream_class = bt_stream_borrow_class_const(stream_iter->stream);
+ stream_class = bt_stream_borrow_class_const(stream_iter->stream->libObjPtr());
clock_class = bt_stream_class_borrow_default_clock_class_const(stream_class);
ts_ns_status = bt_clock_class_cycles_to_ns_from_origin(
switch (bt_message_get_type(late_msg)) {
case BT_MESSAGE_TYPE_DISCARDED_EVENTS:
adjust_status = adjust_discarded_events_message(
- lttng_live_msg_iter->self_msg_iter, stream_iter->stream, late_msg, &adjusted_message,
- stream_iter->last_inactivity_ts.value);
+ lttng_live_msg_iter->self_msg_iter, stream_iter->stream->libObjPtr(), late_msg,
+ &adjusted_message, stream_iter->last_inactivity_ts.value);
break;
case BT_MESSAGE_TYPE_DISCARDED_PACKETS:
adjust_status = adjust_discarded_packets_message(
- lttng_live_msg_iter->self_msg_iter, stream_iter->stream, late_msg, &adjusted_message,
- stream_iter->last_inactivity_ts.value);
+ lttng_live_msg_iter->self_msg_iter, stream_iter->stream->libObjPtr(), late_msg,
+ &adjusted_message, stream_iter->last_inactivity_ts.value);
break;
default:
bt_common_abort();
"Live stream iterator returned message: msg-type={}, "
"stream-name=\"{}\", viewer-stream-id={}",
static_cast<bt2::MessageType>(bt_message_get_type(msg)),
- stream_iter->name->str, stream_iter->viewer_stream_id);
+ stream_iter->name, stream_iter->viewer_stream_id);
/*
* Get the timestamp in nanoseconds from origin of this
"lttng-live-msg-iter-addr={}, "
"stream-name=\"{}\", "
"curr-msg-ts={}, last-msg-ts={}",
- fmt::ptr(lttng_live_msg_iter),
- stream_iter->name->str, curr_msg_ts_ns,
+ fmt::ptr(lttng_live_msg_iter), stream_iter->name,
+ curr_msg_ts_ns,
lttng_live_msg_iter->last_msg_ts_ns);
stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
goto end;
void lttng_live_component_finalize(bt_self_component_source *component)
{
- lttng_live_component *data = (lttng_live_component *) bt_self_component_get_data(
- bt_self_component_source_as_self_component(component));
-
- delete data;
+ lttng_live_component::UP {static_cast<lttng_live_component *>(
+ bt_self_component_get_data(bt_self_component_source_as_self_component(component)))};
}
static enum session_not_found_action
static bt_component_class_initialize_method_status
lttng_live_component_create(const bt_value *params, bt_self_component_source *self_comp,
- struct lttng_live_component **component)
+ lttng_live_component::UP& component)
{
- struct lttng_live_component *lttng_live = NULL;
const bt_value *inputs_value;
const bt_value *url_value;
const bt_value *value;
enum bt_param_validation_status validation_status;
gchar *validation_error = NULL;
- bt_component_class_initialize_method_status status;
bt2c::Logger logger {bt2::SelfSourceComponent {self_comp}, "PLUGIN/SRC.CTF.LTTNG-LIVE/COMP"};
validation_status = bt_param_validation_validate(params, params_descr, &validation_error);
if (validation_status == BT_PARAM_VALIDATION_STATUS_MEMORY_ERROR) {
- status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
- goto error;
+ return BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
} else if (validation_status == BT_PARAM_VALIDATION_STATUS_VALIDATION_ERROR) {
bt2c::GCharUP errorFreer {validation_error};
BT_CPPLOGE_APPEND_CAUSE_SPEC(logger, "{}", validation_error);
- status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
- goto error;
+ return BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
}
- lttng_live = new lttng_live_component {std::move(logger)};
+ auto lttng_live = bt2s::make_unique<lttng_live_component>(std::move(logger));
lttng_live->self_comp = bt_self_component_source_as_self_component(self_comp);
lttng_live->max_query_size = MAX_QUERY_SIZE;
lttng_live->has_msg_iter = false;
lttng_live->params.sess_not_found_act = SESSION_NOT_FOUND_ACTION_CONTINUE;
}
- status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK;
- goto end;
-
-error:
- delete lttng_live;
- lttng_live = NULL;
-end:
- *component = lttng_live;
- return status;
+ component = std::move(lttng_live);
+ return BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK;
}
bt_component_class_initialize_method_status
bt_self_component_source_configuration *, const bt_value *params, void *)
{
try {
- struct lttng_live_component *lttng_live;
+ lttng_live_component::UP lttng_live;
bt_component_class_initialize_method_status ret;
bt_self_component *self_comp = bt_self_component_source_as_self_component(self_comp_src);
bt_self_component_add_port_status add_port_status;
- ret = lttng_live_component_create(params, self_comp_src, <tng_live);
+ ret = lttng_live_component_create(params, self_comp_src, lttng_live);
if (ret != BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK) {
- goto error;
+ return ret;
}
add_port_status =
bt_self_component_source_add_output_port(self_comp_src, "out", NULL, NULL);
if (add_port_status != BT_SELF_COMPONENT_ADD_PORT_STATUS_OK) {
ret = (bt_component_class_initialize_method_status) add_port_status;
- goto end;
+ return ret;
}
- bt_self_component_set_data(self_comp, lttng_live);
- goto end;
+ bt_self_component_set_data(self_comp, lttng_live.release());
-error:
- delete lttng_live;
- lttng_live = NULL;
-end:
- return ret;
+ return BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK;
} catch (const std::bad_alloc&) {
return BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
} catch (const bt2::Error&) {