}
/*
- * For active no data stream, fetch next data. It can be either:
- * - quiescent: need to put it in the prio heap at quiescent end
- * timestamp,
- * - have data: need to wire up first event into the prio heap,
- * - have no data on this stream at this point: need to retry (AGAIN) or
- * return EOF.
+ * For active no data stream, fetch next index. As a result of that it can
+ * become either:
+ * - quiescent: won't have events for a bit,
+ * - have data: need to get that data and produce the event,
+ * - have no data on this stream at this point: need to retry (AGAIN) or return
+ * EOF.
*/
static
enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_no_data_stream(
if (lttng_live_stream->trace->metadata_stream_state ==
LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED) {
+ BT_COMP_LOGD("Need to get an update for the metadata stream before proceeding further with this stream: "
+ "stream-name=\"%s\"", lttng_live_stream->name->str);
ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
goto end;
}
+
if (lttng_live_stream->trace->session->new_streams_needed) {
+ BT_COMP_LOGD("Need to get an update of all streams before proceeding further with this stream: "
+ "stream-name=\"%s\"", lttng_live_stream->name->str);
ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
goto end;
}
+
if (lttng_live_stream->state != LTTNG_LIVE_STREAM_ACTIVE_NO_DATA &&
lttng_live_stream->state != LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA) {
goto end;
}
ret = lttng_live_get_next_index(lttng_live_msg_iter, lttng_live_stream,
- &index);
+ &index);
if (ret != LTTNG_LIVE_ITERATOR_STATUS_OK) {
goto end;
}
+
BT_ASSERT_DBG(lttng_live_stream->state != LTTNG_LIVE_STREAM_EOF);
+
if (lttng_live_stream->state == LTTNG_LIVE_STREAM_QUIESCENT) {
uint64_t last_inact_ts = lttng_live_stream->last_inactivity_ts,
curr_inact_ts = lttng_live_stream->current_inactivity_ts;
}
goto end;
}
+
lttng_live_stream->base_offset = index.offset;
lttng_live_stream->offset = index.offset;
lttng_live_stream->len = index.packet_size / CHAR_BIT;
+
+ BT_COMP_LOGD("Setting live stream reading info: stream-name=\"%s\", "
+ "viewer-stream-id=%" PRIu64 ", stream-base-offset=%" PRIu64
+ ", stream-offset=%" PRIu64 ", stream-len=%" PRIu64,
+ lttng_live_stream->name->str,
+ lttng_live_stream->viewer_stream_id,
+ lttng_live_stream->base_offset,
+ lttng_live_stream->offset, lttng_live_stream->len);
+
end:
if (ret == LTTNG_LIVE_ITERATOR_STATUS_OK) {
ret = lttng_live_iterator_next_check_stream_state(lttng_live_stream);
enum lttng_live_iterator_status status;
uint64_t trace_idx;
- BT_COMP_LOGD("Updating all streams for session: "
- "session-id=%"PRIu64", session-name=\"%s\"",
- session->id, session->session_name->str);
-
if (!session->attached) {
+ BT_COMP_LOGD("Attach to session: session-id=%" PRIu64,
+ session->id);
enum lttng_live_viewer_status attach_status =
lttng_live_session_attach(session,
lttng_live_msg_iter->self_msg_iter);
}
}
+ BT_COMP_LOGD("Updating all streams and metadata for session: "
+ "session-id=%" PRIu64 ", session-name=\"%s\"",
+ session->id, session->session_name->str);
+
status = lttng_live_session_get_new_streams(session,
lttng_live_msg_iter->self_msg_iter);
if (status != LTTNG_LIVE_ITERATOR_STATUS_OK &&
status != LTTNG_LIVE_ITERATOR_STATUS_END) {
goto end;
}
+
trace_idx = 0;
while (trace_idx < session->traces->len) {
struct lttng_live_trace *trace =
goto end;
}
}
+
+ /*
+ * Now that we have the metadata we can initialize the downstream
+ * iterator.
+ */
status = lttng_live_lazy_msg_init(session,
lttng_live_msg_iter->self_msg_iter);
void lttng_live_force_new_streams_and_metadata(struct lttng_live_msg_iter *lttng_live_msg_iter)
{
uint64_t session_idx, trace_idx;
+ bt_logging_level log_level = lttng_live_msg_iter->log_level;
+ bt_self_component *self_comp = lttng_live_msg_iter->self_comp;
for (session_idx = 0; session_idx < lttng_live_msg_iter->sessions->len;
session_idx++) {
struct lttng_live_session *session =
g_ptr_array_index(lttng_live_msg_iter->sessions, session_idx);
+ BT_COMP_LOGD("Force marking session as needing new streams: "
+ "session-id=%" PRIu64, session->id);
session->new_streams_needed = true;
for (trace_idx = 0; trace_idx < session->traces->len;
trace_idx++) {
struct lttng_live_trace *trace =
g_ptr_array_index(session->traces, trace_idx);
+ BT_COMP_LOGD("Force marking trace metadata state as needing an update: "
+ "session-id=%" PRIu64 ", trace-id=%" PRIu64,
+ session->id, trace->id);
BT_ASSERT(trace->metadata_stream_state !=
LTTNG_LIVE_METADATA_STREAM_STATE_CLOSED);
BT_ASSERT(stream_iter->trace->clock_class);
+ BT_COMP_LOGD("Emitting inactivity message for stream: ctf-stream-id=%" PRIu64
+ ", viewer-stream-id=%" PRIu64 ", timestamp=%" PRIu64,
+ stream_iter->ctf_stream_class_id,
+ stream_iter->viewer_stream_id, timestamp);
+
msg = bt_message_message_iterator_inactivity_create(
lttng_live_msg_iter->self_msg_iter,
stream_iter->trace->clock_class, timestamp);
return LTTNG_LIVE_ITERATOR_STATUS_OK;
}
+ /*
+ * Check if we already sent an inactivty message downstream for this
+ * `current_inactivity_ts` value.
+ */
if (lttng_live_stream->current_inactivity_ts ==
lttng_live_stream->last_inactivity_ts) {
lttng_live_stream->state = LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA;
g_ptr_array_index(lttng_live_msg_iter->sessions, session_idx);
if (session->new_streams_needed) {
+ BT_COMP_LOGD("Need an update for streams: "
+ "session-id=%" PRIu64, session->id);
ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
goto end;
}
struct lttng_live_trace *trace =
g_ptr_array_index(session->traces, trace_idx);
if (trace->metadata_stream_state == LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED) {
+ BT_COMP_LOGD("Need an update for metadata stream: "
+ "session-id=%" PRIu64 ", trace-id=%" PRIu64,
+ session->id, trace->id);
ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
goto end;
}
LTTNG_LIVE_ITERATOR_STATUS_OK;
bt_logging_level log_level = lttng_live_msg_iter->log_level;
bt_self_component *self_comp = lttng_live_msg_iter->self_comp;
+
+ BT_COMP_LOGD("Closing live stream iterator: stream-name=\"%s\", "
+ "viewer-stream-id=%" PRIu64, stream_iter->name->str,
+ stream_iter->viewer_stream_id);
+
/*
* The viewer has hung up on us so we are closing the stream. The
* `ctf_msg_iter` should simply realize that it needs to close the
bt_self_component *self_comp = lttng_live_msg_iter->self_comp;
enum lttng_live_iterator_status live_status;
- BT_COMP_LOGD("Finding the next message for stream iterator: "
- "stream-name=\"%s\"", stream_iter->name->str);
+ BT_COMP_LOGD("Advancing live stream iterator until next message if possible: "
+ "stream-name=\"%s\", viewer-stream-id=%" PRIu64,
+ stream_iter->name->str, stream_iter->viewer_stream_id);
if (stream_iter->has_stream_hung_up) {
/*
retry:
LTTNG_LIVE_LOGD_STREAM_ITER(stream_iter);
+
+ /*
+ * Make sure we have the most recent metadata and possibly some new
+ * streams.
+ */
live_status = lttng_live_iterator_handle_new_streams_and_metadata(
lttng_live_msg_iter);
if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
goto end;
}
+
live_status = lttng_live_iterator_next_handle_one_no_data_stream(
lttng_live_msg_iter, stream_iter);
-
if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
if (live_status == LTTNG_LIVE_ITERATOR_STATUS_END) {
/*
}
goto end;
}
+
live_status = lttng_live_iterator_next_handle_one_quiescent_stream(
lttng_live_msg_iter, stream_iter, curr_msg);
if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
end:
if (live_status == LTTNG_LIVE_ITERATOR_STATUS_CONTINUE) {
+ BT_COMP_LOGD("Ask the relay daemon for an updated view of the data and metadata streams");
goto retry;
}
+ BT_COMP_LOGD("Returning from advancing live stream iterator: status=%s"
+ "stream-name=\"%s\", viewer-stream-id=%" PRIu64,
+ lttng_live_iterator_status_string(live_status),
+ stream_iter->name->str, stream_iter->viewer_stream_id);
+
return live_status;
}
while (!stream_iter->current_msg) {
const bt_message *msg = NULL;
int64_t curr_msg_ts_ns = INT64_MAX;
+
stream_iter_status = lttng_live_iterator_next_msg_on_stream(
lttng_live_msg_iter, stream_iter, &msg);
- BT_COMP_LOGD("live stream iterator returned status :%s",
- lttng_live_iterator_status_string(stream_iter_status));
if (stream_iter_status == LTTNG_LIVE_ITERATOR_STATUS_END) {
stream_iter_is_ended = true;
break;
BT_ASSERT_DBG(msg);
+ BT_COMP_LOGD("Live stream iterator returned message: msg-type=%s"
+ "stream-name=\"%s\", viewer-stream-id=%" PRIu64,
+ bt_common_message_type_string(bt_message_get_type(msg)),
+ stream_iter->name->str, stream_iter->viewer_stream_id);
+
/*
* Get the timestamp in nanoseconds from origin of this
* messsage.