}
BT_HIDDEN
-struct ctf_msg_iter *ctf_msg_iter_create(struct ctf_trace_class *tc,
+struct ctf_msg_iter *ctf_msg_iter_create(
+ struct ctf_trace_class *tc,
size_t max_request_sz,
struct ctf_msg_iter_medium_ops medops, void *data,
- bt_logging_level log_level, bt_self_component *self_comp)
+ bt_logging_level log_level,
+ bt_self_component *self_comp,
+ bt_self_message_iterator *self_msg_iter)
{
struct ctf_msg_iter *msg_it = NULL;
struct bt_bfcr_cbs cbs = {
goto end;
}
msg_it->self_comp = self_comp;
+ msg_it->self_msg_iter = self_msg_iter;
msg_it->log_level = log_level;
msg_it->meta.tc = tc;
msg_it->medium.medops = medops;
enum ctf_msg_iter_status ctf_msg_iter_get_next_message(
struct ctf_msg_iter *msg_it,
- bt_self_message_iterator *self_msg_iter, bt_message **message)
+ bt_message **message)
{
enum ctf_msg_iter_status status = CTF_MSG_ITER_STATUS_OK;
bt_self_component *self_comp = msg_it->self_comp;
BT_ASSERT_DBG(msg_it);
BT_ASSERT_DBG(message);
- msg_it->self_msg_iter = self_msg_iter;
msg_it->set_stream = true;
BT_COMP_LOGD("Getting next message: msg-it-addr=%p", msg_it);
* success, or \c NULL on error
*/
BT_HIDDEN
-struct ctf_msg_iter *ctf_msg_iter_create(struct ctf_trace_class *tc,
- size_t max_request_sz, struct ctf_msg_iter_medium_ops medops,
- void *medops_data, bt_logging_level log_level,
- bt_self_component *self_comp);
+struct ctf_msg_iter *ctf_msg_iter_create(
+ struct ctf_trace_class *tc,
+ size_t max_request_sz,
+ struct ctf_msg_iter_medium_ops medops,
+ void *medops_data,
+ bt_logging_level log_level,
+ bt_self_component *self_comp,
+ bt_self_message_iterator *self_msg_iter);
/**
* Destroys a CTF message iterator, freeing all internal resources.
BT_HIDDEN
enum ctf_msg_iter_status ctf_msg_iter_get_next_message(
struct ctf_msg_iter *msg_it,
- bt_self_message_iterator *msg_iter,
bt_message **message);
struct ctf_msg_iter_packet_properties {
bt_component_class_message_iterator_next_method_status status;
msg_iter_status = ctf_msg_iter_get_next_message(
- ds_file->msg_iter, ds_file->self_msg_iter, msg);
+ ds_file->msg_iter, msg);
switch (msg_iter_status) {
case CTF_MSG_ITER_STATUS_EOF:
port_data->ds_file_group->ctf_fs_trace->metadata->tc,
bt_common_get_page_size(msg_iter_data->log_level) * 8,
ctf_fs_ds_file_medops, NULL, msg_iter_data->log_level,
- self_comp);
+ self_comp, self_msg_iter);
if (!msg_iter_data->msg_iter) {
BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Cannot create a CTF message iterator.");
ret = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
msg_iter = ctf_msg_iter_create(ctf_fs_trace->metadata->tc,
bt_common_get_page_size(log_level) * 8,
- ctf_fs_ds_file_medops, NULL, log_level, self_comp);
+ ctf_fs_ds_file_medops, NULL, log_level, self_comp, NULL);
if (!msg_iter) {
BT_COMP_LOGE_STR("Cannot create a CTF message iterator.");
goto error;
msg_iter = ctf_msg_iter_create(ctf_fs_trace->metadata->tc,
bt_common_get_page_size(log_level) * 8, ctf_fs_ds_file_medops,
- NULL, log_level, self_comp);
+ NULL, log_level, self_comp, NULL);
if (!msg_iter) {
/* ctf_msg_iter_create() logs errors. */
ret = -1;
BT_HIDDEN
enum lttng_live_iterator_status lttng_live_lazy_msg_init(
- struct lttng_live_session *session)
+ struct lttng_live_session *session,
+ bt_self_message_iterator *self_msg_iter)
{
struct lttng_live_component *lttng_live =
session->lttng_live_msg_iter->lttng_live_comp;
trace->metadata->decoder);
stream_iter->msg_iter = ctf_msg_iter_create(ctf_tc,
lttng_live->max_query_size, medops, stream_iter,
- log_level, self_comp);
+ log_level, self_comp, self_msg_iter);
if (!stream_iter->msg_iter) {
BT_COMP_LOGE_APPEND_CAUSE(self_comp,
"Failed to create CTF message iterator");
struct lttng_live_stream_iterator *lttng_live_stream_iterator_create(
struct lttng_live_session *session,
uint64_t ctf_trace_id,
- uint64_t stream_id)
+ uint64_t stream_id,
+ bt_self_message_iterator *self_msg_iter)
{
struct lttng_live_stream_iterator *stream_iter;
struct lttng_live_component *lttng_live;
BT_ASSERT(!stream_iter->msg_iter);
stream_iter->msg_iter = ctf_msg_iter_create(ctf_tc,
lttng_live->max_query_size, medops, stream_iter,
- log_level, self_comp);
+ log_level, self_comp, self_msg_iter);
if (!stream_iter->msg_iter) {
BT_COMP_LOGE_APPEND_CAUSE(self_comp,
"Failed to create CTF message iterator");
#include "lttng-live.h"
enum lttng_live_iterator_status lttng_live_lazy_msg_init(
- struct lttng_live_session *session);
+ struct lttng_live_session *session,
+ bt_self_message_iterator *self_msg_iter);
struct lttng_live_stream_iterator *lttng_live_stream_iterator_create(
struct lttng_live_session *session,
uint64_t ctf_trace_id,
- uint64_t stream_id);
+ uint64_t stream_id,
+ bt_self_message_iterator *self_msg_iter);
void lttng_live_stream_iterator_destroy(
struct lttng_live_stream_iterator *stream);
if (!session->attached) {
enum lttng_live_attach_session_status attach_status =
- lttng_live_attach_session(session);
+ lttng_live_attach_session(session,
+ lttng_live_msg_iter->self_msg_iter);
if (attach_status != LTTNG_LIVE_ATTACH_SESSION_STATUS_OK) {
if (lttng_live_graph_is_canceled(lttng_live_msg_iter)) {
/*
}
}
- status = lttng_live_get_new_streams(session);
+ status = lttng_live_get_new_streams(session,
+ lttng_live_msg_iter->self_msg_iter);
if (status != LTTNG_LIVE_ITERATOR_STATUS_OK &&
status != LTTNG_LIVE_ITERATOR_STATUS_END) {
goto end;
goto end;
}
}
- status = lttng_live_lazy_msg_init(session);
+ status = lttng_live_lazy_msg_init(session,
+ lttng_live_msg_iter->self_msg_iter);
end:
return status;
goto end;
}
- status = ctf_msg_iter_get_next_message(lttng_live_stream->msg_iter,
- lttng_live_msg_iter->self_msg_iter, message);
+ status = ctf_msg_iter_get_next_message(
+ lttng_live_stream->msg_iter, message);
switch (status) {
case CTF_MSG_ITER_STATUS_EOF:
ret = LTTNG_LIVE_ITERATOR_STATUS_END;
* stream properly by emitting the necessary stream end message.
*/
enum ctf_msg_iter_status status = ctf_msg_iter_get_next_message(
- stream_iter->msg_iter, lttng_live_msg_iter->self_msg_iter,
- curr_msg);
+ stream_iter->msg_iter, curr_msg);
if (status == CTF_MSG_ITER_STATUS_ERROR) {
BT_COMP_LOGE_APPEND_CAUSE(self_comp,
};
enum lttng_live_attach_session_status lttng_live_attach_session(
- struct lttng_live_session *session);
+ struct lttng_live_session *session,
+ bt_self_message_iterator *self_msg_iter);
int lttng_live_detach_session(struct lttng_live_session *session);
enum lttng_live_iterator_status lttng_live_get_new_streams(
- struct lttng_live_session *session);
+ struct lttng_live_session *session,
+ bt_self_message_iterator *self_msg_iter);
int lttng_live_add_session(struct lttng_live_msg_iter *lttng_live_msg_iter,
uint64_t session_id,
static
int receive_streams(struct lttng_live_session *session,
- uint32_t stream_count)
+ uint32_t stream_count,
+ bt_self_message_iterator *self_msg_iter)
{
ssize_t ret_len;
uint32_t i;
BT_COMP_LOGI(" stream %" PRIu64 " : %s/%s",
stream_id, stream.path_name, stream.channel_name);
live_stream = lttng_live_stream_iterator_create(session,
- ctf_trace_id, stream_id);
+ ctf_trace_id, stream_id, self_msg_iter);
if (!live_stream) {
BT_COMP_LOGE_APPEND_CAUSE(self_comp,
"Error creating stream");
BT_HIDDEN
enum lttng_live_attach_session_status lttng_live_attach_session(
- struct lttng_live_session *session)
+ struct lttng_live_session *session,
+ bt_self_message_iterator *self_msg_iter)
{
struct lttng_viewer_cmd cmd;
enum lttng_live_attach_session_status attach_status;
}
/* We receive the initial list of streams. */
- if (receive_streams(session, streams_count)) {
+ if (receive_streams(session, streams_count, self_msg_iter)) {
BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Error receiving streams");
goto error;
}
*/
BT_HIDDEN
enum lttng_live_iterator_status lttng_live_get_new_streams(
- struct lttng_live_session *session)
+ struct lttng_live_session *session,
+ bt_self_message_iterator *self_msg_iter)
{
enum lttng_live_iterator_status status =
LTTNG_LIVE_ITERATOR_STATUS_OK;
goto error;
}
- if (receive_streams(session, streams_count)) {
+ if (receive_streams(session, streams_count, self_msg_iter)) {
BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Error receiving streams");
goto error;
}