+ struct lttng_live_session *session;
+
+ bt_list_for_each_entry(session, <tng_live->sessions, node) {
+ struct lttng_live_trace *trace;
+
+ session->new_streams_needed = true;
+ bt_list_for_each_entry(trace, &session->traces, node) {
+ trace->new_metadata_needed = true;
+ }
+ }
+}
+
+static
+enum bt_lttng_live_iterator_status lttng_live_iterator_next_handle_new_streams_and_metadata(
+ struct lttng_live_component *lttng_live)
+{
+ enum bt_lttng_live_iterator_status ret =
+ BT_LTTNG_LIVE_ITERATOR_STATUS_OK;
+ unsigned int nr_sessions_opened = 0;
+ struct lttng_live_session *session, *s;
+
+ bt_list_for_each_entry_safe(session, s, <tng_live->sessions, node) {
+ if (session->closed && bt_list_empty(&session->traces)) {
+ lttng_live_destroy_session(session);
+ }
+ }
+ /*
+ * Currently, when there are no sessions, we quit immediately.
+ * We may want to add a component parameter to keep trying until
+ * we get data in the future.
+ * Also, in a remotely distant future, we could add a "new
+ * session" flag to the protocol, which would tell us that we
+ * need to query for new sessions even though we have sessions
+ * currently ongoing.
+ */
+ if (bt_list_empty(<tng_live->sessions)) {
+ ret = BT_LTTNG_LIVE_ITERATOR_STATUS_END;
+ goto end;
+ }
+ bt_list_for_each_entry(session, <tng_live->sessions, node) {
+ ret = lttng_live_get_session(lttng_live, session);
+ switch (ret) {
+ case BT_LTTNG_LIVE_ITERATOR_STATUS_OK:
+ break;
+ case BT_LTTNG_LIVE_ITERATOR_STATUS_END:
+ ret = BT_LTTNG_LIVE_ITERATOR_STATUS_OK;
+ break;
+ default:
+ goto end;
+ }
+ if (!session->closed) {
+ nr_sessions_opened++;
+ }
+ }
+end:
+ if (ret == BT_LTTNG_LIVE_ITERATOR_STATUS_OK && !nr_sessions_opened) {
+ ret = BT_LTTNG_LIVE_ITERATOR_STATUS_END;
+ }
+ return ret;
+}
+
+static
+enum bt_lttng_live_iterator_status emit_inactivity_notification(
+ struct lttng_live_component *lttng_live,
+ struct lttng_live_stream_iterator *lttng_live_stream,
+ struct bt_notification **notification,
+ uint64_t timestamp)
+{
+ enum bt_lttng_live_iterator_status ret =
+ BT_LTTNG_LIVE_ITERATOR_STATUS_OK;
+ struct lttng_live_trace *trace;
+ struct bt_clock_class *clock_class = NULL;
+ struct bt_clock_value *clock_value = NULL;
+ struct bt_notification *notif = NULL;
+ int retval;
+
+ trace = lttng_live_stream->trace;
+ if (!trace) {
+ goto error;
+ }
+ clock_class = bt_clock_class_priority_map_get_clock_class_by_index(trace->cc_prio_map, 0);
+ if (!clock_class) {
+ goto error;
+ }
+ clock_value = bt_clock_value_create(clock_class, timestamp);
+ if (!clock_value) {
+ goto error;
+ }
+ notif = bt_notification_inactivity_create(trace->cc_prio_map);
+ if (!notif) {
+ goto error;
+ }
+ retval = bt_notification_inactivity_set_clock_value(notif, clock_value);
+ if (retval) {
+ goto error;
+ }
+ *notification = notif;
+end:
+ bt_put(clock_value);
+ bt_put(clock_class);
+ return ret;
+
+error:
+ ret = BT_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
+ bt_put(notif);
+ goto end;
+}
+
+static
+enum bt_lttng_live_iterator_status lttng_live_iterator_next_handle_one_quiescent_stream(
+ struct lttng_live_component *lttng_live,
+ struct lttng_live_stream_iterator *lttng_live_stream,
+ struct bt_notification **notification)
+{
+ enum bt_lttng_live_iterator_status ret =
+ BT_LTTNG_LIVE_ITERATOR_STATUS_OK;
+ struct bt_clock_class *clock_class = NULL;
+ struct bt_clock_value *clock_value = NULL;
+
+ if (lttng_live_stream->state != LTTNG_LIVE_STREAM_QUIESCENT) {
+ return BT_LTTNG_LIVE_ITERATOR_STATUS_OK;
+ }
+
+ if (lttng_live_stream->current_inactivity_timestamp ==
+ lttng_live_stream->last_returned_inactivity_timestamp) {
+ lttng_live_stream->state = LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA;
+ ret = BT_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
+ goto end;
+ }
+
+ ret = emit_inactivity_notification(lttng_live, lttng_live_stream, notification,
+ (uint64_t) lttng_live_stream->current_inactivity_timestamp);
+
+ lttng_live_stream->last_returned_inactivity_timestamp =
+ lttng_live_stream->current_inactivity_timestamp;
+end:
+ bt_put(clock_value);
+ bt_put(clock_class);
+ return ret;
+}
+
+static
+enum bt_lttng_live_iterator_status lttng_live_iterator_next_handle_one_active_data_stream(
+ struct lttng_live_component *lttng_live,
+ struct lttng_live_stream_iterator *lttng_live_stream,
+ struct bt_notification **notification)
+{
+ enum bt_lttng_live_iterator_status ret =
+ BT_LTTNG_LIVE_ITERATOR_STATUS_OK;
+ enum bt_notif_iter_status status;
+ struct lttng_live_session *session;
+
+ bt_list_for_each_entry(session, <tng_live->sessions, node) {
+ struct lttng_live_trace *trace;
+
+ if (session->new_streams_needed) {
+ return BT_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
+ }
+ bt_list_for_each_entry(trace, &session->traces, node) {
+ if (trace->new_metadata_needed) {
+ return BT_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
+ }
+ }
+ }
+
+ if (lttng_live_stream->state != LTTNG_LIVE_STREAM_ACTIVE_DATA) {
+ return BT_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
+ }
+ if (lttng_live_stream->packet_end_notif_queue) {
+ *notification = lttng_live_stream->packet_end_notif_queue;
+ lttng_live_stream->packet_end_notif_queue = NULL;
+ status = BT_NOTIF_ITER_STATUS_OK;
+ } else {
+ status = bt_notif_iter_get_next_notification(
+ lttng_live_stream->notif_iter,
+ lttng_live_stream->trace->cc_prio_map,
+ notification);
+ if (status == BT_NOTIF_ITER_STATUS_OK) {
+ /*
+ * Consider empty packets as inactivity.
+ */
+ if (bt_notification_get_type(*notification) == BT_NOTIFICATION_TYPE_PACKET_END) {
+ lttng_live_stream->packet_end_notif_queue = *notification;
+ *notification = NULL;
+ return emit_inactivity_notification(lttng_live,
+ lttng_live_stream, notification,
+ lttng_live_stream->current_packet_end_timestamp);
+ }
+ }
+ }
+ switch (status) {
+ case BT_NOTIF_ITER_STATUS_EOF:
+ ret = BT_LTTNG_LIVE_ITERATOR_STATUS_END;
+ break;
+ case BT_NOTIF_ITER_STATUS_OK:
+ ret = BT_LTTNG_LIVE_ITERATOR_STATUS_OK;
+ break;
+ case BT_NOTIF_ITER_STATUS_AGAIN:
+ /*
+ * Continue immediately (end of packet). The next
+ * get_index may return AGAIN to delay the following
+ * attempt.
+ */
+ ret = BT_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
+ break;
+ case BT_NOTIF_ITER_STATUS_INVAL:
+ /* No argument provided by the user, so don't return INVAL. */
+ case BT_NOTIF_ITER_STATUS_ERROR:
+ default:
+ ret = BT_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
+ break;
+ }
+ return ret;
+}
+
+/*
+ * helper function:
+ * handle_no_data_streams()
+ * retry:
+ * - for each ACTIVE_NO_DATA stream:
+ * - query relayd for stream data, or quiescence info.
+ * - if need metadata, get metadata, goto retry.
+ * - if new stream, get new stream as ACTIVE_NO_DATA, goto retry
+ * - if quiescent, move to QUIESCENT streams
+ * - if fetched data, move to ACTIVE_DATA streams
+ * (at this point each stream either has data, or is quiescent)
+ *
+ *
+ * iterator_next:
+ * handle_new_streams_and_metadata()
+ * - query relayd for known streams, add them as ACTIVE_NO_DATA
+ * - query relayd for metadata
+ *
+ * call handle_active_no_data_streams()
+ *
+ * handle_quiescent_streams()
+ * - if at least one stream is ACTIVE_DATA:
+ * - peek stream event with lowest timestamp -> next_ts
+ * - for each quiescent stream
+ * - if next_ts >= quiescent end
+ * - set state to ACTIVE_NO_DATA
+ * - else
+ * - for each quiescent stream
+ * - set state to ACTIVE_NO_DATA
+ *
+ * call handle_active_no_data_streams()
+ *
+ * handle_active_data_streams()
+ * - if at least one stream is ACTIVE_DATA:
+ * - get stream event with lowest timestamp from heap
+ * - make that stream event the current notification.
+ * - move this stream heap position to its next event
+ * - if we need to fetch data from relayd, move
+ * stream to ACTIVE_NO_DATA.
+ * - return OK
+ * - return AGAIN
+ *
+ * end criterion: ctrl-c on client. If relayd exits or the session
+ * closes on the relay daemon side, we keep on waiting for streams.
+ * Eventually handle --end timestamp (also an end criterion).
+ *
+ * When disconnected from relayd: try to re-connect endlessly.
+ */
+static
+struct bt_notification_iterator_next_method_return lttng_live_iterator_next_stream(
+ struct bt_private_connection_private_notification_iterator *iterator,
+ struct lttng_live_stream_iterator *stream_iter)
+{
+ enum bt_lttng_live_iterator_status status;
+ struct bt_notification_iterator_next_method_return next_return;
+ struct lttng_live_component *lttng_live;
+
+ lttng_live = stream_iter->trace->session->lttng_live;
+retry:
+ print_stream_state(stream_iter);
+ next_return.notification = NULL;
+ status = lttng_live_iterator_next_handle_new_streams_and_metadata(lttng_live);
+ if (status != BT_LTTNG_LIVE_ITERATOR_STATUS_OK) {
+ goto end;
+ }
+ status = lttng_live_iterator_next_handle_one_no_data_stream(
+ lttng_live, stream_iter);
+ if (status != BT_LTTNG_LIVE_ITERATOR_STATUS_OK) {
+ goto end;
+ }
+ status = lttng_live_iterator_next_handle_one_quiescent_stream(
+ lttng_live, stream_iter, &next_return.notification);
+ if (status != BT_LTTNG_LIVE_ITERATOR_STATUS_OK) {
+ BT_ASSERT(next_return.notification == NULL);
+ goto end;
+ }
+ if (next_return.notification) {
+ goto end;
+ }
+ status = lttng_live_iterator_next_handle_one_active_data_stream(lttng_live,
+ stream_iter, &next_return.notification);
+ if (status != BT_LTTNG_LIVE_ITERATOR_STATUS_OK) {
+ BT_ASSERT(next_return.notification == NULL);
+ }
+
+end:
+ switch (status) {
+ case BT_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE:
+ print_dbg("continue");
+ goto retry;
+ case BT_LTTNG_LIVE_ITERATOR_STATUS_AGAIN:
+ next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_AGAIN;
+ print_dbg("again");
+ break;
+ case BT_LTTNG_LIVE_ITERATOR_STATUS_END:
+ next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_END;
+ print_dbg("end");
+ break;
+ case BT_LTTNG_LIVE_ITERATOR_STATUS_OK:
+ next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_OK;
+ print_dbg("ok");
+ break;
+ case BT_LTTNG_LIVE_ITERATOR_STATUS_INVAL:
+ next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_INVALID;
+ break;
+ case BT_LTTNG_LIVE_ITERATOR_STATUS_NOMEM:
+ next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_NOMEM;
+ break;
+ case BT_LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED:
+ next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_UNSUPPORTED;
+ break;
+ case BT_LTTNG_LIVE_ITERATOR_STATUS_ERROR:
+ default: /* fall-through */
+ next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
+ break;
+ }
+ return next_return;
+}
+
+static
+struct bt_notification_iterator_next_method_return lttng_live_iterator_next_no_stream(
+ struct bt_private_connection_private_notification_iterator *iterator,
+ struct lttng_live_no_stream_iterator *no_stream_iter)
+{
+ enum bt_lttng_live_iterator_status status;
+ struct bt_notification_iterator_next_method_return next_return;
+ struct lttng_live_component *lttng_live;
+
+ lttng_live = no_stream_iter->lttng_live;
+retry:
+ lttng_live_force_new_streams_and_metadata(lttng_live);
+ next_return.notification = NULL;
+ status = lttng_live_iterator_next_handle_new_streams_and_metadata(lttng_live);
+ if (status != BT_LTTNG_LIVE_ITERATOR_STATUS_OK) {
+ goto end;
+ }
+ if (no_stream_iter->port) {
+ status = BT_LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
+ } else {
+ status = BT_LTTNG_LIVE_ITERATOR_STATUS_END;
+ }
+end:
+ switch (status) {
+ case BT_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE:
+ goto retry;
+ case BT_LTTNG_LIVE_ITERATOR_STATUS_AGAIN:
+ next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_AGAIN;
+ break;
+ case BT_LTTNG_LIVE_ITERATOR_STATUS_END:
+ next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_END;
+ break;
+ case BT_LTTNG_LIVE_ITERATOR_STATUS_INVAL:
+ next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_INVALID;
+ break;
+ case BT_LTTNG_LIVE_ITERATOR_STATUS_NOMEM:
+ next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_NOMEM;
+ break;
+ case BT_LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED:
+ next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_UNSUPPORTED;
+ break;
+ case BT_LTTNG_LIVE_ITERATOR_STATUS_ERROR:
+ default: /* fall-through */
+ next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
+ break;
+ }
+ return next_return;
+}
+
+BT_HIDDEN
+struct bt_notification_iterator_next_method_return lttng_live_iterator_next(
+ struct bt_private_connection_private_notification_iterator *iterator)
+{
+ struct lttng_live_stream_iterator_generic *s =
+ bt_private_connection_private_notification_iterator_get_user_data(iterator);
+ struct bt_notification_iterator_next_method_return next_return;
+
+ switch (s->type) {
+ case LIVE_STREAM_TYPE_NO_STREAM:
+ next_return = lttng_live_iterator_next_no_stream(iterator,
+ container_of(s, struct lttng_live_no_stream_iterator, p));
+ break;
+ case LIVE_STREAM_TYPE_STREAM:
+ next_return = lttng_live_iterator_next_stream(iterator,
+ container_of(s, struct lttng_live_stream_iterator, p));
+ break;
+ default:
+ next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
+ break;
+ }
+ return next_return;
+}
+
+BT_HIDDEN
+enum bt_notification_iterator_status lttng_live_iterator_init(
+ struct bt_private_connection_private_notification_iterator *it,
+ struct bt_private_port *port)
+{
+ enum bt_notification_iterator_status ret =
+ BT_NOTIFICATION_ITERATOR_STATUS_OK;
+ struct lttng_live_stream_iterator_generic *s;
+
+ BT_ASSERT(it);
+
+ s = bt_private_port_get_user_data(port);
+ BT_ASSERT(s);
+ switch (s->type) {
+ case LIVE_STREAM_TYPE_NO_STREAM:
+ {
+ struct lttng_live_no_stream_iterator *no_stream_iter =
+ container_of(s, struct lttng_live_no_stream_iterator, p);
+ ret = bt_private_connection_private_notification_iterator_set_user_data(it, no_stream_iter);
+ if (ret) {
+ goto error;
+ }
+ break;
+ }
+ case LIVE_STREAM_TYPE_STREAM:
+ {
+ struct lttng_live_stream_iterator *stream_iter =
+ container_of(s, struct lttng_live_stream_iterator, p);
+ ret = bt_private_connection_private_notification_iterator_set_user_data(it, stream_iter);
+ if (ret) {
+ goto error;
+ }
+ break;
+ }
+ default:
+ ret = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
+ goto end;
+ }
+
+end:
+ return ret;
+error:
+ if (bt_private_connection_private_notification_iterator_set_user_data(it, NULL)
+ != BT_NOTIFICATION_ITERATOR_STATUS_OK) {
+ BT_LOGE("Error setting private data to NULL");
+ }
+ goto end;
+}
+
+static
+struct bt_component_class_query_method_return lttng_live_query_list_sessions(
+ struct bt_component_class *comp_class,
+ struct bt_query_executor *query_exec,
+ struct bt_value *params)
+{
+ struct bt_component_class_query_method_return query_ret = {
+ .result = NULL,
+ .status = BT_QUERY_STATUS_OK,