* SOFTWARE.
*/
+#define BT_LOG_TAG "PLUGIN-CTF-LTTNG-LIVE-SRC-VIEWER"
+#include "logging.h"
+
#include <stdio.h>
#include <stdint.h>
#include <stdlib.h>
#include <unistd.h>
#include <glib.h>
#include <inttypes.h>
-#include <sys/socket.h>
#include <sys/types.h>
-#include <netinet/in.h>
-#include <netdb.h>
#include <fcntl.h>
-#include <poll.h>
-#include <babeltrace/compat/send-internal.h>
+#include <babeltrace/compat/socket-internal.h>
+#include <babeltrace/endian-internal.h>
#include <babeltrace/compiler-internal.h>
+#include <babeltrace/common-internal.h>
+#include <babeltrace/graph/graph.h>
#include "lttng-live-internal.h"
#include "viewer-connection.h"
#include "data-stream.h"
#include "metadata.h"
-#define PRINT_ERR_STREAM viewer_connection->error_fp
-#define PRINT_PREFIX "lttng-live-viewer-connection"
-#define PRINT_DBG_CHECK lttng_live_debug
-#include "../print.h"
-
-static ssize_t lttng_live_recv(int fd, void *buf, size_t len)
+static ssize_t lttng_live_recv(struct bt_live_viewer_connection *viewer_connection,
+ void *buf, size_t len)
{
ssize_t ret;
size_t copied = 0, to_copy = len;
+ struct lttng_live_component *lttng_live =
+ viewer_connection->lttng_live;
+ BT_SOCKET sock = viewer_connection->control_sock;
do {
- ret = recv(fd, buf + copied, to_copy, 0);
+ ret = bt_socket_recv(sock, buf + copied, to_copy, 0);
if (ret > 0) {
assert(ret <= to_copy);
copied += ret;
to_copy -= ret;
}
- } while ((ret > 0 && to_copy > 0)
- || (ret < 0 && errno == EINTR));
+ if (ret == BT_SOCKET_ERROR && bt_socket_interrupted()) {
+ if (lttng_live_is_canceled(lttng_live)) {
+ break;
+ } else {
+ continue;
+ }
+ }
+ } while (ret > 0 && to_copy > 0);
if (ret > 0)
ret = copied;
- /* ret = 0 means orderly shutdown, ret < 0 is error. */
+ /* ret = 0 means orderly shutdown, ret == BT_SOCKET_ERROR is error. */
return ret;
}
-static ssize_t lttng_live_send(int fd, const void *buf, size_t len)
+static ssize_t lttng_live_send(struct bt_live_viewer_connection *viewer_connection,
+ const void *buf, size_t len)
{
+ struct lttng_live_component *lttng_live =
+ viewer_connection->lttng_live;
+ BT_SOCKET sock = viewer_connection->control_sock;
ssize_t ret;
- do {
- ret = bt_send_nosigpipe(fd, buf, len);
- } while (ret < 0 && errno == EINTR);
+ for (;;) {
+ ret = bt_socket_send_nosigpipe(sock, buf, len);
+ if (ret == BT_SOCKET_ERROR && bt_socket_interrupted()) {
+ if (lttng_live_is_canceled(lttng_live)) {
+ break;
+ } else {
+ continue;
+ }
+ } else {
+ break;
+ }
+ }
return ret;
}
-/*
- * hostname parameter needs to hold MAXNAMLEN chars.
- */
static int parse_url(struct bt_live_viewer_connection *viewer_connection)
{
- char remain[3][MAXNAMLEN];
- int ret = -1, proto, proto_offset = 0;
+ char error_buf[256] = { 0 };
+ struct bt_common_lttng_live_url_parts lttng_live_url_parts = { 0 };
+ int ret = -1;
const char *path = viewer_connection->url->str;
- size_t path_len;
if (!path) {
goto end;
}
- path_len = strlen(path); /* not accounting \0 */
- /*
- * Since sscanf API does not allow easily checking string length
- * against a size defined by a macro. Test it beforehand on the
- * input. We know the output is always <= than the input length.
- */
- if (path_len >= MAXNAMLEN) {
- goto end;
- }
- ret = sscanf(path, "net%d://", &proto);
- if (ret < 1) {
- proto = 4;
- /* net:// */
- proto_offset = strlen("net://");
- } else {
- /* net4:// or net6:// */
- proto_offset = strlen("netX://");
- }
- if (proto_offset > path_len) {
+ lttng_live_url_parts = bt_common_parse_lttng_live_url(path,
+ error_buf, sizeof(error_buf));
+ if (!lttng_live_url_parts.proto) {
+ BT_LOGW("Invalid LTTng live URL format: %s", error_buf);
goto end;
}
- if (proto == 6) {
- PERR("[error] IPv6 is currently unsupported by lttng-live\n");
- goto end;
- }
- /* TODO : parse for IPv6 as well */
- /* Parse the hostname or IP */
- ret = sscanf(&path[proto_offset], "%[a-zA-Z.0-9%-]%s",
- viewer_connection->relay_hostname, remain[0]);
- if (ret == 2) {
- /* Optional port number */
- switch (remain[0][0]) {
- case ':':
- ret = sscanf(remain[0], ":%d%s", &viewer_connection->port, remain[1]);
- /* Optional session ID with port number */
- if (ret == 2) {
- ret = sscanf(remain[1], "/%s", remain[2]);
- /* Accept 0 or 1 (optional) */
- if (ret < 0) {
- goto end;
- }
- } else if (ret == 0) {
- PERR("[error] Missing port number after delimitor ':'\n");
- ret = -1;
- goto end;
- }
- break;
- case '/':
- /* Optional session ID */
- ret = sscanf(remain[0], "/%s", remain[2]);
- /* Accept 0 or 1 (optional) */
- if (ret < 0) {
- goto end;
- }
- break;
- default:
- PERR("[error] wrong delimitor : %c\n", remain[0][0]);
- ret = -1;
- goto end;
- }
- }
- if (viewer_connection->port < 0) {
+ viewer_connection->relay_hostname =
+ lttng_live_url_parts.hostname;
+ lttng_live_url_parts.hostname = NULL;
+
+ if (lttng_live_url_parts.port >= 0) {
+ viewer_connection->port = lttng_live_url_parts.port;
+ } else {
viewer_connection->port = LTTNG_DEFAULT_NETWORK_VIEWER_PORT;
}
- if (strlen(remain[2]) == 0) {
- PDBG("Connecting to hostname : %s, port : %d, "
- "proto : IPv%d\n",
- viewer_connection->relay_hostname,
- viewer_connection->port,
- proto);
- ret = 0;
- goto end;
- }
- ret = sscanf(remain[2], "host/%[a-zA-Z.0-9%-]/%s",
- viewer_connection->target_hostname,
- viewer_connection->session_name);
- if (ret != 2) {
- PERR("[error] Format : "
- "net://<hostname>/host/<target_hostname>/<session_name>\n");
- goto end;
+ viewer_connection->target_hostname =
+ lttng_live_url_parts.target_hostname;
+ lttng_live_url_parts.target_hostname = NULL;
+
+ if (lttng_live_url_parts.session_name) {
+ viewer_connection->session_name =
+ lttng_live_url_parts.session_name;
+ lttng_live_url_parts.session_name = NULL;
}
- PDBG("Connecting to hostname : %s, port : %d, "
+ BT_LOGD("Connecting to hostname : %s, port : %d, "
"target hostname : %s, session name : %s, "
- "proto : IPv%d\n",
- viewer_connection->relay_hostname,
+ "proto : %s",
+ viewer_connection->relay_hostname->str,
viewer_connection->port,
- viewer_connection->target_hostname,
- viewer_connection->session_name, proto);
+ viewer_connection->target_hostname == NULL ?
+ "<none>" : viewer_connection->target_hostname->str,
+ viewer_connection->session_name == NULL ?
+ "<none>" : viewer_connection->session_name->str,
+ lttng_live_url_parts.proto->str);
ret = 0;
end:
+ bt_common_destroy_lttng_live_url_parts(<tng_live_url_parts);
return ret;
}
connect.minor = htobe32(LTTNG_LIVE_MINOR);
connect.type = htobe32(LTTNG_VIEWER_CLIENT_COMMAND);
- ret_len = lttng_live_send(viewer_connection->control_sock, &cmd, sizeof(cmd));
- if (ret_len < 0) {
- PERR("Error sending cmd: %s\n", strerror(errno));
+ ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
+ if (ret_len == BT_SOCKET_ERROR) {
+ BT_LOGE("Error sending cmd: %s", bt_socket_errormsg());
goto error;
}
assert(ret_len == sizeof(cmd));
- ret_len = lttng_live_send(viewer_connection->control_sock, &connect, sizeof(connect));
- if (ret_len < 0) {
- PERR("Error sending version: %s\n", strerror(errno));
+ ret_len = lttng_live_send(viewer_connection, &connect, sizeof(connect));
+ if (ret_len == BT_SOCKET_ERROR) {
+ BT_LOGE("Error sending version: %s", bt_socket_errormsg());
goto error;
}
assert(ret_len == sizeof(connect));
- ret_len = lttng_live_recv(viewer_connection->control_sock, &connect, sizeof(connect));
+ ret_len = lttng_live_recv(viewer_connection, &connect, sizeof(connect));
if (ret_len == 0) {
- PERR("Remote side has closed connection\n");
+ BT_LOGI("Remote side has closed connection");
goto error;
}
- if (ret_len < 0) {
- PERR("[error] Error receiving version: %s", strerror(errno));
+ if (ret_len == BT_SOCKET_ERROR) {
+ BT_LOGE("Error receiving version: %s", bt_socket_errormsg());
goto error;
}
assert(ret_len == sizeof(connect));
- PDBG("Received viewer session ID : %" PRIu64 "\n",
+ BT_LOGD("Received viewer session ID : %" PRIu64,
be64toh(connect.viewer_session_id));
- PDBG("Relayd version : %u.%u\n", be32toh(connect.major),
+ BT_LOGD("Relayd version : %u.%u", be32toh(connect.major),
be32toh(connect.minor));
if (LTTNG_LIVE_MAJOR != be32toh(connect.major)) {
- PERR("Incompatible lttng-relayd protocol\n");
+ BT_LOGE("Incompatible lttng-relayd protocol");
goto error;
}
/* Use the smallest protocol version implemented. */
return ret;
error:
- PERR("Unable to establish connection\n");
+ BT_LOGE("Unable to establish connection");
return -1;
}
goto error;
}
- host = gethostbyname(viewer_connection->relay_hostname);
+ host = gethostbyname(viewer_connection->relay_hostname->str);
if (!host) {
- PERR("[error] Cannot lookup hostname %s\n",
- viewer_connection->relay_hostname);
+ BT_LOGE("Cannot lookup hostname %s",
+ viewer_connection->relay_hostname->str);
goto error;
}
- if ((viewer_connection->control_sock = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
- PERR("[error] Socket creation failed: %s\n", strerror(errno));
+ if ((viewer_connection->control_sock = socket(AF_INET, SOCK_STREAM, 0)) == BT_INVALID_SOCKET) {
+ BT_LOGE("Socket creation failed: %s", bt_socket_errormsg());
goto error;
}
memset(&(server_addr.sin_zero), 0, 8);
if (connect(viewer_connection->control_sock, (struct sockaddr *) &server_addr,
- sizeof(struct sockaddr)) == -1) {
- PERR("[error] Connection failed: %s\n", strerror(errno));
+ sizeof(struct sockaddr)) == BT_SOCKET_ERROR) {
+ BT_LOGE("Connection failed: %s", bt_socket_errormsg());
goto error;
}
if (lttng_live_handshake(viewer_connection)) {
return ret;
error:
- if (viewer_connection->control_sock >= 0) {
- if (close(viewer_connection->control_sock)) {
- PERR("Close: %s", strerror(errno));
+ if (viewer_connection->control_sock != BT_INVALID_SOCKET) {
+ if (bt_socket_close(viewer_connection->control_sock) == BT_SOCKET_ERROR) {
+ BT_LOGE("Close: %s", bt_socket_errormsg());
}
}
- viewer_connection->control_sock = -1;
+ viewer_connection->control_sock = BT_INVALID_SOCKET;
return -1;
}
static void lttng_live_disconnect_viewer(struct bt_live_viewer_connection *viewer_connection)
{
- if (viewer_connection->control_sock < 0) {
+ if (viewer_connection->control_sock == BT_INVALID_SOCKET) {
return;
}
- if (close(viewer_connection->control_sock)) {
- PERR("Close: %s", strerror(errno));
- viewer_connection->control_sock = -1;
+ if (bt_socket_close(viewer_connection->control_sock) == BT_SOCKET_ERROR) {
+ BT_LOGE("Close: %s", bt_socket_errormsg());
+ viewer_connection->control_sock = BT_INVALID_SOCKET;
}
}
results = bt_value_array_create();
if (!results) {
- fprintf(stderr, "Error creating array\n");
+ BT_LOGE("Error creating array");
goto error;
}
cmd.data_size = htobe64((uint64_t) 0);
cmd.cmd_version = htobe32(0);
- ret_len = lttng_live_send(viewer_connection->control_sock, &cmd, sizeof(cmd));
- if (ret_len < 0) {
- fprintf(stderr, "Error sending cmd: %s\n", strerror(errno));
+ ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
+ if (ret_len == BT_SOCKET_ERROR) {
+ BT_LOGE("Error sending cmd: %s", bt_socket_errormsg());
goto error;
}
assert(ret_len == sizeof(cmd));
- ret_len = lttng_live_recv(viewer_connection->control_sock, &list, sizeof(list));
+ ret_len = lttng_live_recv(viewer_connection, &list, sizeof(list));
if (ret_len == 0) {
- fprintf(stderr, "Remote side has closed connection\n");
+ BT_LOGI("Remote side has closed connection");
goto error;
}
- if (ret_len < 0) {
- fprintf(stderr, "Error receiving session list: %s\n", strerror(errno));
+ if (ret_len == BT_SOCKET_ERROR) {
+ BT_LOGE("Error receiving session list: %s", bt_socket_errormsg());
goto error;
}
assert(ret_len == sizeof(list));
for (i = 0; i < sessions_count; i++) {
struct lttng_viewer_session lsession;
- ret_len = lttng_live_recv(viewer_connection->control_sock,
+ ret_len = lttng_live_recv(viewer_connection,
&lsession, sizeof(lsession));
if (ret_len == 0) {
- fprintf(stderr, "Remote side has closed connection\n");
+ BT_LOGI("Remote side has closed connection");
goto error;
}
- if (ret_len < 0) {
- fprintf(stderr, "Error receiving session: %s\n", strerror(errno));
+ if (ret_len == BT_SOCKET_ERROR) {
+ BT_LOGE("Error receiving session: %s", bt_socket_errormsg());
goto error;
}
assert(ret_len == sizeof(lsession));
cmd.data_size = htobe64((uint64_t) 0);
cmd.cmd_version = htobe32(0);
- ret_len = lttng_live_send(viewer_connection->control_sock, &cmd, sizeof(cmd));
- if (ret_len < 0) {
- PERR("Error sending cmd: %s\n", strerror(errno));
+ ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
+ if (ret_len == BT_SOCKET_ERROR) {
+ BT_LOGE("Error sending cmd: %s", bt_socket_errormsg());
goto error;
}
assert(ret_len == sizeof(cmd));
- ret_len = lttng_live_recv(viewer_connection->control_sock, &list, sizeof(list));
+ ret_len = lttng_live_recv(viewer_connection, &list, sizeof(list));
if (ret_len == 0) {
- PERR("Remote side has closed connection\n");
+ BT_LOGI("Remote side has closed connection");
goto error;
}
- if (ret_len < 0) {
- PERR("Error receiving session list: %s\n", strerror(errno));
+ if (ret_len == BT_SOCKET_ERROR) {
+ BT_LOGE("Error receiving session list: %s", bt_socket_errormsg());
goto error;
}
assert(ret_len == sizeof(list));
sessions_count = be32toh(list.sessions_count);
for (i = 0; i < sessions_count; i++) {
- ret_len = lttng_live_recv(viewer_connection->control_sock,
+ ret_len = lttng_live_recv(viewer_connection,
&lsession, sizeof(lsession));
if (ret_len == 0) {
- PERR("Remote side has closed connection\n");
+ BT_LOGI("Remote side has closed connection");
goto error;
}
- if (ret_len < 0) {
- PERR("Error receiving session: %s\n", strerror(errno));
+ if (ret_len == BT_SOCKET_ERROR) {
+ BT_LOGE("Error receiving session: %s", bt_socket_errormsg());
goto error;
}
assert(ret_len == sizeof(lsession));
lsession.session_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0';
session_id = be64toh(lsession.id);
+ BT_LOGD("Adding session %" PRIu64 " hostname: %s session_name: %s",
+ session_id, lsession.hostname, lsession.session_name);
+
if ((strncmp(lsession.session_name,
- viewer_connection->session_name,
- MAXNAMLEN) == 0) && (strncmp(lsession.hostname,
- viewer_connection->target_hostname,
- MAXNAMLEN) == 0)) {
- if (lttng_live_add_session(lttng_live, session_id)) {
+ viewer_connection->session_name->str,
+ LTTNG_VIEWER_NAME_MAX) == 0) && (strncmp(lsession.hostname,
+ viewer_connection->target_hostname->str,
+ LTTNG_VIEWER_HOST_NAME_MAX) == 0)) {
+ if (lttng_live_add_session(lttng_live, session_id,
+ lsession.hostname,
+ lsession.session_name)) {
goto error;
}
}
return 0;
error:
- PERR("Unable to query session ids\n");
+ BT_LOGE("Unable to query session ids");
return -1;
}
cmd.data_size = htobe64((uint64_t) 0);
cmd.cmd_version = htobe32(0);
- ret_len = lttng_live_send(viewer_connection->control_sock, &cmd, sizeof(cmd));
- if (ret_len < 0) {
- PERR("Error sending cmd: %s\n", strerror(errno));
+ ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
+ if (ret_len == BT_SOCKET_ERROR) {
+ BT_LOGE("Error sending cmd: %s", bt_socket_errormsg());
goto error;
}
assert(ret_len == sizeof(cmd));
- ret_len = lttng_live_recv(viewer_connection->control_sock, &resp, sizeof(resp));
+ ret_len = lttng_live_recv(viewer_connection, &resp, sizeof(resp));
if (ret_len == 0) {
- PERR("Remote side has closed connection\n");
+ BT_LOGI("Remote side has closed connection");
goto error;
}
- if (ret_len < 0) {
- PERR("Error receiving create session reply: %s\n", strerror(errno));
+ if (ret_len == BT_SOCKET_ERROR) {
+ BT_LOGE("Error receiving create session reply: %s", bt_socket_errormsg());
goto error;
}
assert(ret_len == sizeof(resp));
if (be32toh(resp.status) != LTTNG_VIEWER_CREATE_SESSION_OK) {
- PERR("Error creating viewer session\n");
+ BT_LOGE("Error creating viewer session");
goto error;
}
if (lttng_live_query_session_ids(lttng_live)) {
struct bt_live_viewer_connection *viewer_connection =
lttng_live->viewer_connection;
- PDBG("Getting %" PRIu32 " new streams:\n", stream_count);
+ BT_LOGD("Getting %" PRIu32 " new streams:", stream_count);
for (i = 0; i < stream_count; i++) {
struct lttng_viewer_stream stream;
struct lttng_live_stream_iterator *live_stream;
uint64_t stream_id;
uint64_t ctf_trace_id;
- ret_len = lttng_live_recv(viewer_connection->control_sock, &stream, sizeof(stream));
+ ret_len = lttng_live_recv(viewer_connection, &stream, sizeof(stream));
if (ret_len == 0) {
- PERR("Remote side has closed connection\n");
+ BT_LOGI("Remote side has closed connection");
goto error;
}
- if (ret_len < 0) {
- PERR("Error receiving stream\n");
+ if (ret_len == BT_SOCKET_ERROR) {
+ BT_LOGE("Error receiving stream");
goto error;
}
assert(ret_len == sizeof(stream));
ctf_trace_id = be64toh(stream.ctf_trace_id);
if (stream.metadata_flag) {
- PDBG(" metadata stream %" PRIu64 " : %s/%s\n",
+ BT_LOGD(" metadata stream %" PRIu64 " : %s/%s",
stream_id, stream.path_name,
stream.channel_name);
if (lttng_live_metadata_create_stream(session,
- ctf_trace_id, stream_id)) {
- PERR("Error creating metadata stream\n");
+ ctf_trace_id, stream_id,
+ stream.path_name)) {
+ BT_LOGE("Error creating metadata stream");
goto error;
}
session->lazy_stream_notif_init = true;
} else {
- PDBG(" stream %" PRIu64 " : %s/%s\n",
+ BT_LOGD(" stream %" PRIu64 " : %s/%s",
stream_id, stream.path_name,
stream.channel_name);
live_stream = lttng_live_stream_iterator_create(session,
ctf_trace_id, stream_id);
if (!live_stream) {
- PERR("Error creating stream\n");
+ BT_LOGE("Error creating streamn");
goto error;
}
}
// rq.seek = htobe32(LTTNG_VIEWER_SEEK_BEGINNING);
rq.seek = htobe32(LTTNG_VIEWER_SEEK_LAST);
- ret_len = lttng_live_send(viewer_connection->control_sock, &cmd, sizeof(cmd));
- if (ret_len < 0) {
- PERR("Error sending cmd: %s\n", strerror(errno));
+ ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
+ if (ret_len == BT_SOCKET_ERROR) {
+ BT_LOGE("Error sending cmd: %s", bt_socket_errormsg());
goto error;
}
assert(ret_len == sizeof(cmd));
- ret_len = lttng_live_send(viewer_connection->control_sock, &rq, sizeof(rq));
- if (ret_len < 0) {
- PERR("Error sending attach request: %s\n", strerror(errno));
+ ret_len = lttng_live_send(viewer_connection, &rq, sizeof(rq));
+ if (ret_len == BT_SOCKET_ERROR) {
+ BT_LOGE("Error sending attach request: %s", bt_socket_errormsg());
goto error;
}
assert(ret_len == sizeof(rq));
- ret_len = lttng_live_recv(viewer_connection->control_sock, &rp, sizeof(rp));
+ ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
if (ret_len == 0) {
- PERR("Remote side has closed connection\n");
+ BT_LOGI("Remote side has closed connection");
goto error;
}
- if (ret_len < 0) {
- PERR("Error receiving attach response: %s\n", strerror(errno));
+ if (ret_len == BT_SOCKET_ERROR) {
+ BT_LOGE("Error receiving attach response: %s", bt_socket_errormsg());
goto error;
}
assert(ret_len == sizeof(rp));
case LTTNG_VIEWER_ATTACH_OK:
break;
case LTTNG_VIEWER_ATTACH_UNK:
- PERR("Session id %" PRIu64 " is unknown\n", session_id);
+ BT_LOGW("Session id %" PRIu64 " is unknown", session_id);
goto error;
case LTTNG_VIEWER_ATTACH_ALREADY:
- PERR("There is already a viewer attached to this session\n");
+ BT_LOGW("There is already a viewer attached to this session");
goto error;
case LTTNG_VIEWER_ATTACH_NOT_LIVE:
- PERR("Not a live session\n");
+ BT_LOGW("Not a live session");
goto error;
case LTTNG_VIEWER_ATTACH_SEEK_ERR:
- PERR("Wrong seek parameter\n");
+ BT_LOGE("Wrong seek parameter");
goto error;
default:
- PERR("Unknown attach return code %u\n", be32toh(rp.status));
+ BT_LOGE("Unknown attach return code %u", be32toh(rp.status));
goto error;
}
memset(&rq, 0, sizeof(rq));
rq.session_id = htobe64(session_id);
- ret_len = lttng_live_send(viewer_connection->control_sock, &cmd, sizeof(cmd));
- if (ret_len < 0) {
- PERR("Error sending cmd: %s\n", strerror(errno));
+ ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
+ if (ret_len == BT_SOCKET_ERROR) {
+ BT_LOGE("Error sending cmd: %s", bt_socket_errormsg());
goto error;
}
assert(ret_len == sizeof(cmd));
- ret_len = lttng_live_send(viewer_connection->control_sock, &rq, sizeof(rq));
- if (ret_len < 0) {
- PERR("Error sending detach request: %s\n", strerror(errno));
+ ret_len = lttng_live_send(viewer_connection, &rq, sizeof(rq));
+ if (ret_len == BT_SOCKET_ERROR) {
+ BT_LOGE("Error sending detach request: %s", bt_socket_errormsg());
goto error;
}
assert(ret_len == sizeof(rq));
- ret_len = lttng_live_recv(viewer_connection->control_sock, &rp, sizeof(rp));
+ ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
if (ret_len == 0) {
- PERR("Remote side has closed connection\n");
+ BT_LOGI("Remote side has closed connection");
goto error;
}
- if (ret_len < 0) {
- PERR("Error receiving detach response: %s\n", strerror(errno));
+ if (ret_len == BT_SOCKET_ERROR) {
+ BT_LOGE("Error receiving detach response: %s", bt_socket_errormsg());
goto error;
}
assert(ret_len == sizeof(rp));
case LTTNG_VIEWER_DETACH_SESSION_OK:
break;
case LTTNG_VIEWER_DETACH_SESSION_UNK:
- PERR("Session id %" PRIu64 " is unknown\n", session_id);
+ BT_LOGW("Session id %" PRIu64 " is unknown", session_id);
goto error;
case LTTNG_VIEWER_DETACH_SESSION_ERR:
- PERR("Error detaching session id %" PRIu64 "\n", session_id);
+ BT_LOGW("Error detaching session id %" PRIu64 "", session_id);
goto error;
default:
- PERR("Unknown detach return code %u\n", be32toh(rp.status));
+ BT_LOGE("Unknown detach return code %u", be32toh(rp.status));
goto error;
}
cmd.data_size = htobe64((uint64_t) sizeof(rq));
cmd.cmd_version = htobe32(0);
- ret_len = lttng_live_send(viewer_connection->control_sock, &cmd, sizeof(cmd));
- if (ret_len < 0) {
- PERR("Error sending cmd: %s\n", strerror(errno));
+ ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
+ if (ret_len == BT_SOCKET_ERROR) {
+ BT_LOGE("Error sending cmd: %s", bt_socket_errormsg());
goto error;
}
assert(ret_len == sizeof(cmd));
- ret_len = lttng_live_send(viewer_connection->control_sock, &rq, sizeof(rq));
- if (ret_len < 0) {
- PERR("Error sending get_metadata request: %s\n", strerror(errno));
+ ret_len = lttng_live_send(viewer_connection, &rq, sizeof(rq));
+ if (ret_len == BT_SOCKET_ERROR) {
+ BT_LOGE("Error sending get_metadata request: %s", bt_socket_errormsg());
goto error;
}
assert(ret_len == sizeof(rq));
- ret_len = lttng_live_recv(viewer_connection->control_sock, &rp, sizeof(rp));
+ ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
if (ret_len == 0) {
- PERR("Remote side has closed connection\n");
+ BT_LOGI("Remote side has closed connection");
goto error;
}
- if (ret_len < 0) {
- PERR("Error receiving get_metadata response: %s\n", strerror(errno));
+ if (ret_len == BT_SOCKET_ERROR) {
+ BT_LOGE("Error receiving get_metadata response: %s", bt_socket_errormsg());
goto error;
}
assert(ret_len == sizeof(rp));
switch (be32toh(rp.status)) {
case LTTNG_VIEWER_METADATA_OK:
- PDBG("get_metadata : OK\n");
+ BT_LOGD("get_metadata : OK");
break;
case LTTNG_VIEWER_NO_NEW_METADATA:
- PDBG("get_metadata : NO NEW\n");
+ BT_LOGD("get_metadata : NO NEW");
ret = 0;
goto end;
case LTTNG_VIEWER_METADATA_ERR:
- PDBG("get_metadata : ERR\n");
+ BT_LOGD("get_metadata : ERR");
goto error;
default:
- PDBG("get_metadata : UNKNOWN\n");
+ BT_LOGD("get_metadata : UNKNOWN");
goto error;
}
len = be64toh(rp.len);
- PDBG("Writing %" PRIu64" bytes to metadata\n", len);
+ BT_LOGD("Writing %" PRIu64" bytes to metadata", len);
if (len <= 0) {
goto error;
}
data = zmalloc(len);
if (!data) {
- PERR("relay data zmalloc: %s", strerror(errno));
+ BT_LOGE("relay data zmalloc: %s", strerror(errno));
goto error;
}
- ret_len = lttng_live_recv(viewer_connection->control_sock, data, len);
+ ret_len = lttng_live_recv(viewer_connection, data, len);
if (ret_len == 0) {
- PERR("[error] Remote side has closed connection\n");
+ BT_LOGI("Remote side has closed connection");
goto error_free_data;
}
- if (ret_len < 0) {
- PERR("[error] Error receiving trace packet: %s", strerror(errno));
+ if (ret_len == BT_SOCKET_ERROR) {
+ BT_LOGE("Error receiving trace packet: %s", bt_socket_errormsg());
goto error_free_data;
}
assert(ret_len == len);
ret_len = fwrite(data, 1, len, fp);
} while (ret_len < 0 && errno == EINTR);
if (ret_len < 0) {
- PERR("[error] Writing in the metadata fp\n");
+ BT_LOGE("Writing in the metadata fp");
goto error_free_data;
}
assert(ret_len == len);
memset(&rq, 0, sizeof(rq));
rq.stream_id = htobe64(stream->viewer_stream_id);
- ret_len = lttng_live_send(viewer_connection->control_sock, &cmd, sizeof(cmd));
- if (ret_len < 0) {
- PERR("Error sending cmd: %s\n", strerror(errno));
+ ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
+ if (ret_len == BT_SOCKET_ERROR) {
+ BT_LOGE("Error sending cmd: %s", bt_socket_errormsg());
goto error;
}
assert(ret_len == sizeof(cmd));
- ret_len = lttng_live_send(viewer_connection->control_sock, &rq, sizeof(rq));
- if (ret_len < 0) {
- PERR("Error sending get_next_index request: %s\n", strerror(errno));
+ ret_len = lttng_live_send(viewer_connection, &rq, sizeof(rq));
+ if (ret_len == BT_SOCKET_ERROR) {
+ BT_LOGE("Error sending get_next_index request: %s", bt_socket_errormsg());
goto error;
}
assert(ret_len == sizeof(rq));
- ret_len = lttng_live_recv(viewer_connection->control_sock, &rp, sizeof(rp));
+ ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
if (ret_len == 0) {
- PERR("Remote side has closed connection\n");
+ BT_LOGI("Remote side has closed connection");
goto error;
}
- if (ret_len < 0) {
- PERR("Error receiving get_next_index response: %s\n", strerror(errno));
+ if (ret_len == BT_SOCKET_ERROR) {
+ BT_LOGE("Error receiving get_next_index response: %s", bt_socket_errormsg());
goto error;
}
assert(ret_len == sizeof(rp));
{
uint64_t ctf_stream_class_id;
- PDBG("get_next_index: inactive\n");
+ BT_LOGD("get_next_index: inactive");
memset(index, 0, sizeof(struct packet_index));
index->ts_cycles.timestamp_end = be64toh(rp.timestamp_end);
stream->current_inactivity_timestamp = index->ts_cycles.timestamp_end;
{
uint64_t ctf_stream_class_id;
- PDBG("get_next_index: OK\n");
+ BT_LOGD("get_next_index: OK");
lttng_index_to_packet_index(&rp, index);
ctf_stream_class_id = be64toh(rp.stream_id);
if (stream->ctf_stream_class_id != -1ULL) {
index->ts_cycles.timestamp_end;
if (flags & LTTNG_VIEWER_FLAG_NEW_METADATA) {
- PDBG("get_next_index: new metadata needed\n");
+ BT_LOGD("get_next_index: new metadata needed");
trace->new_metadata_needed = true;
}
if (flags & LTTNG_VIEWER_FLAG_NEW_STREAM) {
- PDBG("get_next_index: new streams needed\n");
+ BT_LOGD("get_next_index: new streams needed");
lttng_live_need_new_streams(lttng_live);
}
break;
}
case LTTNG_VIEWER_INDEX_RETRY:
- PDBG("get_next_index: retry\n");
+ BT_LOGD("get_next_index: retry");
memset(index, 0, sizeof(struct packet_index));
retstatus = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
stream->state = LTTNG_LIVE_STREAM_ACTIVE_NO_DATA;
goto end;
case LTTNG_VIEWER_INDEX_HUP:
- PDBG("get_next_index: stream hung up\n");
+ BT_LOGD("get_next_index: stream hung up");
memset(index, 0, sizeof(struct packet_index));
index->offset = EOF;
retstatus = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END;
stream->state = LTTNG_LIVE_STREAM_EOF;
break;
case LTTNG_VIEWER_INDEX_ERR:
- PERR("get_next_index: error\n");
+ BT_LOGE("get_next_index: error");
memset(index, 0, sizeof(struct packet_index));
stream->state = LTTNG_LIVE_STREAM_ACTIVE_NO_DATA;
goto error;
default:
- PERR("get_next_index: unkwown value\n");
+ BT_LOGE("get_next_index: unknown value");
memset(index, 0, sizeof(struct packet_index));
stream->state = LTTNG_LIVE_STREAM_ACTIVE_NO_DATA;
goto error;
return retstatus;
error:
- retstatus = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
+ if (lttng_live_is_canceled(lttng_live)) {
+ retstatus = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
+ } else {
+ retstatus = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
+ }
return retstatus;
}
lttng_live->viewer_connection;
struct lttng_live_trace *trace = stream->trace;
- PDBG("lttng_live_get_stream_bytes: offset=%" PRIu64 ", req_len=%" PRIu64 "\n",
+ BT_LOGD("lttng_live_get_stream_bytes: offset=%" PRIu64 ", req_len=%" PRIu64,
offset, req_len);
cmd.cmd = htobe32(LTTNG_VIEWER_GET_PACKET);
cmd.data_size = htobe64((uint64_t) sizeof(rq));
rq.offset = htobe64(offset);
rq.len = htobe32(req_len);
- ret_len = lttng_live_send(viewer_connection->control_sock, &cmd, sizeof(cmd));
- if (ret_len < 0) {
- PERR("Error sending cmd: %s\n", strerror(errno));
+ ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
+ if (ret_len == BT_SOCKET_ERROR) {
+ BT_LOGE("Error sending cmd: %s", bt_socket_errormsg());
goto error;
}
assert(ret_len == sizeof(cmd));
- ret_len = lttng_live_send(viewer_connection->control_sock, &rq, sizeof(rq));
- if (ret_len < 0) {
- PERR("Error sending get_data request: %s\n", strerror(errno));
+ ret_len = lttng_live_send(viewer_connection, &rq, sizeof(rq));
+ if (ret_len == BT_SOCKET_ERROR) {
+ BT_LOGE("Error sending get_data request: %s", bt_socket_errormsg());
goto error;
}
assert(ret_len == sizeof(rq));
- ret_len = lttng_live_recv(viewer_connection->control_sock, &rp, sizeof(rp));
+ ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
if (ret_len == 0) {
- PERR("Remote side has closed connection\n");
+ BT_LOGI("Remote side has closed connection");
goto error;
}
- if (ret_len < 0) {
- PERR("Error receiving get_data response: %s\n", strerror(errno));
+ if (ret_len == BT_SOCKET_ERROR) {
+ BT_LOGE("Error receiving get_data response: %s", bt_socket_errormsg());
goto error;
}
if (ret_len != sizeof(rp)) {
- PERR("[error] get_data_packet: expected %zu"
- ", received %zd\n", sizeof(rp),
+ BT_LOGE("get_data_packet: expected %zu"
+ ", received %zd", sizeof(rp),
ret_len);
goto error;
}
switch (status) {
case LTTNG_VIEWER_GET_PACKET_OK:
req_len = be32toh(rp.len);
- PDBG("get_data_packet: Ok, packet size : %" PRIu64 "\n", req_len);
+ BT_LOGD("get_data_packet: Ok, packet size : %" PRIu64 "", req_len);
break;
case LTTNG_VIEWER_GET_PACKET_RETRY:
/* Unimplemented by relay daemon */
- PDBG("get_data_packet: retry\n");
+ BT_LOGD("get_data_packet: retry");
retstatus = BT_CTF_NOTIF_ITER_MEDIUM_STATUS_AGAIN;
goto end;
case LTTNG_VIEWER_GET_PACKET_ERR:
if (flags & LTTNG_VIEWER_FLAG_NEW_METADATA) {
- PDBG("get_data_packet: new metadata needed, try again later\n");
+ BT_LOGD("get_data_packet: new metadata needed, try again later");
trace->new_metadata_needed = true;
}
if (flags & LTTNG_VIEWER_FLAG_NEW_STREAM) {
- PDBG("get_data_packet: new streams needed, try again later\n");
+ BT_LOGD("get_data_packet: new streams needed, try again later");
lttng_live_need_new_streams(lttng_live);
}
if (flags & (LTTNG_VIEWER_FLAG_NEW_METADATA
retstatus = BT_CTF_NOTIF_ITER_MEDIUM_STATUS_AGAIN;
goto end;
}
- PERR("get_data_packet: error\n");
+ BT_LOGE("get_data_packet: error");
goto error;
case LTTNG_VIEWER_GET_PACKET_EOF:
retstatus = BT_CTF_NOTIF_ITER_MEDIUM_STATUS_EOF;
goto end;
default:
- PDBG("get_data_packet: unknown\n");
+ BT_LOGE("get_data_packet: unknown");
goto error;
}
goto error;
}
- ret_len = lttng_live_recv(viewer_connection->control_sock, buf, req_len);
+ ret_len = lttng_live_recv(viewer_connection, buf, req_len);
if (ret_len == 0) {
- PERR("Remote side has closed connection\n");
+ BT_LOGI("Remote side has closed connection");
goto error;
}
- if (ret_len < 0) {
- PERR("Error receiving trace packet: %s\n", strerror(errno));
+ if (ret_len == BT_SOCKET_ERROR) {
+ BT_LOGE("Error receiving trace packet: %s", bt_socket_errormsg());
goto error;
}
assert(ret_len == req_len);
return retstatus;
error:
- retstatus = BT_CTF_NOTIF_ITER_MEDIUM_STATUS_ERROR;
+ if (lttng_live_is_canceled(lttng_live)) {
+ retstatus = BT_CTF_NOTIF_ITER_MEDIUM_STATUS_AGAIN;
+ } else {
+ retstatus = BT_CTF_NOTIF_ITER_MEDIUM_STATUS_ERROR;
+ }
return retstatus;
}
memset(&rq, 0, sizeof(rq));
rq.session_id = htobe64(session->id);
- ret_len = lttng_live_send(viewer_connection->control_sock, &cmd, sizeof(cmd));
- if (ret_len < 0) {
- PERR("Error sending cmd: %s\n", strerror(errno));
+ ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
+ if (ret_len == BT_SOCKET_ERROR) {
+ BT_LOGE("Error sending cmd: %s", bt_socket_errormsg());
goto error;
}
assert(ret_len == sizeof(cmd));
- ret_len = lttng_live_send(viewer_connection->control_sock, &rq, sizeof(rq));
- if (ret_len < 0) {
- PERR("Error sending get_new_streams request: %s\n", strerror(errno));
+ ret_len = lttng_live_send(viewer_connection, &rq, sizeof(rq));
+ if (ret_len == BT_SOCKET_ERROR) {
+ BT_LOGE("Error sending get_new_streams request: %s", bt_socket_errormsg());
goto error;
}
assert(ret_len == sizeof(rq));
- ret_len = lttng_live_recv(viewer_connection->control_sock, &rp, sizeof(rp));
+ ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
if (ret_len == 0) {
- PERR("Remote side has closed connection\n");
+ BT_LOGI("Remote side has closed connection");
goto error;
}
- if (ret_len < 0) {
- PERR("Error receiving get_new_streams response\n");
+ if (ret_len == BT_SOCKET_ERROR) {
+ BT_LOGE("Error receiving get_new_streams response");
goto error;
}
assert(ret_len == sizeof(rp));
status = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END;
goto end;
case LTTNG_VIEWER_NEW_STREAMS_ERR:
- PERR("get_new_streams error\n");
+ BT_LOGE("get_new_streams error");
goto error;
default:
- PERR("Unknown return code %u\n", be32toh(rp.status));
+ BT_LOGE("Unknown return code %u", be32toh(rp.status));
goto error;
}
return status;
error:
- status = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
+ if (lttng_live_is_canceled(lttng_live)) {
+ status = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
+ } else {
+ status = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
+ }
return status;
}
BT_HIDDEN
struct bt_live_viewer_connection *
- bt_live_viewer_connection_create(const char *url, FILE *error_fp)
+ bt_live_viewer_connection_create(const char *url,
+ struct lttng_live_component *lttng_live)
{
struct bt_live_viewer_connection *viewer_connection;
viewer_connection = g_new0(struct bt_live_viewer_connection, 1);
+ if (bt_socket_init() != 0) {
+ goto error;
+ }
+
bt_object_init(&viewer_connection->obj, connection_release);
- viewer_connection->control_sock = -1;
+ viewer_connection->control_sock = BT_INVALID_SOCKET;
viewer_connection->port = -1;
- viewer_connection->error_fp = error_fp;
+ viewer_connection->lttng_live = lttng_live;
viewer_connection->url = g_string_new(url);
if (!viewer_connection->url) {
goto error;
}
- PDBG("Establishing connection to url \"%s\"...\n", url);
+ BT_LOGD("Establishing connection to url \"%s\"...", url);
if (lttng_live_connect_viewer(viewer_connection)) {
goto error_report;
}
- PDBG("Connection to url \"%s\" is established\n", url);
+ BT_LOGD("Connection to url \"%s\" is established", url);
return viewer_connection;
error_report:
- printf_verbose("Failure to establish connection to url \"%s\"\n", url);
+ BT_LOGW("Failure to establish connection to url \"%s\"", url);
error:
g_free(viewer_connection);
return NULL;
BT_HIDDEN
void bt_live_viewer_connection_destroy(struct bt_live_viewer_connection *viewer_connection)
{
- PDBG("Closing connection to url \"%s\"\n", viewer_connection->url->str);
+ BT_LOGD("Closing connection to url \"%s\"", viewer_connection->url->str);
lttng_live_disconnect_viewer(viewer_connection);
g_string_free(viewer_connection->url, TRUE);
+ if (viewer_connection->relay_hostname) {
+ g_string_free(viewer_connection->relay_hostname, TRUE);
+ }
+ if (viewer_connection->target_hostname) {
+ g_string_free(viewer_connection->target_hostname, TRUE);
+ }
+ if (viewer_connection->session_name) {
+ g_string_free(viewer_connection->session_name, TRUE);
+ }
g_free(viewer_connection);
+
+ bt_socket_fini();
}