#include <babeltrace/compat/send-internal.h>
#include <babeltrace/compiler-internal.h>
+#include <babeltrace/common-internal.h>
+#include <babeltrace/graph/graph.h>
#define BT_LOG_TAG "PLUGIN-CTF-LTTNG-LIVE-VIEWER"
#include "data-stream.h"
#include "metadata.h"
-static ssize_t lttng_live_recv(int fd, void *buf, size_t len)
+static ssize_t lttng_live_recv(struct bt_live_viewer_connection *viewer_connection,
+ void *buf, size_t len)
{
ssize_t ret;
size_t copied = 0, to_copy = len;
+ struct lttng_live_component *lttng_live =
+ viewer_connection->lttng_live;
+ int fd = viewer_connection->control_sock;
do {
ret = recv(fd, buf + copied, to_copy, 0);
copied += ret;
to_copy -= ret;
}
- } while ((ret > 0 && to_copy > 0)
- || (ret < 0 && errno == EINTR));
+ if (ret < 0 && errno == EINTR) {
+ if (lttng_live && bt_graph_is_canceled(lttng_live->graph)) {
+ break;
+ } else {
+ continue;
+ }
+ }
+ } while (ret > 0 && to_copy > 0);
if (ret > 0)
ret = copied;
/* ret = 0 means orderly shutdown, ret < 0 is error. */
return ret;
}
-static ssize_t lttng_live_send(int fd, const void *buf, size_t len)
+static ssize_t lttng_live_send(struct bt_live_viewer_connection *viewer_connection,
+ const void *buf, size_t len)
{
+ struct lttng_live_component *lttng_live =
+ viewer_connection->lttng_live;
+ int fd = viewer_connection->control_sock;
ssize_t ret;
- do {
+ for (;;) {
ret = bt_send_nosigpipe(fd, buf, len);
- } while (ret < 0 && errno == EINTR);
+ if (ret < 0 && errno == EINTR) {
+ if (lttng_live && bt_graph_is_canceled(lttng_live->graph)) {
+ break;
+ } else {
+ continue;
+ }
+ } else {
+ break;
+ }
+ }
return ret;
}
-/*
- * hostname parameter needs to hold MAXNAMLEN chars.
- */
static int parse_url(struct bt_live_viewer_connection *viewer_connection)
{
- char remain[3][MAXNAMLEN];
- int ret = -1, proto, proto_offset = 0;
+ char error_buf[256] = { 0 };
+ struct bt_common_lttng_live_url_parts lttng_live_url_parts = { 0 };
+ int ret = -1;
const char *path = viewer_connection->url->str;
- size_t path_len;
if (!path) {
goto end;
}
- path_len = strlen(path); /* not accounting \0 */
- /*
- * Since sscanf API does not allow easily checking string length
- * against a size defined by a macro. Test it beforehand on the
- * input. We know the output is always <= than the input length.
- */
- if (path_len >= MAXNAMLEN) {
+ lttng_live_url_parts = bt_common_parse_lttng_live_url(path,
+ error_buf, sizeof(error_buf));
+ if (!lttng_live_url_parts.proto) {
+ BT_LOGW("Invalid LTTng live URL format: %s", error_buf);
goto end;
}
- ret = sscanf(path, "net%d://", &proto);
- if (ret < 1) {
- proto = 4;
- /* net:// */
- proto_offset = strlen("net://");
- } else {
- /* net4:// or net6:// */
- proto_offset = strlen("netX://");
- }
- if (proto_offset > path_len) {
- goto end;
- }
- if (proto == 6) {
- BT_LOGW("IPv6 is currently unsupported by lttng-live");
- goto end;
- }
- /* TODO : parse for IPv6 as well */
- /* Parse the hostname or IP */
- ret = sscanf(&path[proto_offset], "%[a-zA-Z.0-9%-]%s",
- viewer_connection->relay_hostname, remain[0]);
- if (ret == 2) {
- /* Optional port number */
- switch (remain[0][0]) {
- case ':':
- ret = sscanf(remain[0], ":%d%s", &viewer_connection->port, remain[1]);
- /* Optional session ID with port number */
- if (ret == 2) {
- ret = sscanf(remain[1], "/%s", remain[2]);
- /* Accept 0 or 1 (optional) */
- if (ret < 0) {
- goto end;
- }
- } else if (ret == 0) {
- BT_LOGW("Missing port number after delimitor ':'");
- ret = -1;
- goto end;
- }
- break;
- case '/':
- /* Optional session ID */
- ret = sscanf(remain[0], "/%s", remain[2]);
- /* Accept 0 or 1 (optional) */
- if (ret < 0) {
- goto end;
- }
- break;
- default:
- BT_LOGW("wrong delimitor : %c", remain[0][0]);
- ret = -1;
- goto end;
- }
- }
- if (viewer_connection->port < 0) {
+ viewer_connection->relay_hostname =
+ lttng_live_url_parts.hostname;
+ lttng_live_url_parts.hostname = NULL;
+
+ if (lttng_live_url_parts.port >= 0) {
+ viewer_connection->port = lttng_live_url_parts.port;
+ } else {
viewer_connection->port = LTTNG_DEFAULT_NETWORK_VIEWER_PORT;
}
- if (strlen(remain[2]) == 0) {
- BT_LOGD("Connecting to hostname : %s, port : %d, "
- "proto : IPv%d",
- viewer_connection->relay_hostname,
- viewer_connection->port,
- proto);
- ret = 0;
- goto end;
- }
- ret = sscanf(remain[2], "host/%[a-zA-Z.0-9%-]/%s",
- viewer_connection->target_hostname,
- viewer_connection->session_name);
- if (ret != 2) {
- BT_LOGW("Format : "
- "net://<hostname>/host/<target_hostname>/<session_name>");
- goto end;
+ viewer_connection->target_hostname =
+ lttng_live_url_parts.target_hostname;
+ lttng_live_url_parts.target_hostname = NULL;
+
+ if (lttng_live_url_parts.session_name) {
+ viewer_connection->session_name =
+ lttng_live_url_parts.session_name;
+ lttng_live_url_parts.session_name = NULL;
}
BT_LOGD("Connecting to hostname : %s, port : %d, "
"target hostname : %s, session name : %s, "
- "proto : IPv%d",
- viewer_connection->relay_hostname,
+ "proto : %s",
+ viewer_connection->relay_hostname->str,
viewer_connection->port,
- viewer_connection->target_hostname,
- viewer_connection->session_name, proto);
+ viewer_connection->target_hostname == NULL ?
+ "<none>" : viewer_connection->target_hostname->str,
+ viewer_connection->session_name == NULL ?
+ "<none>" : viewer_connection->session_name->str,
+ lttng_live_url_parts.proto->str);
ret = 0;
end:
+ bt_common_destroy_lttng_live_url_parts(<tng_live_url_parts);
return ret;
}
connect.minor = htobe32(LTTNG_LIVE_MINOR);
connect.type = htobe32(LTTNG_VIEWER_CLIENT_COMMAND);
- ret_len = lttng_live_send(viewer_connection->control_sock, &cmd, sizeof(cmd));
+ ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
if (ret_len < 0) {
BT_LOGE("Error sending cmd: %s", strerror(errno));
goto error;
}
assert(ret_len == sizeof(cmd));
- ret_len = lttng_live_send(viewer_connection->control_sock, &connect, sizeof(connect));
+ ret_len = lttng_live_send(viewer_connection, &connect, sizeof(connect));
if (ret_len < 0) {
BT_LOGE("Error sending version: %s", strerror(errno));
goto error;
}
assert(ret_len == sizeof(connect));
- ret_len = lttng_live_recv(viewer_connection->control_sock, &connect, sizeof(connect));
+ ret_len = lttng_live_recv(viewer_connection, &connect, sizeof(connect));
if (ret_len == 0) {
BT_LOGI("Remote side has closed connection");
goto error;
goto error;
}
- host = gethostbyname(viewer_connection->relay_hostname);
+ host = gethostbyname(viewer_connection->relay_hostname->str);
if (!host) {
BT_LOGE("Cannot lookup hostname %s",
- viewer_connection->relay_hostname);
+ viewer_connection->relay_hostname->str);
goto error;
}
cmd.data_size = htobe64((uint64_t) 0);
cmd.cmd_version = htobe32(0);
- ret_len = lttng_live_send(viewer_connection->control_sock, &cmd, sizeof(cmd));
+ ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
if (ret_len < 0) {
BT_LOGE("Error sending cmd: %s", strerror(errno));
goto error;
}
assert(ret_len == sizeof(cmd));
- ret_len = lttng_live_recv(viewer_connection->control_sock, &list, sizeof(list));
+ ret_len = lttng_live_recv(viewer_connection, &list, sizeof(list));
if (ret_len == 0) {
BT_LOGI("Remote side has closed connection");
goto error;
for (i = 0; i < sessions_count; i++) {
struct lttng_viewer_session lsession;
- ret_len = lttng_live_recv(viewer_connection->control_sock,
+ ret_len = lttng_live_recv(viewer_connection,
&lsession, sizeof(lsession));
if (ret_len == 0) {
BT_LOGI("Remote side has closed connection");
cmd.data_size = htobe64((uint64_t) 0);
cmd.cmd_version = htobe32(0);
- ret_len = lttng_live_send(viewer_connection->control_sock, &cmd, sizeof(cmd));
+ ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
if (ret_len < 0) {
BT_LOGE("Error sending cmd: %s", strerror(errno));
goto error;
}
assert(ret_len == sizeof(cmd));
- ret_len = lttng_live_recv(viewer_connection->control_sock, &list, sizeof(list));
+ ret_len = lttng_live_recv(viewer_connection, &list, sizeof(list));
if (ret_len == 0) {
BT_LOGI("Remote side has closed connection");
goto error;
sessions_count = be32toh(list.sessions_count);
for (i = 0; i < sessions_count; i++) {
- ret_len = lttng_live_recv(viewer_connection->control_sock,
+ ret_len = lttng_live_recv(viewer_connection,
&lsession, sizeof(lsession));
if (ret_len == 0) {
BT_LOGI("Remote side has closed connection");
session_id = be64toh(lsession.id);
if ((strncmp(lsession.session_name,
- viewer_connection->session_name,
+ viewer_connection->session_name->str,
MAXNAMLEN) == 0) && (strncmp(lsession.hostname,
- viewer_connection->target_hostname,
+ viewer_connection->target_hostname->str,
MAXNAMLEN) == 0)) {
if (lttng_live_add_session(lttng_live, session_id)) {
goto error;
cmd.data_size = htobe64((uint64_t) 0);
cmd.cmd_version = htobe32(0);
- ret_len = lttng_live_send(viewer_connection->control_sock, &cmd, sizeof(cmd));
+ ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
if (ret_len < 0) {
BT_LOGE("Error sending cmd: %s", strerror(errno));
goto error;
}
assert(ret_len == sizeof(cmd));
- ret_len = lttng_live_recv(viewer_connection->control_sock, &resp, sizeof(resp));
+ ret_len = lttng_live_recv(viewer_connection, &resp, sizeof(resp));
if (ret_len == 0) {
BT_LOGI("Remote side has closed connection");
goto error;
uint64_t stream_id;
uint64_t ctf_trace_id;
- ret_len = lttng_live_recv(viewer_connection->control_sock, &stream, sizeof(stream));
+ ret_len = lttng_live_recv(viewer_connection, &stream, sizeof(stream));
if (ret_len == 0) {
BT_LOGI("Remote side has closed connection");
goto error;
// rq.seek = htobe32(LTTNG_VIEWER_SEEK_BEGINNING);
rq.seek = htobe32(LTTNG_VIEWER_SEEK_LAST);
- ret_len = lttng_live_send(viewer_connection->control_sock, &cmd, sizeof(cmd));
+ ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
if (ret_len < 0) {
BT_LOGE("Error sending cmd: %s", strerror(errno));
goto error;
}
assert(ret_len == sizeof(cmd));
- ret_len = lttng_live_send(viewer_connection->control_sock, &rq, sizeof(rq));
+ ret_len = lttng_live_send(viewer_connection, &rq, sizeof(rq));
if (ret_len < 0) {
BT_LOGE("Error sending attach request: %s", strerror(errno));
goto error;
}
assert(ret_len == sizeof(rq));
- ret_len = lttng_live_recv(viewer_connection->control_sock, &rp, sizeof(rp));
+ ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
if (ret_len == 0) {
BT_LOGI("Remote side has closed connection");
goto error;
memset(&rq, 0, sizeof(rq));
rq.session_id = htobe64(session_id);
- ret_len = lttng_live_send(viewer_connection->control_sock, &cmd, sizeof(cmd));
+ ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
if (ret_len < 0) {
BT_LOGE("Error sending cmd: %s", strerror(errno));
goto error;
}
assert(ret_len == sizeof(cmd));
- ret_len = lttng_live_send(viewer_connection->control_sock, &rq, sizeof(rq));
+ ret_len = lttng_live_send(viewer_connection, &rq, sizeof(rq));
if (ret_len < 0) {
BT_LOGE("Error sending detach request: %s", strerror(errno));
goto error;
}
assert(ret_len == sizeof(rq));
- ret_len = lttng_live_recv(viewer_connection->control_sock, &rp, sizeof(rp));
+ ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
if (ret_len == 0) {
BT_LOGI("Remote side has closed connection");
goto error;
cmd.data_size = htobe64((uint64_t) sizeof(rq));
cmd.cmd_version = htobe32(0);
- ret_len = lttng_live_send(viewer_connection->control_sock, &cmd, sizeof(cmd));
+ ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
if (ret_len < 0) {
BT_LOGE("Error sending cmd: %s", strerror(errno));
goto error;
}
assert(ret_len == sizeof(cmd));
- ret_len = lttng_live_send(viewer_connection->control_sock, &rq, sizeof(rq));
+ ret_len = lttng_live_send(viewer_connection, &rq, sizeof(rq));
if (ret_len < 0) {
BT_LOGE("Error sending get_metadata request: %s", strerror(errno));
goto error;
}
assert(ret_len == sizeof(rq));
- ret_len = lttng_live_recv(viewer_connection->control_sock, &rp, sizeof(rp));
+ ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
if (ret_len == 0) {
BT_LOGI("Remote side has closed connection");
goto error;
BT_LOGE("relay data zmalloc: %s", strerror(errno));
goto error;
}
- ret_len = lttng_live_recv(viewer_connection->control_sock, data, len);
+ ret_len = lttng_live_recv(viewer_connection, data, len);
if (ret_len == 0) {
BT_LOGI("Remote side has closed connection");
goto error_free_data;
memset(&rq, 0, sizeof(rq));
rq.stream_id = htobe64(stream->viewer_stream_id);
- ret_len = lttng_live_send(viewer_connection->control_sock, &cmd, sizeof(cmd));
+ ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
if (ret_len < 0) {
BT_LOGE("Error sending cmd: %s", strerror(errno));
goto error;
}
assert(ret_len == sizeof(cmd));
- ret_len = lttng_live_send(viewer_connection->control_sock, &rq, sizeof(rq));
+ ret_len = lttng_live_send(viewer_connection, &rq, sizeof(rq));
if (ret_len < 0) {
BT_LOGE("Error sending get_next_index request: %s", strerror(errno));
goto error;
}
assert(ret_len == sizeof(rq));
- ret_len = lttng_live_recv(viewer_connection->control_sock, &rp, sizeof(rp));
+ ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
if (ret_len == 0) {
BT_LOGI("Remote side has closed connection");
goto error;
return retstatus;
error:
- retstatus = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
+ if (bt_graph_is_canceled(lttng_live->graph)) {
+ retstatus = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
+ } else {
+ retstatus = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
+ }
return retstatus;
}
rq.offset = htobe64(offset);
rq.len = htobe32(req_len);
- ret_len = lttng_live_send(viewer_connection->control_sock, &cmd, sizeof(cmd));
+ ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
if (ret_len < 0) {
BT_LOGE("Error sending cmd: %s", strerror(errno));
goto error;
}
assert(ret_len == sizeof(cmd));
- ret_len = lttng_live_send(viewer_connection->control_sock, &rq, sizeof(rq));
+ ret_len = lttng_live_send(viewer_connection, &rq, sizeof(rq));
if (ret_len < 0) {
BT_LOGE("Error sending get_data request: %s", strerror(errno));
goto error;
}
assert(ret_len == sizeof(rq));
- ret_len = lttng_live_recv(viewer_connection->control_sock, &rp, sizeof(rp));
+ ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
if (ret_len == 0) {
BT_LOGI("Remote side has closed connection");
goto error;
goto error;
}
- ret_len = lttng_live_recv(viewer_connection->control_sock, buf, req_len);
+ ret_len = lttng_live_recv(viewer_connection, buf, req_len);
if (ret_len == 0) {
BT_LOGI("Remote side has closed connection");
goto error;
return retstatus;
error:
- retstatus = BT_CTF_NOTIF_ITER_MEDIUM_STATUS_ERROR;
+ if (bt_graph_is_canceled(lttng_live->graph)) {
+ retstatus = BT_CTF_NOTIF_ITER_MEDIUM_STATUS_AGAIN;
+ } else {
+ retstatus = BT_CTF_NOTIF_ITER_MEDIUM_STATUS_ERROR;
+ }
return retstatus;
}
memset(&rq, 0, sizeof(rq));
rq.session_id = htobe64(session->id);
- ret_len = lttng_live_send(viewer_connection->control_sock, &cmd, sizeof(cmd));
+ ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
if (ret_len < 0) {
BT_LOGE("Error sending cmd: %s", strerror(errno));
goto error;
}
assert(ret_len == sizeof(cmd));
- ret_len = lttng_live_send(viewer_connection->control_sock, &rq, sizeof(rq));
+ ret_len = lttng_live_send(viewer_connection, &rq, sizeof(rq));
if (ret_len < 0) {
BT_LOGE("Error sending get_new_streams request: %s", strerror(errno));
goto error;
}
assert(ret_len == sizeof(rq));
- ret_len = lttng_live_recv(viewer_connection->control_sock, &rp, sizeof(rp));
+ ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
if (ret_len == 0) {
BT_LOGI("Remote side has closed connection");
goto error;
return status;
error:
- status = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
+ if (bt_graph_is_canceled(lttng_live->graph)) {
+ status = BT_CTF_NOTIF_ITER_MEDIUM_STATUS_AGAIN;
+ } else {
+ status = BT_CTF_NOTIF_ITER_MEDIUM_STATUS_ERROR;
+ }
return status;
}
BT_HIDDEN
struct bt_live_viewer_connection *
- bt_live_viewer_connection_create(const char *url, FILE *error_fp)
+ bt_live_viewer_connection_create(const char *url,
+ struct lttng_live_component *lttng_live)
{
struct bt_live_viewer_connection *viewer_connection;
bt_object_init(&viewer_connection->obj, connection_release);
viewer_connection->control_sock = -1;
viewer_connection->port = -1;
- viewer_connection->error_fp = error_fp;
+ viewer_connection->lttng_live = lttng_live;
viewer_connection->url = g_string_new(url);
if (!viewer_connection->url) {
goto error;
BT_LOGD("Closing connection to url \"%s\"", viewer_connection->url->str);
lttng_live_disconnect_viewer(viewer_connection);
g_string_free(viewer_connection->url, TRUE);
+ if (viewer_connection->relay_hostname) {
+ g_string_free(viewer_connection->relay_hostname, TRUE);
+ }
+ if (viewer_connection->target_hostname) {
+ g_string_free(viewer_connection->target_hostname, TRUE);
+ }
+ if (viewer_connection->session_name) {
+ g_string_free(viewer_connection->session_name, TRUE);
+ }
g_free(viewer_connection);
}