* SOFTWARE.
*/
+#define BT_LOG_TAG "PLUGIN-CTF-LTTNG-LIVE-SRC-VIEWER"
+#include "logging.h"
+
#include <stdio.h>
#include <stdint.h>
#include <stdlib.h>
#include <unistd.h>
#include <glib.h>
#include <inttypes.h>
-#include <sys/socket.h>
#include <sys/types.h>
-#include <netinet/in.h>
-#include <netdb.h>
#include <fcntl.h>
-#include <poll.h>
-#include <babeltrace/compat/send-internal.h>
+#include <babeltrace/compat/socket-internal.h>
+#include <babeltrace/endian-internal.h>
#include <babeltrace/compiler-internal.h>
#include <babeltrace/common-internal.h>
#include <babeltrace/graph/graph.h>
-#define BT_LOG_TAG "PLUGIN-CTF-LTTNG-LIVE-VIEWER"
-
#include "lttng-live-internal.h"
#include "viewer-connection.h"
#include "lttng-viewer-abi.h"
size_t copied = 0, to_copy = len;
struct lttng_live_component *lttng_live =
viewer_connection->lttng_live;
- int fd = viewer_connection->control_sock;
+ BT_SOCKET sock = viewer_connection->control_sock;
do {
- ret = recv(fd, buf + copied, to_copy, 0);
+ ret = bt_socket_recv(sock, buf + copied, to_copy, 0);
if (ret > 0) {
assert(ret <= to_copy);
copied += ret;
to_copy -= ret;
}
- if (ret < 0 && errno == EINTR) {
- if (lttng_live && bt_graph_is_canceled(lttng_live->graph)) {
+ if (ret == BT_SOCKET_ERROR && bt_socket_interrupted()) {
+ if (lttng_live_is_canceled(lttng_live)) {
break;
} else {
continue;
} while (ret > 0 && to_copy > 0);
if (ret > 0)
ret = copied;
- /* ret = 0 means orderly shutdown, ret < 0 is error. */
+ /* ret = 0 means orderly shutdown, ret == BT_SOCKET_ERROR is error. */
return ret;
}
{
struct lttng_live_component *lttng_live =
viewer_connection->lttng_live;
- int fd = viewer_connection->control_sock;
+ BT_SOCKET sock = viewer_connection->control_sock;
ssize_t ret;
for (;;) {
- ret = bt_send_nosigpipe(fd, buf, len);
- if (ret < 0 && errno == EINTR) {
- if (lttng_live && bt_graph_is_canceled(lttng_live->graph)) {
+ ret = bt_socket_send_nosigpipe(sock, buf, len);
+ if (ret == BT_SOCKET_ERROR && bt_socket_interrupted()) {
+ if (lttng_live_is_canceled(lttng_live)) {
break;
} else {
continue;
connect.type = htobe32(LTTNG_VIEWER_CLIENT_COMMAND);
ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
- if (ret_len < 0) {
- BT_LOGE("Error sending cmd: %s", strerror(errno));
+ if (ret_len == BT_SOCKET_ERROR) {
+ BT_LOGE("Error sending cmd: %s", bt_socket_errormsg());
goto error;
}
assert(ret_len == sizeof(cmd));
ret_len = lttng_live_send(viewer_connection, &connect, sizeof(connect));
- if (ret_len < 0) {
- BT_LOGE("Error sending version: %s", strerror(errno));
+ if (ret_len == BT_SOCKET_ERROR) {
+ BT_LOGE("Error sending version: %s", bt_socket_errormsg());
goto error;
}
assert(ret_len == sizeof(connect));
BT_LOGI("Remote side has closed connection");
goto error;
}
- if (ret_len < 0) {
- BT_LOGE("Error receiving version: %s", strerror(errno));
+ if (ret_len == BT_SOCKET_ERROR) {
+ BT_LOGE("Error receiving version: %s", bt_socket_errormsg());
goto error;
}
assert(ret_len == sizeof(connect));
goto error;
}
- if ((viewer_connection->control_sock = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
- BT_LOGE("Socket creation failed: %s", strerror(errno));
+ if ((viewer_connection->control_sock = socket(AF_INET, SOCK_STREAM, 0)) == BT_INVALID_SOCKET) {
+ BT_LOGE("Socket creation failed: %s", bt_socket_errormsg());
goto error;
}
memset(&(server_addr.sin_zero), 0, 8);
if (connect(viewer_connection->control_sock, (struct sockaddr *) &server_addr,
- sizeof(struct sockaddr)) == -1) {
- BT_LOGE("Connection failed: %s", strerror(errno));
+ sizeof(struct sockaddr)) == BT_SOCKET_ERROR) {
+ BT_LOGE("Connection failed: %s", bt_socket_errormsg());
goto error;
}
if (lttng_live_handshake(viewer_connection)) {
return ret;
error:
- if (viewer_connection->control_sock >= 0) {
- if (close(viewer_connection->control_sock)) {
- BT_LOGE("Close: %s", strerror(errno));
+ if (viewer_connection->control_sock != BT_INVALID_SOCKET) {
+ if (bt_socket_close(viewer_connection->control_sock) == BT_SOCKET_ERROR) {
+ BT_LOGE("Close: %s", bt_socket_errormsg());
}
}
- viewer_connection->control_sock = -1;
+ viewer_connection->control_sock = BT_INVALID_SOCKET;
return -1;
}
static void lttng_live_disconnect_viewer(struct bt_live_viewer_connection *viewer_connection)
{
- if (viewer_connection->control_sock < 0) {
+ if (viewer_connection->control_sock == BT_INVALID_SOCKET) {
return;
}
- if (close(viewer_connection->control_sock)) {
- BT_LOGE("Close: %s", strerror(errno));
- viewer_connection->control_sock = -1;
+ if (bt_socket_close(viewer_connection->control_sock) == BT_SOCKET_ERROR) {
+ BT_LOGE("Close: %s", bt_socket_errormsg());
+ viewer_connection->control_sock = BT_INVALID_SOCKET;
}
}
cmd.cmd_version = htobe32(0);
ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
- if (ret_len < 0) {
- BT_LOGE("Error sending cmd: %s", strerror(errno));
+ if (ret_len == BT_SOCKET_ERROR) {
+ BT_LOGE("Error sending cmd: %s", bt_socket_errormsg());
goto error;
}
assert(ret_len == sizeof(cmd));
BT_LOGI("Remote side has closed connection");
goto error;
}
- if (ret_len < 0) {
- BT_LOGE("Error receiving session list: %s", strerror(errno));
+ if (ret_len == BT_SOCKET_ERROR) {
+ BT_LOGE("Error receiving session list: %s", bt_socket_errormsg());
goto error;
}
assert(ret_len == sizeof(list));
BT_LOGI("Remote side has closed connection");
goto error;
}
- if (ret_len < 0) {
- BT_LOGE("Error receiving session: %s", strerror(errno));
+ if (ret_len == BT_SOCKET_ERROR) {
+ BT_LOGE("Error receiving session: %s", bt_socket_errormsg());
goto error;
}
assert(ret_len == sizeof(lsession));
cmd.cmd_version = htobe32(0);
ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
- if (ret_len < 0) {
- BT_LOGE("Error sending cmd: %s", strerror(errno));
+ if (ret_len == BT_SOCKET_ERROR) {
+ BT_LOGE("Error sending cmd: %s", bt_socket_errormsg());
goto error;
}
assert(ret_len == sizeof(cmd));
BT_LOGI("Remote side has closed connection");
goto error;
}
- if (ret_len < 0) {
- BT_LOGE("Error receiving session list: %s", strerror(errno));
+ if (ret_len == BT_SOCKET_ERROR) {
+ BT_LOGE("Error receiving session list: %s", bt_socket_errormsg());
goto error;
}
assert(ret_len == sizeof(list));
BT_LOGI("Remote side has closed connection");
goto error;
}
- if (ret_len < 0) {
- BT_LOGE("Error receiving session: %s", strerror(errno));
+ if (ret_len == BT_SOCKET_ERROR) {
+ BT_LOGE("Error receiving session: %s", bt_socket_errormsg());
goto error;
}
assert(ret_len == sizeof(lsession));
lsession.session_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0';
session_id = be64toh(lsession.id);
+ BT_LOGD("Adding session %" PRIu64 " hostname: %s session_name: %s",
+ session_id, lsession.hostname, lsession.session_name);
+
if ((strncmp(lsession.session_name,
viewer_connection->session_name->str,
- MAXNAMLEN) == 0) && (strncmp(lsession.hostname,
+ LTTNG_VIEWER_NAME_MAX) == 0) && (strncmp(lsession.hostname,
viewer_connection->target_hostname->str,
- MAXNAMLEN) == 0)) {
- if (lttng_live_add_session(lttng_live, session_id)) {
+ LTTNG_VIEWER_HOST_NAME_MAX) == 0)) {
+ if (lttng_live_add_session(lttng_live, session_id,
+ lsession.hostname,
+ lsession.session_name)) {
goto error;
}
}
cmd.cmd_version = htobe32(0);
ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
- if (ret_len < 0) {
- BT_LOGE("Error sending cmd: %s", strerror(errno));
+ if (ret_len == BT_SOCKET_ERROR) {
+ BT_LOGE("Error sending cmd: %s", bt_socket_errormsg());
goto error;
}
assert(ret_len == sizeof(cmd));
BT_LOGI("Remote side has closed connection");
goto error;
}
- if (ret_len < 0) {
- BT_LOGE("Error receiving create session reply: %s", strerror(errno));
+ if (ret_len == BT_SOCKET_ERROR) {
+ BT_LOGE("Error receiving create session reply: %s", bt_socket_errormsg());
goto error;
}
assert(ret_len == sizeof(resp));
BT_LOGI("Remote side has closed connection");
goto error;
}
- if (ret_len < 0) {
+ if (ret_len == BT_SOCKET_ERROR) {
BT_LOGE("Error receiving stream");
goto error;
}
stream_id, stream.path_name,
stream.channel_name);
if (lttng_live_metadata_create_stream(session,
- ctf_trace_id, stream_id)) {
+ ctf_trace_id, stream_id,
+ stream.path_name)) {
BT_LOGE("Error creating metadata stream");
goto error;
rq.seek = htobe32(LTTNG_VIEWER_SEEK_LAST);
ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
- if (ret_len < 0) {
- BT_LOGE("Error sending cmd: %s", strerror(errno));
+ if (ret_len == BT_SOCKET_ERROR) {
+ BT_LOGE("Error sending cmd: %s", bt_socket_errormsg());
goto error;
}
assert(ret_len == sizeof(cmd));
ret_len = lttng_live_send(viewer_connection, &rq, sizeof(rq));
- if (ret_len < 0) {
- BT_LOGE("Error sending attach request: %s", strerror(errno));
+ if (ret_len == BT_SOCKET_ERROR) {
+ BT_LOGE("Error sending attach request: %s", bt_socket_errormsg());
goto error;
}
assert(ret_len == sizeof(rq));
BT_LOGI("Remote side has closed connection");
goto error;
}
- if (ret_len < 0) {
- BT_LOGE("Error receiving attach response: %s", strerror(errno));
+ if (ret_len == BT_SOCKET_ERROR) {
+ BT_LOGE("Error receiving attach response: %s", bt_socket_errormsg());
goto error;
}
assert(ret_len == sizeof(rp));
rq.session_id = htobe64(session_id);
ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
- if (ret_len < 0) {
- BT_LOGE("Error sending cmd: %s", strerror(errno));
+ if (ret_len == BT_SOCKET_ERROR) {
+ BT_LOGE("Error sending cmd: %s", bt_socket_errormsg());
goto error;
}
assert(ret_len == sizeof(cmd));
ret_len = lttng_live_send(viewer_connection, &rq, sizeof(rq));
- if (ret_len < 0) {
- BT_LOGE("Error sending detach request: %s", strerror(errno));
+ if (ret_len == BT_SOCKET_ERROR) {
+ BT_LOGE("Error sending detach request: %s", bt_socket_errormsg());
goto error;
}
assert(ret_len == sizeof(rq));
BT_LOGI("Remote side has closed connection");
goto error;
}
- if (ret_len < 0) {
- BT_LOGE("Error receiving detach response: %s", strerror(errno));
+ if (ret_len == BT_SOCKET_ERROR) {
+ BT_LOGE("Error receiving detach response: %s", bt_socket_errormsg());
goto error;
}
assert(ret_len == sizeof(rp));
cmd.cmd_version = htobe32(0);
ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
- if (ret_len < 0) {
- BT_LOGE("Error sending cmd: %s", strerror(errno));
+ if (ret_len == BT_SOCKET_ERROR) {
+ BT_LOGE("Error sending cmd: %s", bt_socket_errormsg());
goto error;
}
assert(ret_len == sizeof(cmd));
ret_len = lttng_live_send(viewer_connection, &rq, sizeof(rq));
- if (ret_len < 0) {
- BT_LOGE("Error sending get_metadata request: %s", strerror(errno));
+ if (ret_len == BT_SOCKET_ERROR) {
+ BT_LOGE("Error sending get_metadata request: %s", bt_socket_errormsg());
goto error;
}
assert(ret_len == sizeof(rq));
BT_LOGI("Remote side has closed connection");
goto error;
}
- if (ret_len < 0) {
- BT_LOGE("Error receiving get_metadata response: %s", strerror(errno));
+ if (ret_len == BT_SOCKET_ERROR) {
+ BT_LOGE("Error receiving get_metadata response: %s", bt_socket_errormsg());
goto error;
}
assert(ret_len == sizeof(rp));
BT_LOGI("Remote side has closed connection");
goto error_free_data;
}
- if (ret_len < 0) {
- BT_LOGE("Error receiving trace packet: %s", strerror(errno));
+ if (ret_len == BT_SOCKET_ERROR) {
+ BT_LOGE("Error receiving trace packet: %s", bt_socket_errormsg());
goto error_free_data;
}
assert(ret_len == len);
rq.stream_id = htobe64(stream->viewer_stream_id);
ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
- if (ret_len < 0) {
- BT_LOGE("Error sending cmd: %s", strerror(errno));
+ if (ret_len == BT_SOCKET_ERROR) {
+ BT_LOGE("Error sending cmd: %s", bt_socket_errormsg());
goto error;
}
assert(ret_len == sizeof(cmd));
ret_len = lttng_live_send(viewer_connection, &rq, sizeof(rq));
- if (ret_len < 0) {
- BT_LOGE("Error sending get_next_index request: %s", strerror(errno));
+ if (ret_len == BT_SOCKET_ERROR) {
+ BT_LOGE("Error sending get_next_index request: %s", bt_socket_errormsg());
goto error;
}
assert(ret_len == sizeof(rq));
BT_LOGI("Remote side has closed connection");
goto error;
}
- if (ret_len < 0) {
- BT_LOGE("Error receiving get_next_index response: %s", strerror(errno));
+ if (ret_len == BT_SOCKET_ERROR) {
+ BT_LOGE("Error receiving get_next_index response: %s", bt_socket_errormsg());
goto error;
}
assert(ret_len == sizeof(rp));
return retstatus;
error:
- if (bt_graph_is_canceled(lttng_live->graph)) {
+ if (lttng_live_is_canceled(lttng_live)) {
retstatus = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
} else {
retstatus = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
rq.len = htobe32(req_len);
ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
- if (ret_len < 0) {
- BT_LOGE("Error sending cmd: %s", strerror(errno));
+ if (ret_len == BT_SOCKET_ERROR) {
+ BT_LOGE("Error sending cmd: %s", bt_socket_errormsg());
goto error;
}
assert(ret_len == sizeof(cmd));
ret_len = lttng_live_send(viewer_connection, &rq, sizeof(rq));
- if (ret_len < 0) {
- BT_LOGE("Error sending get_data request: %s", strerror(errno));
+ if (ret_len == BT_SOCKET_ERROR) {
+ BT_LOGE("Error sending get_data request: %s", bt_socket_errormsg());
goto error;
}
assert(ret_len == sizeof(rq));
BT_LOGI("Remote side has closed connection");
goto error;
}
- if (ret_len < 0) {
- BT_LOGE("Error receiving get_data response: %s", strerror(errno));
+ if (ret_len == BT_SOCKET_ERROR) {
+ BT_LOGE("Error receiving get_data response: %s", bt_socket_errormsg());
goto error;
}
if (ret_len != sizeof(rp)) {
BT_LOGI("Remote side has closed connection");
goto error;
}
- if (ret_len < 0) {
- BT_LOGE("Error receiving trace packet: %s", strerror(errno));
+ if (ret_len == BT_SOCKET_ERROR) {
+ BT_LOGE("Error receiving trace packet: %s", bt_socket_errormsg());
goto error;
}
assert(ret_len == req_len);
return retstatus;
error:
- if (bt_graph_is_canceled(lttng_live->graph)) {
+ if (lttng_live_is_canceled(lttng_live)) {
retstatus = BT_CTF_NOTIF_ITER_MEDIUM_STATUS_AGAIN;
} else {
retstatus = BT_CTF_NOTIF_ITER_MEDIUM_STATUS_ERROR;
rq.session_id = htobe64(session->id);
ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
- if (ret_len < 0) {
- BT_LOGE("Error sending cmd: %s", strerror(errno));
+ if (ret_len == BT_SOCKET_ERROR) {
+ BT_LOGE("Error sending cmd: %s", bt_socket_errormsg());
goto error;
}
assert(ret_len == sizeof(cmd));
ret_len = lttng_live_send(viewer_connection, &rq, sizeof(rq));
- if (ret_len < 0) {
- BT_LOGE("Error sending get_new_streams request: %s", strerror(errno));
+ if (ret_len == BT_SOCKET_ERROR) {
+ BT_LOGE("Error sending get_new_streams request: %s", bt_socket_errormsg());
goto error;
}
assert(ret_len == sizeof(rq));
BT_LOGI("Remote side has closed connection");
goto error;
}
- if (ret_len < 0) {
+ if (ret_len == BT_SOCKET_ERROR) {
BT_LOGE("Error receiving get_new_streams response");
goto error;
}
return status;
error:
- if (bt_graph_is_canceled(lttng_live->graph)) {
- status = BT_CTF_NOTIF_ITER_MEDIUM_STATUS_AGAIN;
+ if (lttng_live_is_canceled(lttng_live)) {
+ status = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
} else {
- status = BT_CTF_NOTIF_ITER_MEDIUM_STATUS_ERROR;
+ status = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
}
return status;
}
viewer_connection = g_new0(struct bt_live_viewer_connection, 1);
+ if (bt_socket_init() != 0) {
+ goto error;
+ }
+
bt_object_init(&viewer_connection->obj, connection_release);
- viewer_connection->control_sock = -1;
+ viewer_connection->control_sock = BT_INVALID_SOCKET;
viewer_connection->port = -1;
viewer_connection->lttng_live = lttng_live;
viewer_connection->url = g_string_new(url);
g_string_free(viewer_connection->session_name, TRUE);
}
g_free(viewer_connection);
+
+ bt_socket_fini();
}