- struct lttng_viewer_cmd cmd;
- struct lttng_viewer_get_next_index rq;
- enum lttng_live_viewer_status viewer_status;
- struct lttng_viewer_index rp;
- enum lttng_live_iterator_status status;
- struct live_viewer_connection *viewer_connection =
- lttng_live_msg_iter->viewer_connection;
- bt_self_component *self_comp = viewer_connection->self_comp;
- struct lttng_live_trace *trace = stream->trace;
- const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
- char cmd_buf[cmd_buf_len];
- uint32_t flags, rp_status;
-
- BT_COMP_LOGD("Requesting next index for stream: cmd=%s, "
- "viewer-stream-id=%" PRIu64,
- lttng_viewer_command_string(LTTNG_VIEWER_GET_NEXT_INDEX),
- stream->viewer_stream_id);
- cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEXT_INDEX);
- cmd.data_size = htobe64((uint64_t) sizeof(rq));
- cmd.cmd_version = htobe32(0);
-
- memset(&rq, 0, sizeof(rq));
- rq.stream_id = htobe64(stream->viewer_stream_id);
-
- /*
- * Merge the cmd and connection request to prevent a write-write
- * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
- * second write to be performed quickly in presence of Nagle's algorithm.
- */
- memcpy(cmd_buf, &cmd, sizeof(cmd));
- memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
-
- viewer_status = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
- if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
- viewer_handle_send_status(self_comp, NULL,
- viewer_status, "get next index command");
- goto error;
- }
-
- viewer_status = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
- if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
- viewer_handle_recv_status(self_comp, NULL,
- viewer_status, "get next index reply");
- goto error;
- }
-
- flags = be32toh(rp.flags);
- rp_status = be32toh(rp.status);
-
- BT_COMP_LOGD("Received response from relay daemon: cmd=%s, response=%s",
- lttng_viewer_command_string(LTTNG_VIEWER_GET_NEXT_INDEX),
- lttng_viewer_next_index_return_code_string(rp_status));
- switch (rp_status) {
- case LTTNG_VIEWER_INDEX_INACTIVE:
- {
- uint64_t ctf_stream_class_id;
-
- memset(index, 0, sizeof(struct packet_index));
- index->ts_cycles.timestamp_end = be64toh(rp.timestamp_end);
- stream->current_inactivity_ts = index->ts_cycles.timestamp_end;
- ctf_stream_class_id = be64toh(rp.stream_id);
- if (stream->ctf_stream_class_id.is_set) {
- BT_ASSERT(stream->ctf_stream_class_id.value==
- ctf_stream_class_id);
- } else {
- stream->ctf_stream_class_id.value = ctf_stream_class_id;
- stream->ctf_stream_class_id.is_set = true;
- }
- lttng_live_stream_iterator_set_state(stream, LTTNG_LIVE_STREAM_QUIESCENT);
- status = LTTNG_LIVE_ITERATOR_STATUS_OK;
- break;
- }
- case LTTNG_VIEWER_INDEX_OK:
- {
- uint64_t ctf_stream_class_id;
-
- lttng_index_to_packet_index(&rp, index);
- ctf_stream_class_id = be64toh(rp.stream_id);
- if (stream->ctf_stream_class_id.is_set) {
- BT_ASSERT(stream->ctf_stream_class_id.value==
- ctf_stream_class_id);
- } else {
- stream->ctf_stream_class_id.value = ctf_stream_class_id;
- stream->ctf_stream_class_id.is_set = true;
- }
-
- lttng_live_stream_iterator_set_state(stream, LTTNG_LIVE_STREAM_ACTIVE_DATA);
-
- if (flags & LTTNG_VIEWER_FLAG_NEW_METADATA) {
- BT_COMP_LOGD("Marking trace as needing new metadata: "
- "response=%s, response-flag=NEW_METADATA, trace-id=%" PRIu64,
- lttng_viewer_next_index_return_code_string(rp_status),
- trace->id);
- trace->metadata_stream_state = LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED;
- }
- if (flags & LTTNG_VIEWER_FLAG_NEW_STREAM) {
- BT_COMP_LOGD("Marking all sessions as possibly needing new streams: "
- "response=%s, response-flag=NEW_STREAM",
- lttng_viewer_next_index_return_code_string(rp_status));
- lttng_live_need_new_streams(lttng_live_msg_iter);
- }
- status = LTTNG_LIVE_ITERATOR_STATUS_OK;
- break;
- }
- case LTTNG_VIEWER_INDEX_RETRY:
- memset(index, 0, sizeof(struct packet_index));
- lttng_live_stream_iterator_set_state(stream, LTTNG_LIVE_STREAM_ACTIVE_NO_DATA);
- status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
- goto end;
- case LTTNG_VIEWER_INDEX_HUP:
- memset(index, 0, sizeof(struct packet_index));
- index->offset = EOF;
- lttng_live_stream_iterator_set_state(stream, LTTNG_LIVE_STREAM_EOF);
- stream->has_stream_hung_up = true;
- status = LTTNG_LIVE_ITERATOR_STATUS_END;
- break;
- case LTTNG_VIEWER_INDEX_ERR:
- memset(index, 0, sizeof(struct packet_index));
- lttng_live_stream_iterator_set_state(stream, LTTNG_LIVE_STREAM_ACTIVE_NO_DATA);
- status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
- goto end;
- default:
- BT_COMP_LOGD("Received get_next_index response: unknown value");
- memset(index, 0, sizeof(struct packet_index));
- lttng_live_stream_iterator_set_state(stream, LTTNG_LIVE_STREAM_ACTIVE_NO_DATA);
- status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
- goto end;
- }
- goto end;
+ struct lttng_viewer_cmd cmd;
+ struct lttng_viewer_get_next_index rq;
+ enum lttng_live_viewer_status viewer_status;
+ struct lttng_viewer_index rp;
+ enum lttng_live_iterator_status status;
+ struct live_viewer_connection *viewer_connection = lttng_live_msg_iter->viewer_connection;
+ bt_self_component *self_comp = viewer_connection->self_comp;
+ struct lttng_live_trace *trace = stream->trace;
+ const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
+ char cmd_buf[cmd_buf_len];
+ uint32_t flags, rp_status;
+
+ BT_COMP_LOGD("Requesting next index for stream: cmd=%s, "
+ "viewer-stream-id=%" PRIu64,
+ lttng_viewer_command_string(LTTNG_VIEWER_GET_NEXT_INDEX),
+ stream->viewer_stream_id);
+ cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEXT_INDEX);
+ cmd.data_size = htobe64((uint64_t) sizeof(rq));
+ cmd.cmd_version = htobe32(0);
+
+ memset(&rq, 0, sizeof(rq));
+ rq.stream_id = htobe64(stream->viewer_stream_id);
+
+ /*
+ * Merge the cmd and connection request to prevent a write-write
+ * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
+ * second write to be performed quickly in presence of Nagle's algorithm.
+ */
+ memcpy(cmd_buf, &cmd, sizeof(cmd));
+ memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
+
+ viewer_status = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
+ if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
+ viewer_handle_send_status(self_comp, NULL, viewer_status, "get next index command");
+ goto error;
+ }
+
+ viewer_status = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
+ if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
+ viewer_handle_recv_status(self_comp, NULL, viewer_status, "get next index reply");
+ goto error;
+ }
+
+ flags = be32toh(rp.flags);
+ rp_status = be32toh(rp.status);
+
+ BT_COMP_LOGD("Received response from relay daemon: cmd=%s, response=%s",
+ lttng_viewer_command_string(LTTNG_VIEWER_GET_NEXT_INDEX),
+ lttng_viewer_next_index_return_code_string(rp_status));
+ switch (rp_status) {
+ case LTTNG_VIEWER_INDEX_INACTIVE:
+ {
+ uint64_t ctf_stream_class_id;
+
+ memset(index, 0, sizeof(struct packet_index));
+ index->ts_cycles.timestamp_end = be64toh(rp.timestamp_end);
+ stream->current_inactivity_ts = index->ts_cycles.timestamp_end;
+ ctf_stream_class_id = be64toh(rp.stream_id);
+ if (stream->ctf_stream_class_id.is_set) {
+ BT_ASSERT(stream->ctf_stream_class_id.value == ctf_stream_class_id);
+ } else {
+ stream->ctf_stream_class_id.value = ctf_stream_class_id;
+ stream->ctf_stream_class_id.is_set = true;
+ }
+ lttng_live_stream_iterator_set_state(stream, LTTNG_LIVE_STREAM_QUIESCENT);
+ status = LTTNG_LIVE_ITERATOR_STATUS_OK;
+ break;
+ }
+ case LTTNG_VIEWER_INDEX_OK:
+ {
+ uint64_t ctf_stream_class_id;
+
+ lttng_index_to_packet_index(&rp, index);
+ ctf_stream_class_id = be64toh(rp.stream_id);
+ if (stream->ctf_stream_class_id.is_set) {
+ BT_ASSERT(stream->ctf_stream_class_id.value == ctf_stream_class_id);
+ } else {
+ stream->ctf_stream_class_id.value = ctf_stream_class_id;
+ stream->ctf_stream_class_id.is_set = true;
+ }
+
+ lttng_live_stream_iterator_set_state(stream, LTTNG_LIVE_STREAM_ACTIVE_DATA);
+
+ if (flags & LTTNG_VIEWER_FLAG_NEW_METADATA) {
+ BT_COMP_LOGD("Marking trace as needing new metadata: "
+ "response=%s, response-flag=NEW_METADATA, trace-id=%" PRIu64,
+ lttng_viewer_next_index_return_code_string(rp_status), trace->id);
+ trace->metadata_stream_state = LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED;
+ }
+ if (flags & LTTNG_VIEWER_FLAG_NEW_STREAM) {
+ BT_COMP_LOGD("Marking all sessions as possibly needing new streams: "
+ "response=%s, response-flag=NEW_STREAM",
+ lttng_viewer_next_index_return_code_string(rp_status));
+ lttng_live_need_new_streams(lttng_live_msg_iter);
+ }
+ status = LTTNG_LIVE_ITERATOR_STATUS_OK;
+ break;
+ }
+ case LTTNG_VIEWER_INDEX_RETRY:
+ memset(index, 0, sizeof(struct packet_index));
+ lttng_live_stream_iterator_set_state(stream, LTTNG_LIVE_STREAM_ACTIVE_NO_DATA);
+ status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
+ goto end;
+ case LTTNG_VIEWER_INDEX_HUP:
+ memset(index, 0, sizeof(struct packet_index));
+ index->offset = EOF;
+ lttng_live_stream_iterator_set_state(stream, LTTNG_LIVE_STREAM_EOF);
+ stream->has_stream_hung_up = true;
+ status = LTTNG_LIVE_ITERATOR_STATUS_END;
+ break;
+ case LTTNG_VIEWER_INDEX_ERR:
+ memset(index, 0, sizeof(struct packet_index));
+ lttng_live_stream_iterator_set_state(stream, LTTNG_LIVE_STREAM_ACTIVE_NO_DATA);
+ status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
+ goto end;
+ default:
+ BT_COMP_LOGD("Received get_next_index response: unknown value");
+ memset(index, 0, sizeof(struct packet_index));
+ lttng_live_stream_iterator_set_state(stream, LTTNG_LIVE_STREAM_ACTIVE_NO_DATA);
+ status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
+ goto end;
+ }
+ goto end;