/*
- * Copyright 2019 - Francis Deslauriers <francis.deslauriers@efficios.com>
- * Copyright 2016 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
+ * SPDX-License-Identifier: MIT
*
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in
- * all copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
+ * Copyright 2019 Francis Deslauriers <francis.deslauriers@efficios.com>
+ * Copyright 2016 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
*/
#define BT_COMP_LOG_SELF_COMP (viewer_connection->self_comp)
_msg ": %s" _fmt, bt_socket_errormsg(), ##__VA_ARGS__); \
} while (0)
+static
+const char *lttng_viewer_command_string(enum lttng_viewer_command cmd)
+{
+ switch (cmd){
+ case LTTNG_VIEWER_CONNECT:
+ return "CONNECT";
+ case LTTNG_VIEWER_LIST_SESSIONS:
+ return "LIST_SESSIONS";
+ case LTTNG_VIEWER_ATTACH_SESSION:
+ return "ATTACH_SESSION";
+ case LTTNG_VIEWER_GET_NEXT_INDEX:
+ return "GET_NEXT_INDEX";
+ case LTTNG_VIEWER_GET_PACKET:
+ return "GET_PACKET";
+ case LTTNG_VIEWER_GET_METADATA:
+ return "GET_METADATA";
+ case LTTNG_VIEWER_GET_NEW_STREAMS:
+ return "GET_NEW_STREAMS";
+ case LTTNG_VIEWER_CREATE_SESSION:
+ return "CREATE_SESSION";
+ case LTTNG_VIEWER_DETACH_SESSION:
+ return "DETACH_SESSION";
+ }
+
+ bt_common_abort();
+}
+
+static
+const char *lttng_viewer_next_index_return_code_string(
+ enum lttng_viewer_next_index_return_code code)
+{
+ switch (code) {
+ case LTTNG_VIEWER_INDEX_OK:
+ return "INDEX_OK";
+ case LTTNG_VIEWER_INDEX_RETRY:
+ return "INDEX_RETRY";
+ case LTTNG_VIEWER_INDEX_HUP:
+ return "INDEX_HUP";
+ case LTTNG_VIEWER_INDEX_ERR:
+ return "INDEX_ERR";
+ case LTTNG_VIEWER_INDEX_INACTIVE:
+ return "INDEX_INACTIVE";
+ case LTTNG_VIEWER_INDEX_EOF:
+ return "INDEX_EOF";
+ }
+
+ bt_common_abort();
+}
+
+static
+const char *lttng_viewer_get_packet_return_code_string(
+ enum lttng_viewer_get_packet_return_code code)
+{
+ switch (code) {
+ case LTTNG_VIEWER_GET_PACKET_OK:
+ return "GET_PACKET_OK";
+ case LTTNG_VIEWER_GET_PACKET_RETRY:
+ return "GET_PACKET_RETRY";
+ case LTTNG_VIEWER_GET_PACKET_ERR:
+ return "GET_PACKET_ERR";
+ case LTTNG_VIEWER_GET_PACKET_EOF:
+ return "GET_PACKET_EOF";
+ }
+
+ bt_common_abort();
+};
+
+static
+const char *lttng_viewer_seek_string(enum lttng_viewer_seek seek)
+{
+ switch (seek) {
+ case LTTNG_VIEWER_SEEK_BEGINNING:
+ return "SEEK_BEGINNING";
+ case LTTNG_VIEWER_SEEK_LAST:
+ return "SEEK_LAST";
+ }
+
+ bt_common_abort();
+}
+
static inline
enum lttng_live_iterator_status viewer_status_to_live_iterator_status(
enum lttng_live_viewer_status viewer_status)
return LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
case LTTNG_LIVE_VIEWER_STATUS_ERROR:
return LTTNG_LIVE_ITERATOR_STATUS_ERROR;
- default:
- bt_common_abort();
}
+
+ bt_common_abort();
}
static inline
return CTF_MSG_ITER_MEDIUM_STATUS_AGAIN;
case LTTNG_LIVE_VIEWER_STATUS_ERROR:
return CTF_MSG_ITER_MEDIUM_STATUS_ERROR;
- default:
- bt_common_abort();
}
+
+ bt_common_abort();
+}
+
+static inline
+void viewer_connection_close_socket(
+ struct live_viewer_connection *viewer_connection)
+{
+ bt_self_component_class *self_comp_class =
+ viewer_connection->self_comp_class;
+ bt_self_component *self_comp =
+ viewer_connection->self_comp;
+ int ret = bt_socket_close(viewer_connection->control_sock);
+ if (ret == -1) {
+ BT_COMP_OR_COMP_CLASS_LOGW_ERRNO(
+ self_comp, self_comp_class,
+ "Error closing viewer connection socket: ", ".");
+ }
+
+ viewer_connection->control_sock = BT_INVALID_SOCKET;
}
/*
}
} else {
/*
- * For any other types of socket error, returng
- * an error.
+ * For any other types of socket error, close
+ * the socket and return an error.
*/
LTTNG_LIVE_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE_ERRNO(
self_comp, self_comp_class,
"Error receiving from Relay", ".");
+
+ viewer_connection_close_socket(viewer_connection);
status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
goto end;
}
* connection was orderly shutdown from the other peer.
* If that happens when we are trying to receive
* a message from it, it means something when wrong.
+ * Close the socket and return an error.
*/
- status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp,
self_comp_class, "Remote side has closed connection");
+ viewer_connection_close_socket(viewer_connection);
+ status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
goto end;
}
}
} else {
/*
- * The send() call returned an error.
+ * For any other types of socket error, close
+ * the socket and return an error.
*/
LTTNG_LIVE_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE_ERRNO(
self_comp, self_comp_class,
"Error sending to Relay", ".");
+
+ viewer_connection_close_socket(viewer_connection);
status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
goto end;
}
char cmd_buf[cmd_buf_len];
BT_COMP_OR_COMP_CLASS_LOGD(self_comp, self_comp_class,
- "Handshaking with the Relay: "
- "major-version=%u, minor-version=%u",
- LTTNG_LIVE_MAJOR, LTTNG_LIVE_MINOR);
+ "Handshaking with the relay daemon: cmd=%s, major-version=%u, minor-version=%u",
+ lttng_viewer_command_string(LTTNG_VIEWER_CONNECT), LTTNG_LIVE_MAJOR,
+ LTTNG_LIVE_MINOR);
cmd.cmd = htobe32(LTTNG_VIEWER_CONNECT);
cmd.data_size = htobe64((uint64_t) sizeof(connect));
static
enum lttng_live_viewer_status lttng_live_connect_viewer(
- struct live_viewer_connection *viewer_connection)
+ struct live_viewer_connection *viewer_connection)
{
struct hostent *host;
struct sockaddr_in server_addr;
goto error;
}
+ BT_LOGD("Requesting list of sessions: cmd=%s",
+ lttng_viewer_command_string(LTTNG_VIEWER_LIST_SESSIONS));
+
cmd.cmd = htobe32(LTTNG_VIEWER_LIST_SESSIONS);
cmd.data_size = htobe64((uint64_t) 0);
cmd.cmd_version = htobe32(0);
bt_self_component_class *self_comp_class =
viewer_connection->self_comp_class;
- BT_COMP_LOGD("Asking the Relay for the list of sessions");
+ BT_COMP_LOGD("Asking the relay daemon for the list of sessions: cmd=%s",
+ lttng_viewer_command_string(LTTNG_VIEWER_LIST_SESSIONS));
cmd.cmd = htobe32(LTTNG_VIEWER_LIST_SESSIONS);
cmd.data_size = htobe64((uint64_t) 0);
viewer_connection->self_comp_class;
BT_COMP_OR_COMP_CLASS_LOGD(self_comp, self_comp_class,
- "Creating a viewer session");
+ "Creating a viewer session: cmd=%s",
+ lttng_viewer_command_string(LTTNG_VIEWER_CREATE_SESSION));
cmd.cmd = htobe32(LTTNG_VIEWER_CREATE_SESSION);
cmd.data_size = htobe64((uint64_t) 0);
lttng_live_msg_iter->viewer_connection;
bt_self_component *self_comp = viewer_connection->self_comp;
- BT_COMP_LOGI("Getting %" PRIu32 " new streams:", stream_count);
+ BT_COMP_LOGI("Getting %" PRIu32 " new streams", stream_count);
for (i = 0; i < stream_count; i++) {
struct lttng_viewer_stream stream;
struct lttng_live_stream_iterator *live_stream;
}
BT_HIDDEN
-enum lttng_live_viewer_status lttng_live_attach_session(
+enum lttng_live_viewer_status lttng_live_session_attach(
struct lttng_live_session *session,
bt_self_message_iterator *self_msg_iter)
{
const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
char cmd_buf[cmd_buf_len];
- BT_COMP_LOGD("Attaching to session: session-id=%"PRIu64, session_id);
+ BT_COMP_LOGD("Attaching to session: cmd=%s, session-id=%" PRIu64
+ ", seek=%s",
+ lttng_viewer_command_string(LTTNG_VIEWER_ATTACH_SESSION),
+ session_id,
+ lttng_viewer_seek_string(LTTNG_VIEWER_SEEK_LAST));
cmd.cmd = htobe32(LTTNG_VIEWER_ATTACH_SESSION);
cmd.data_size = htobe64((uint64_t) sizeof(rq));
}
BT_HIDDEN
-enum lttng_live_viewer_status lttng_live_detach_session(
+enum lttng_live_viewer_status lttng_live_session_detach(
struct lttng_live_session *session)
{
struct lttng_viewer_cmd cmd;
const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
char cmd_buf[cmd_buf_len];
- if (!session->attached) {
+ /*
+ * The session might already be detached and the viewer socket might
+ * already been closed. This happens when calling this function when
+ * tearing down the graph after an error.
+ */
+ if (!session->attached || viewer_connection->control_sock == BT_INVALID_SOCKET) {
return 0;
}
+ BT_COMP_LOGD("Detaching from session: cmd=%s, session-id=%" PRIu64,
+ lttng_viewer_command_string(LTTNG_VIEWER_DETACH_SESSION),
+ session_id);
+
cmd.cmd = htobe32(LTTNG_VIEWER_DETACH_SESSION);
cmd.data_size = htobe64((uint64_t) sizeof(rq));
cmd.cmd_version = htobe32(0);
struct lttng_viewer_cmd cmd;
struct lttng_viewer_get_metadata rq;
struct lttng_viewer_metadata_packet rp;
- char *data = NULL;
- ssize_t ret_len;
+ gchar *data = NULL;
+ ssize_t writelen;
struct lttng_live_session *session = trace->session;
struct lttng_live_msg_iter *lttng_live_msg_iter =
session->lttng_live_msg_iter;
const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
char cmd_buf[cmd_buf_len];
- BT_COMP_LOGD("Requesting new metadata for trace: "
- "trace-id=%"PRIu64", metadata-stream-id=%"PRIu64,
+ BT_COMP_LOGD("Requesting new metadata for trace:"
+ "cmd=%s, trace-id=%" PRIu64 ", metadata-stream-id=%" PRIu64,
+ lttng_viewer_command_string(LTTNG_VIEWER_GET_METADATA),
trace->id, metadata->stream_id);
rq.stream_id = htobe64(metadata->stream_id);
BT_COMP_LOGE_APPEND_CAUSE(self_comp,
"Received get_metadata response: unknown");
status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_ERROR;
- goto error;
+ goto end;
}
len = be64toh(rp.len);
BT_COMP_LOGE_APPEND_CAUSE(self_comp,
"Erroneous response length");
status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_ERROR;
- goto error;
+ goto end;
}
- data = calloc(1, len);
+ data = g_new0(gchar, len);
if (!data) {
BT_COMP_LOGE_APPEND_CAUSE_ERRNO(self_comp,
"Failed to allocate data buffer", ".");
status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_ERROR;
- goto error;
+ goto end;
}
viewer_status = lttng_live_recv(viewer_connection, data, len);
viewer_handle_recv_status(self_comp, NULL,
viewer_status, "get metadata packet");
status = (enum lttng_live_get_one_metadata_status) viewer_status;
- goto error;
+ goto end;
}
/*
* Write the metadata to the file handle.
*/
- do {
- ret_len = fwrite(data, 1, len, fp);
- } while (ret_len < 0 && errno == EINTR);
- if (ret_len < 0) {
+ writelen = fwrite(data, sizeof(uint8_t), len, fp);
+ if (writelen != len) {
BT_COMP_LOGE_APPEND_CAUSE(self_comp,
"Writing in the metadata file stream");
status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_ERROR;
- goto error;
+ goto end;
}
- BT_ASSERT(ret_len == len);
+
*reply_len = len;
status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_OK;
-error:
- free(data);
-
end:
+ g_free(data);
return status;
}
pindex->events_discarded = be64toh(lindex->events_discarded);
}
+static
+void lttng_live_need_new_streams(struct lttng_live_msg_iter *lttng_live_msg_iter)
+{
+ uint64_t session_idx;
+ struct live_viewer_connection *viewer_connection =
+ lttng_live_msg_iter->viewer_connection;
+
+ for (session_idx = 0; session_idx < lttng_live_msg_iter->sessions->len;
+ session_idx++) {
+ struct lttng_live_session *session =
+ g_ptr_array_index(lttng_live_msg_iter->sessions, session_idx);
+ BT_COMP_LOGD("Marking session as needing new streams: "
+ "session-id=%" PRIu64, session->id);
+ session->new_streams_needed = true;
+ }
+}
+
BT_HIDDEN
enum lttng_live_iterator_status lttng_live_get_next_index(
struct lttng_live_msg_iter *lttng_live_msg_iter,
char cmd_buf[cmd_buf_len];
uint32_t flags, rp_status;
- BT_COMP_LOGD("Requesting next index for stream: "
- "stream-id=%"PRIu64, stream->viewer_stream_id);
-
+ BT_COMP_LOGD("Requesting next index for stream: cmd=%s, "
+ "viewer-stream-id=%" PRIu64,
+ lttng_viewer_command_string(LTTNG_VIEWER_GET_NEXT_INDEX),
+ stream->viewer_stream_id);
cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEXT_INDEX);
cmd.data_size = htobe64((uint64_t) sizeof(rq));
cmd.cmd_version = htobe32(0);
-
memset(&rq, 0, sizeof(rq));
rq.stream_id = htobe64(stream->viewer_stream_id);
*/
memcpy(cmd_buf, &cmd, sizeof(cmd));
memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
+
viewer_status = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
viewer_handle_send_status(self_comp, NULL,
flags = be32toh(rp.flags);
rp_status = be32toh(rp.status);
+ BT_COMP_LOGD("Received response from relay daemon: cmd=%s, response=%s",
+ lttng_viewer_command_string(LTTNG_VIEWER_GET_NEXT_INDEX),
+ lttng_viewer_next_index_return_code_string(rp_status));
switch (rp_status) {
case LTTNG_VIEWER_INDEX_INACTIVE:
{
uint64_t ctf_stream_class_id;
- BT_COMP_LOGD("Received get_next_index response: inactive");
memset(index, 0, sizeof(struct packet_index));
index->ts_cycles.timestamp_end = be64toh(rp.timestamp_end);
stream->current_inactivity_ts = index->ts_cycles.timestamp_end;
ctf_stream_class_id = be64toh(rp.stream_id);
- if (stream->ctf_stream_class_id != -1ULL) {
- BT_ASSERT(stream->ctf_stream_class_id ==
+ if (stream->ctf_stream_class_id.is_set) {
+ BT_ASSERT(stream->ctf_stream_class_id.value==
ctf_stream_class_id);
} else {
- stream->ctf_stream_class_id = ctf_stream_class_id;
+ stream->ctf_stream_class_id.value = ctf_stream_class_id;
+ stream->ctf_stream_class_id.is_set = true;
}
- stream->state = LTTNG_LIVE_STREAM_QUIESCENT;
+ lttng_live_stream_iterator_set_state(stream, LTTNG_LIVE_STREAM_QUIESCENT);
status = LTTNG_LIVE_ITERATOR_STATUS_OK;
break;
}
{
uint64_t ctf_stream_class_id;
- BT_COMP_LOGD("Received get_next_index response: OK");
lttng_index_to_packet_index(&rp, index);
ctf_stream_class_id = be64toh(rp.stream_id);
- if (stream->ctf_stream_class_id != -1ULL) {
- BT_ASSERT(stream->ctf_stream_class_id ==
+ if (stream->ctf_stream_class_id.is_set) {
+ BT_ASSERT(stream->ctf_stream_class_id.value==
ctf_stream_class_id);
} else {
- stream->ctf_stream_class_id = ctf_stream_class_id;
+ stream->ctf_stream_class_id.value = ctf_stream_class_id;
+ stream->ctf_stream_class_id.is_set = true;
}
- stream->state = LTTNG_LIVE_STREAM_ACTIVE_DATA;
+ lttng_live_stream_iterator_set_state(stream, LTTNG_LIVE_STREAM_ACTIVE_DATA);
if (flags & LTTNG_VIEWER_FLAG_NEW_METADATA) {
- BT_COMP_LOGD("Received get_next_index response: new metadata needed");
- trace->new_metadata_needed = true;
+ BT_COMP_LOGD("Marking trace as needing new metadata: "
+ "response=%s, response-flag=NEW_METADATA, trace-id=%" PRIu64,
+ lttng_viewer_next_index_return_code_string(rp_status),
+ trace->id);
+ trace->metadata_stream_state = LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED;
}
if (flags & LTTNG_VIEWER_FLAG_NEW_STREAM) {
- BT_COMP_LOGD("Received get_next_index response: new streams needed");
+ BT_COMP_LOGD("Marking all sessions as possibly needing new streams: "
+ "response=%s, response-flag=NEW_STREAM",
+ lttng_viewer_next_index_return_code_string(rp_status));
lttng_live_need_new_streams(lttng_live_msg_iter);
}
status = LTTNG_LIVE_ITERATOR_STATUS_OK;
break;
}
case LTTNG_VIEWER_INDEX_RETRY:
- BT_COMP_LOGD("Received get_next_index response: retry");
memset(index, 0, sizeof(struct packet_index));
- stream->state = LTTNG_LIVE_STREAM_ACTIVE_NO_DATA;
+ lttng_live_stream_iterator_set_state(stream, LTTNG_LIVE_STREAM_ACTIVE_NO_DATA);
status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
goto end;
case LTTNG_VIEWER_INDEX_HUP:
- BT_COMP_LOGD("Received get_next_index response: stream hung up");
memset(index, 0, sizeof(struct packet_index));
index->offset = EOF;
- stream->state = LTTNG_LIVE_STREAM_EOF;
+ lttng_live_stream_iterator_set_state(stream, LTTNG_LIVE_STREAM_EOF);
stream->has_stream_hung_up = true;
status = LTTNG_LIVE_ITERATOR_STATUS_END;
break;
case LTTNG_VIEWER_INDEX_ERR:
- BT_COMP_LOGD("Received get_next_index response: error");
memset(index, 0, sizeof(struct packet_index));
- stream->state = LTTNG_LIVE_STREAM_ACTIVE_NO_DATA;
+ lttng_live_stream_iterator_set_state(stream, LTTNG_LIVE_STREAM_ACTIVE_NO_DATA);
status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
goto end;
default:
BT_COMP_LOGD("Received get_next_index response: unknown value");
memset(index, 0, sizeof(struct packet_index));
- stream->state = LTTNG_LIVE_STREAM_ACTIVE_NO_DATA;
+ lttng_live_stream_iterator_set_state(stream, LTTNG_LIVE_STREAM_ACTIVE_NO_DATA);
status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
goto end;
}
char cmd_buf[cmd_buf_len];
uint32_t flags, rp_status;
- BT_COMP_LOGD("lttng_live_get_stream_bytes: offset=%" PRIu64 ", req_len=%" PRIu64,
+ BT_COMP_LOGD("Requesting data from stream: cmd=%s, "
+ "offset=%" PRIu64 ", request-len=%" PRIu64,
+ lttng_viewer_command_string(LTTNG_VIEWER_GET_PACKET),
offset, req_len);
+
cmd.cmd = htobe32(LTTNG_VIEWER_GET_PACKET);
cmd.data_size = htobe64((uint64_t) sizeof(rq));
cmd.cmd_version = htobe32(0);
*/
memcpy(cmd_buf, &cmd, sizeof(cmd));
memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
+
viewer_status = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
viewer_handle_send_status(self_comp, NULL,
viewer_status, "get data packet command");
- goto error;
+ goto error_convert_status;
}
viewer_status = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
viewer_handle_recv_status(self_comp, NULL,
viewer_status, "get data packet reply");
- goto error;
+ goto error_convert_status;
}
flags = be32toh(rp.flags);
rp_status = be32toh(rp.status);
+ BT_COMP_LOGD("Received response from relay daemon: cmd=%s, response=%s",
+ lttng_viewer_command_string(LTTNG_VIEWER_GET_PACKET),
+ lttng_viewer_get_packet_return_code_string(rp_status));
switch (rp_status) {
case LTTNG_VIEWER_GET_PACKET_OK:
req_len = be32toh(rp.len);
- BT_COMP_LOGD("Received get_data_packet response: Ok, "
- "packet size : %" PRIu64 "", req_len);
+ BT_COMP_LOGD("Got packet from relay daemon: response=%s, packet-len=%" PRIu64 "",
+ lttng_viewer_get_packet_return_code_string(rp_status),
+ req_len);
status = CTF_MSG_ITER_MEDIUM_STATUS_OK;
break;
case LTTNG_VIEWER_GET_PACKET_RETRY:
/* Unimplemented by relay daemon */
- BT_COMP_LOGD("Received get_data_packet response: retry");
status = CTF_MSG_ITER_MEDIUM_STATUS_AGAIN;
goto end;
case LTTNG_VIEWER_GET_PACKET_ERR:
if (flags & LTTNG_VIEWER_FLAG_NEW_METADATA) {
- BT_COMP_LOGD("get_data_packet: new metadata needed, try again later");
- trace->new_metadata_needed = true;
+ BT_COMP_LOGD("Marking trace as needing new metadata: "
+ "response=%s, response-flag=NEW_METADATA, trace-id=%" PRIu64,
+ lttng_viewer_next_index_return_code_string(rp_status),
+ trace->id);
+ trace->metadata_stream_state = LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED;
}
if (flags & LTTNG_VIEWER_FLAG_NEW_STREAM) {
- BT_COMP_LOGD("get_data_packet: new streams needed, try again later");
+ BT_COMP_LOGD("Marking all sessions as possibly needing new streams: "
+ "response=%s, response-flag=NEW_STREAM",
+ lttng_viewer_next_index_return_code_string(rp_status));
lttng_live_need_new_streams(lttng_live_msg_iter);
}
if (flags & (LTTNG_VIEWER_FLAG_NEW_METADATA
| LTTNG_VIEWER_FLAG_NEW_STREAM)) {
status = CTF_MSG_ITER_MEDIUM_STATUS_AGAIN;
+ BT_COMP_LOGD("Reply with any one flags set means we should retry: response=%s",
+ lttng_viewer_get_packet_return_code_string(rp_status));
goto end;
}
BT_COMP_LOGE_APPEND_CAUSE(self_comp,
goto end;
default:
BT_COMP_LOGE_APPEND_CAUSE(self_comp,
- "Received get_data_packet response: unknown");
+ "Received get_data_packet response: unknown (%d)", rp_status);
status = CTF_MSG_ITER_MEDIUM_STATUS_ERROR;
- goto error;
+ goto end;
}
if (req_len == 0) {
status = CTF_MSG_ITER_MEDIUM_STATUS_ERROR;
- goto error;
+ goto end;
}
viewer_status = lttng_live_recv(viewer_connection, buf, req_len);
if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
viewer_handle_recv_status(self_comp, NULL,
viewer_status, "get data packet");
- goto error;
+ goto error_convert_status;
}
*recv_len = req_len;
status = CTF_MSG_ITER_MEDIUM_STATUS_OK;
goto end;
-error:
+error_convert_status:
status = viewer_status_to_ctf_msg_iter_medium_status(viewer_status);
end:
return status;
* Request new streams for a session.
*/
BT_HIDDEN
-enum lttng_live_iterator_status lttng_live_get_new_streams(
+enum lttng_live_iterator_status lttng_live_session_get_new_streams(
struct lttng_live_session *session,
bt_self_message_iterator *self_msg_iter)
{
goto end;
}
- BT_COMP_LOGD("Requesting new streams for session: "
- "session-id=%"PRIu64, session->id);
+ BT_COMP_LOGD("Requesting new streams for session: cmd=%s"
+ "session-id=%" PRIu64,
+ lttng_viewer_command_string(LTTNG_VIEWER_GET_NEW_STREAMS),
+ session->id);
cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEW_STREAMS);
cmd.data_size = htobe64((uint64_t) sizeof(rq));
*/
memcpy(cmd_buf, &cmd, sizeof(cmd));
memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
+
viewer_status = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
viewer_handle_send_status(self_comp, NULL,