}
/*
- * For active no data stream, fetch next data. It can be either:
- * - quiescent: need to put it in the prio heap at quiescent end
- * timestamp,
- * - have data: need to wire up first event into the prio heap,
- * - have no data on this stream at this point: need to retry (AGAIN) or
- * return EOF.
+ * For active no data stream, fetch next index. As a result of that it can
+ * become either:
+ * - quiescent: won't have events for a bit,
+ * - have data: need to get that data and produce the event,
+ * - have no data on this stream at this point: need to retry (AGAIN) or return
+ * EOF.
*/
static
enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_no_data_stream(
if (lttng_live_stream->trace->metadata_stream_state ==
LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED) {
+ BT_COMP_LOGD("Need to get an update for the metadata stream before proceeding further with this stream: "
+ "stream-name=\"%s\"", lttng_live_stream->name->str);
ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
goto end;
}
+
if (lttng_live_stream->trace->session->new_streams_needed) {
+ BT_COMP_LOGD("Need to get an update of all streams before proceeding further with this stream: "
+ "stream-name=\"%s\"", lttng_live_stream->name->str);
ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
goto end;
}
+
if (lttng_live_stream->state != LTTNG_LIVE_STREAM_ACTIVE_NO_DATA &&
lttng_live_stream->state != LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA) {
goto end;
}
ret = lttng_live_get_next_index(lttng_live_msg_iter, lttng_live_stream,
- &index);
+ &index);
if (ret != LTTNG_LIVE_ITERATOR_STATUS_OK) {
goto end;
}
+
BT_ASSERT_DBG(lttng_live_stream->state != LTTNG_LIVE_STREAM_EOF);
+
if (lttng_live_stream->state == LTTNG_LIVE_STREAM_QUIESCENT) {
uint64_t last_inact_ts = lttng_live_stream->last_inactivity_ts,
curr_inact_ts = lttng_live_stream->current_inactivity_ts;
}
goto end;
}
+
lttng_live_stream->base_offset = index.offset;
lttng_live_stream->offset = index.offset;
lttng_live_stream->len = index.packet_size / CHAR_BIT;
+
+ BT_COMP_LOGD("Setting live stream reading info: stream-name=\"%s\", "
+ "viewer-stream-id=%" PRIu64 ", stream-base-offset=%" PRIu64
+ ", stream-offset=%" PRIu64 ", stream-len=%" PRIu64,
+ lttng_live_stream->name->str,
+ lttng_live_stream->viewer_stream_id,
+ lttng_live_stream->base_offset,
+ lttng_live_stream->offset, lttng_live_stream->len);
+
end:
if (ret == LTTNG_LIVE_ITERATOR_STATUS_OK) {
ret = lttng_live_iterator_next_check_stream_state(lttng_live_stream);
enum lttng_live_iterator_status status;
uint64_t trace_idx;
- BT_COMP_LOGD("Updating all streams for session: "
- "session-id=%"PRIu64", session-name=\"%s\"",
- session->id, session->session_name->str);
-
if (!session->attached) {
+ BT_COMP_LOGD("Attach to session: session-id=%" PRIu64,
+ session->id);
enum lttng_live_viewer_status attach_status =
lttng_live_session_attach(session,
lttng_live_msg_iter->self_msg_iter);
}
}
+ BT_COMP_LOGD("Updating all streams and metadata for session: "
+ "session-id=%" PRIu64 ", session-name=\"%s\"",
+ session->id, session->session_name->str);
+
status = lttng_live_session_get_new_streams(session,
lttng_live_msg_iter->self_msg_iter);
if (status != LTTNG_LIVE_ITERATOR_STATUS_OK &&
status != LTTNG_LIVE_ITERATOR_STATUS_END) {
goto end;
}
+
trace_idx = 0;
while (trace_idx < session->traces->len) {
struct lttng_live_trace *trace =
goto end;
}
}
+
+ /*
+ * Now that we have the metadata we can initialize the downstream
+ * iterator.
+ */
status = lttng_live_lazy_msg_init(session,
lttng_live_msg_iter->self_msg_iter);
void lttng_live_force_new_streams_and_metadata(struct lttng_live_msg_iter *lttng_live_msg_iter)
{
uint64_t session_idx, trace_idx;
+ bt_logging_level log_level = lttng_live_msg_iter->log_level;
+ bt_self_component *self_comp = lttng_live_msg_iter->self_comp;
for (session_idx = 0; session_idx < lttng_live_msg_iter->sessions->len;
session_idx++) {
struct lttng_live_session *session =
g_ptr_array_index(lttng_live_msg_iter->sessions, session_idx);
+ BT_COMP_LOGD("Force marking session as needing new streams: "
+ "session-id=%" PRIu64, session->id);
session->new_streams_needed = true;
for (trace_idx = 0; trace_idx < session->traces->len;
trace_idx++) {
struct lttng_live_trace *trace =
g_ptr_array_index(session->traces, trace_idx);
+ BT_COMP_LOGD("Force marking trace metadata state as needing an update: "
+ "session-id=%" PRIu64 ", trace-id=%" PRIu64,
+ session->id, trace->id);
BT_ASSERT(trace->metadata_stream_state !=
LTTNG_LIVE_METADATA_STREAM_STATE_CLOSED);
enum session_not_found_action sess_not_found_act =
lttng_live_msg_iter->lttng_live_comp->params.sess_not_found_act;
- BT_COMP_LOGD("Update data and metadata of all sessions"
+ BT_COMP_LOGD("Update data and metadata of all sessions: "
"live-msg-iter-addr=%p", lttng_live_msg_iter);
/*
* In a remotely distant future, we could add a "new
BT_ASSERT(stream_iter->trace->clock_class);
+ BT_COMP_LOGD("Emitting inactivity message for stream: ctf-stream-id=%" PRIu64
+ ", viewer-stream-id=%" PRIu64 ", timestamp=%" PRIu64,
+ stream_iter->ctf_stream_class_id,
+ stream_iter->viewer_stream_id, timestamp);
+
msg = bt_message_message_iterator_inactivity_create(
lttng_live_msg_iter->self_msg_iter,
stream_iter->trace->clock_class, timestamp);
return LTTNG_LIVE_ITERATOR_STATUS_OK;
}
+ /*
+	 * Check if we already sent an inactivity message downstream for this
+ * `current_inactivity_ts` value.
+ */
if (lttng_live_stream->current_inactivity_ts ==
lttng_live_stream->last_inactivity_ts) {
lttng_live_stream->state = LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA;
g_ptr_array_index(lttng_live_msg_iter->sessions, session_idx);
if (session->new_streams_needed) {
+ BT_COMP_LOGD("Need an update for streams: "
+ "session-id=%" PRIu64, session->id);
ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
goto end;
}
struct lttng_live_trace *trace =
g_ptr_array_index(session->traces, trace_idx);
if (trace->metadata_stream_state == LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED) {
+ BT_COMP_LOGD("Need an update for metadata stream: "
+ "session-id=%" PRIu64 ", trace-id=%" PRIu64,
+ session->id, trace->id);
ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
goto end;
}
LTTNG_LIVE_ITERATOR_STATUS_OK;
bt_logging_level log_level = lttng_live_msg_iter->log_level;
bt_self_component *self_comp = lttng_live_msg_iter->self_comp;
+
+ BT_COMP_LOGD("Closing live stream iterator: stream-name=\"%s\", "
+ "viewer-stream-id=%" PRIu64, stream_iter->name->str,
+ stream_iter->viewer_stream_id);
+
/*
* The viewer has hung up on us so we are closing the stream. The
* `ctf_msg_iter` should simply realize that it needs to close the
bt_self_component *self_comp = lttng_live_msg_iter->self_comp;
enum lttng_live_iterator_status live_status;
- BT_COMP_LOGD("Finding the next message for stream iterator: "
- "stream-name=\"%s\"", stream_iter->name->str);
+ BT_COMP_LOGD("Advancing live stream iterator until next message if possible: "
+ "stream-name=\"%s\", viewer-stream-id=%" PRIu64,
+ stream_iter->name->str, stream_iter->viewer_stream_id);
if (stream_iter->has_stream_hung_up) {
/*
retry:
LTTNG_LIVE_LOGD_STREAM_ITER(stream_iter);
+
+ /*
+ * Make sure we have the most recent metadata and possibly some new
+ * streams.
+ */
live_status = lttng_live_iterator_handle_new_streams_and_metadata(
lttng_live_msg_iter);
if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
goto end;
}
+
live_status = lttng_live_iterator_next_handle_one_no_data_stream(
lttng_live_msg_iter, stream_iter);
-
if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
if (live_status == LTTNG_LIVE_ITERATOR_STATUS_END) {
/*
}
goto end;
}
+
live_status = lttng_live_iterator_next_handle_one_quiescent_stream(
lttng_live_msg_iter, stream_iter, curr_msg);
if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
end:
if (live_status == LTTNG_LIVE_ITERATOR_STATUS_CONTINUE) {
+ BT_COMP_LOGD("Ask the relay daemon for an updated view of the data and metadata streams");
goto retry;
}
+	BT_COMP_LOGD("Returning from advancing live stream iterator: status=%s, "
+ "stream-name=\"%s\", viewer-stream-id=%" PRIu64,
+ lttng_live_iterator_status_string(live_status),
+ stream_iter->name->str, stream_iter->viewer_stream_id);
+
return live_status;
}
+/*
+ * Return true if `msg` is a discarded-events or discarded-packets
+ * message, false for any other message type.
+ */
+static
+bool is_discarded_packet_or_event_message(const bt_message *msg)
+{
+	const enum bt_message_type msg_type = bt_message_get_type(msg);
+
+	return msg_type == BT_MESSAGE_TYPE_DISCARDED_EVENTS ||
+		msg_type == BT_MESSAGE_TYPE_DISCARDED_PACKETS;
+}
+
+/*
+ * Create a copy of the discarded-packets message `msg_in` with its
+ * beginning default clock snapshot replaced by `new_begin_ts`. The end
+ * timestamp and discarded-packet count are carried over from `msg_in`.
+ *
+ * On success, sets `*msg_out` to the new message (a new reference owned
+ * by the caller) and returns LTTNG_LIVE_ITERATOR_STATUS_OK. Returns
+ * LTTNG_LIVE_ITERATOR_STATUS_NOMEM if the message cannot be created;
+ * `*msg_out` is left untouched in that case.
+ */
+static
+enum lttng_live_iterator_status adjust_discarded_packets_message(
+		bt_self_message_iterator *iter,
+		const bt_stream *stream,
+		const bt_message *msg_in, bt_message **msg_out,
+		uint64_t new_begin_ts)
+{
+	enum lttng_live_iterator_status status = LTTNG_LIVE_ITERATOR_STATUS_OK;
+	enum bt_property_availability availability;
+	const bt_clock_snapshot *clock_snapshot;
+	uint64_t end_ts;
+	uint64_t count;
+
+	/* Keep the original end timestamp of the input message. */
+	clock_snapshot = bt_message_discarded_packets_borrow_end_default_clock_snapshot_const(msg_in);
+	end_ts = bt_clock_snapshot_get_value(clock_snapshot);
+
+	/*
+	 * NOTE(review): the count is assumed to always be available for
+	 * messages reaching this path; this is only debug-asserted, and
+	 * `availability` is otherwise unused in release builds.
+	 */
+	availability = bt_message_discarded_packets_get_count(msg_in, &count);
+	BT_ASSERT_DBG(availability == BT_PROPERTY_AVAILABILITY_AVAILABLE);
+
+	/* Re-create the message with the adjusted beginning timestamp. */
+	*msg_out = bt_message_discarded_packets_create_with_default_clock_snapshots(
+		iter, stream, new_begin_ts, end_ts);
+	if (!*msg_out) {
+		status = LTTNG_LIVE_ITERATOR_STATUS_NOMEM;
+		goto end;
+	}
+
+	bt_message_discarded_packets_set_count(*msg_out, count);
+end:
+	return status;
+}
+
+/*
+ * Create a copy of the discarded-events message `msg_in` with its
+ * beginning default clock snapshot replaced by `new_begin_ts`. The end
+ * timestamp and discarded-event count are carried over from `msg_in`.
+ *
+ * Mirrors adjust_discarded_packets_message() for the discarded-events
+ * message type.
+ *
+ * On success, sets `*msg_out` to the new message (a new reference owned
+ * by the caller) and returns LTTNG_LIVE_ITERATOR_STATUS_OK. Returns
+ * LTTNG_LIVE_ITERATOR_STATUS_NOMEM if the message cannot be created;
+ * `*msg_out` is left untouched in that case.
+ */
+static
+enum lttng_live_iterator_status adjust_discarded_events_message(
+		bt_self_message_iterator *iter,
+		const bt_stream *stream,
+		const bt_message *msg_in, bt_message **msg_out,
+		uint64_t new_begin_ts)
+{
+	enum lttng_live_iterator_status status = LTTNG_LIVE_ITERATOR_STATUS_OK;
+	enum bt_property_availability availability;
+	const bt_clock_snapshot *clock_snapshot;
+	uint64_t end_ts;
+	uint64_t count;
+
+	/* Keep the original end timestamp of the input message. */
+	clock_snapshot = bt_message_discarded_events_borrow_end_default_clock_snapshot_const(msg_in);
+	end_ts = bt_clock_snapshot_get_value(clock_snapshot);
+
+	/*
+	 * NOTE(review): the count is assumed to always be available for
+	 * messages reaching this path; this is only debug-asserted, and
+	 * `availability` is otherwise unused in release builds.
+	 */
+	availability = bt_message_discarded_events_get_count(msg_in, &count);
+	BT_ASSERT_DBG(availability == BT_PROPERTY_AVAILABILITY_AVAILABLE);
+
+	/* Re-create the message with the adjusted beginning timestamp. */
+	*msg_out = bt_message_discarded_events_create_with_default_clock_snapshots(
+		iter, stream, new_begin_ts, end_ts);
+	if (!*msg_out) {
+		status = LTTNG_LIVE_ITERATOR_STATUS_NOMEM;
+		goto end;
+	}
+
+	bt_message_discarded_events_set_count(*msg_out, count);
+end:
+	return status;
+}
+
static
enum lttng_live_iterator_status next_stream_iterator_for_trace(
struct lttng_live_msg_iter *lttng_live_msg_iter,
stream_iter_idx);
/*
- * Find if there is are now current message for this stream
- * iterator get it.
+ * If there is no current message for this stream, go fetch
+ * one.
*/
while (!stream_iter->current_msg) {
const bt_message *msg = NULL;
int64_t curr_msg_ts_ns = INT64_MAX;
+
stream_iter_status = lttng_live_iterator_next_msg_on_stream(
lttng_live_msg_iter, stream_iter, &msg);
- BT_COMP_LOGD("live stream iterator returned status :%s",
- lttng_live_iterator_status_string(stream_iter_status));
if (stream_iter_status == LTTNG_LIVE_ITERATOR_STATUS_END) {
stream_iter_is_ended = true;
break;
BT_ASSERT_DBG(msg);
+	BT_COMP_LOGD("Live stream iterator returned message: msg-type=%s, "
+ "stream-name=\"%s\", viewer-stream-id=%" PRIu64,
+ bt_common_message_type_string(bt_message_get_type(msg)),
+ stream_iter->name->str, stream_iter->viewer_stream_id);
+
/*
* Get the timestamp in nanoseconds from origin of this
* messsage.
stream_iter->current_msg = msg;
stream_iter->current_msg_ts_ns = curr_msg_ts_ns;
} else {
- /*
- * We received a message in the past. To ensure
- * monotonicity, we can't send it forward.
- */
- BT_COMP_LOGE_APPEND_CAUSE(self_comp,
- "Message's timestamp is less than "
- "lttng-live's message iterator's last "
- "returned timestamp: "
- "lttng-live-msg-iter-addr=%p, ts=%" PRId64 ", "
- "last-msg-ts=%" PRId64,
- lttng_live_msg_iter, curr_msg_ts_ns,
- lttng_live_msg_iter->last_msg_ts_ns);
- stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
- goto end;
+ const bt_clock_class *clock_class;
+ const bt_stream_class *stream_class;
+ enum bt_clock_class_cycles_to_ns_from_origin_status ts_ns_status;
+ int64_t last_inactivity_ts_ns;
+
+ stream_class = bt_stream_borrow_class_const(stream_iter->stream);
+ clock_class = bt_stream_class_borrow_default_clock_class_const(stream_class);
+
+ ts_ns_status = bt_clock_class_cycles_to_ns_from_origin(
+ clock_class,
+ stream_iter->last_inactivity_ts,
+ &last_inactivity_ts_ns);
+ if (ts_ns_status != BT_CLOCK_CLASS_CYCLES_TO_NS_FROM_ORIGIN_STATUS_OK) {
+ stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
+ goto end;
+ }
+
+ if (last_inactivity_ts_ns > curr_msg_ts_ns &&
+ is_discarded_packet_or_event_message(msg)) {
+ /*
+ * The CTF message iterator emits Discarded
+ * Packets and Events with synthesized begin and
+ * end timestamps from the bounds of the last
+ * known packet and the newly decoded packet
+ * header.
+ *
+ * The CTF message iterator is not aware of
+ * stream inactivity beacons. Hence, we have
+ * to adjust the begin timestamp of those types
+ * of messages if a stream signaled its
+ * inactivity up until _after_ the last known
+ * packet's begin timestamp.
+ *
+ * Otherwise, the monotonicity guarantee would
+ * not be preserved.
+ */
+ const enum bt_message_type msg_type =
+ bt_message_get_type(msg);
+ enum lttng_live_iterator_status adjust_status =
+ LTTNG_LIVE_ITERATOR_STATUS_OK;
+ bt_message *adjusted_message;
+
+ switch (msg_type) {
+ case BT_MESSAGE_TYPE_DISCARDED_EVENTS:
+ adjust_status = adjust_discarded_events_message(
+ lttng_live_msg_iter->self_msg_iter,
+ stream_iter->stream,
+ msg, &adjusted_message,
+ stream_iter->last_inactivity_ts);
+ break;
+ case BT_MESSAGE_TYPE_DISCARDED_PACKETS:
+ adjust_status = adjust_discarded_packets_message(
+ lttng_live_msg_iter->self_msg_iter,
+ stream_iter->stream,
+ msg, &adjusted_message,
+ stream_iter->last_inactivity_ts);
+ break;
+ default:
+ bt_common_abort();
+ }
+
+ if (adjust_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
+ stream_iter_status = adjust_status;
+ goto end;
+ }
+
+ BT_ASSERT_DBG(adjusted_message);
+ stream_iter->current_msg = adjusted_message;
+ stream_iter->current_msg_ts_ns =
+ last_inactivity_ts_ns;
+ } else {
+ /*
+ * We received a message in the past. To ensure
+ * monotonicity, we can't send it forward.
+ */
+ BT_COMP_LOGE_APPEND_CAUSE(self_comp,
+ "Message's timestamp is less than "
+ "lttng-live's message iterator's last "
+ "returned timestamp: "
+ "lttng-live-msg-iter-addr=%p, ts=%" PRId64 ", "
+ "last-msg-ts=%" PRId64,
+ lttng_live_msg_iter, curr_msg_ts_ns,
+ lttng_live_msg_iter->last_msg_ts_ns);
+ stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
+ goto end;
+ }
}
}