Adapt `src.ctf.lttng-live` to current API
[babeltrace.git] / plugins / ctf / lttng-live / viewer-connection.c
index 64cf81e7e4dc11c00f2d50bc9af4c1e44ba82760..b37db12ec46e2f9a0f464658b71e7e63a0440692 100644 (file)
@@ -1,4 +1,5 @@
 /*
+ * Copyright 2019 - Francis Deslauriers <francis.deslauriers@efficios.com>
  * Copyright 2016 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
  *
  * Permission is hereby granted, free of charge, to any person obtaining a copy
@@ -20,6 +21,9 @@
  * SOFTWARE.
  */
 
+#define BT_LOG_TAG "PLUGIN-CTF-LTTNG-LIVE-SRC-VIEWER"
+#include "logging.h"
+
 #include <stdio.h>
 #include <stdint.h>
 #include <stdlib.h>
 #include <unistd.h>
 #include <glib.h>
 #include <inttypes.h>
-#include <sys/socket.h>
 #include <sys/types.h>
-#include <netinet/in.h>
-#include <netdb.h>
 #include <fcntl.h>
-#include <poll.h>
 
-#include <babeltrace/compat/send-internal.h>
+#include <babeltrace/compat/socket-internal.h>
+#include <babeltrace/endian-internal.h>
 #include <babeltrace/compiler-internal.h>
+#include <babeltrace/common-internal.h>
+#include <babeltrace/babeltrace.h>
 
-#include "lttng-live-internal.h"
+#include "lttng-live.h"
 #include "viewer-connection.h"
 #include "lttng-viewer-abi.h"
 #include "data-stream.h"
 #include "metadata.h"
 
-#define PRINT_ERR_STREAM       viewer_connection->error_fp
-#define PRINT_PREFIX           "lttng-live-viewer-connection"
-#define PRINT_DBG_CHECK                lttng_live_debug
-#include "../print.h"
-
-static ssize_t lttng_live_recv(int fd, void *buf, size_t len)
+static
+ssize_t lttng_live_recv(struct live_viewer_connection *viewer_connection,
+               void *buf, size_t len)
 {
        ssize_t ret;
        size_t copied = 0, to_copy = len;
+       struct lttng_live_msg_iter *lttng_live_msg_iter =
+               viewer_connection->lttng_live_msg_iter;
+       BT_SOCKET sock = viewer_connection->control_sock;
 
        do {
-               ret = recv(fd, buf + copied, to_copy, 0);
+               ret = bt_socket_recv(sock, buf + copied, to_copy, 0);
                if (ret > 0) {
-                       assert(ret <= to_copy);
+                       BT_ASSERT(ret <= to_copy);
                        copied += ret;
                        to_copy -= ret;
                }
-       } while ((ret > 0 && to_copy > 0)
-               || (ret < 0 && errno == EINTR));
+               if (ret == BT_SOCKET_ERROR && bt_socket_interrupted()) {
+                       if (!viewer_connection->in_query &&
+                                       lttng_live_is_canceled(lttng_live_msg_iter->lttng_live_comp)) {
+                               break;
+                       } else {
+                               continue;
+                       }
+               }
+       } while (ret > 0 && to_copy > 0);
        if (ret > 0)
                ret = copied;
-       /* ret = 0 means orderly shutdown, ret < 0 is error. */
+       /* ret = 0 means orderly shutdown, ret == BT_SOCKET_ERROR is error. */
        return ret;
 }
 
-static ssize_t lttng_live_send(int fd, const void *buf, size_t len)
+static
+ssize_t lttng_live_send(struct live_viewer_connection *viewer_connection,
+               const void *buf, size_t len)
 {
+       struct lttng_live_msg_iter *lttng_live_msg_iter =
+               viewer_connection->lttng_live_msg_iter;
+       BT_SOCKET sock = viewer_connection->control_sock;
        ssize_t ret;
 
-       do {
-               ret = bt_send_nosigpipe(fd, buf, len);
-       } while (ret < 0 && errno == EINTR);
+       for (;;) {
+               ret = bt_socket_send_nosigpipe(sock, buf, len);
+               if (ret == BT_SOCKET_ERROR && bt_socket_interrupted()) {
+                       if (!viewer_connection->in_query &&
+                                       lttng_live_is_canceled(lttng_live_msg_iter->lttng_live_comp)) {
+                               break;
+                       } else {
+                               continue;
+                       }
+               } else {
+                       break;
+               }
+       }
        return ret;
 }
 
-/*
- * hostname parameter needs to hold MAXNAMLEN chars.
- */
-static int parse_url(struct bt_live_viewer_connection *viewer_connection)
+static
+int parse_url(struct live_viewer_connection *viewer_connection)
 {
-       char remain[3][MAXNAMLEN];
-       int ret = -1, proto, proto_offset = 0;
+       char error_buf[256] = { 0 };
+       struct bt_common_lttng_live_url_parts lttng_live_url_parts = { 0 };
+       int ret = -1;
        const char *path = viewer_connection->url->str;
-       size_t path_len;
 
        if (!path) {
                goto end;
        }
-       path_len = strlen(path); /* not accounting \0 */
 
-       /*
-        * Since sscanf API does not allow easily checking string length
-        * against a size defined by a macro. Test it beforehand on the
-        * input. We know the output is always <= than the input length.
-        */
-       if (path_len >= MAXNAMLEN) {
+       lttng_live_url_parts = bt_common_parse_lttng_live_url(path,
+                       error_buf, sizeof(error_buf));
+       if (!lttng_live_url_parts.proto) {
+               BT_LOGW("Invalid LTTng live URL format: %s", error_buf);
                goto end;
        }
-       ret = sscanf(path, "net%d://", &proto);
-       if (ret < 1) {
-               proto = 4;
-               /* net:// */
-               proto_offset = strlen("net://");
-       } else {
-               /* net4:// or net6:// */
-               proto_offset = strlen("netX://");
-       }
-       if (proto_offset > path_len) {
-               goto end;
-       }
-       if (proto == 6) {
-               PERR("[error] IPv6 is currently unsupported by lttng-live\n");
-               goto end;
-       }
-       /* TODO : parse for IPv6 as well */
-       /* Parse the hostname or IP */
-       ret = sscanf(&path[proto_offset], "%[a-zA-Z.0-9%-]%s",
-               viewer_connection->relay_hostname, remain[0]);
-       if (ret == 2) {
-               /* Optional port number */
-               switch (remain[0][0]) {
-               case ':':
-                       ret = sscanf(remain[0], ":%d%s", &viewer_connection->port, remain[1]);
-                       /* Optional session ID with port number */
-                       if (ret == 2) {
-                               ret = sscanf(remain[1], "/%s", remain[2]);
-                               /* Accept 0 or 1 (optional) */
-                               if (ret < 0) {
-                                       goto end;
-                               }
-                       } else if (ret == 0) {
-                               PERR("[error] Missing port number after delimitor ':'\n");
-                               ret = -1;
-                               goto end;
-                       }
-                       break;
-               case '/':
-                       /* Optional session ID */
-                       ret = sscanf(remain[0], "/%s", remain[2]);
-                       /* Accept 0 or 1 (optional) */
-                       if (ret < 0) {
-                               goto end;
-                       }
-                       break;
-               default:
-                       PERR("[error] wrong delimitor : %c\n", remain[0][0]);
-                       ret = -1;
-                       goto end;
-               }
-       }
 
-       if (viewer_connection->port < 0) {
+       viewer_connection->relay_hostname =
+                       lttng_live_url_parts.hostname;
+       lttng_live_url_parts.hostname = NULL;
+
+       if (lttng_live_url_parts.port >= 0) {
+               viewer_connection->port = lttng_live_url_parts.port;
+       } else {
                viewer_connection->port = LTTNG_DEFAULT_NETWORK_VIEWER_PORT;
        }
 
-       if (strlen(remain[2]) == 0) {
-               PDBG("Connecting to hostname : %s, port : %d, "
-                               "proto : IPv%d\n",
-                               viewer_connection->relay_hostname,
-                               viewer_connection->port,
-                               proto);
-               ret = 0;
-               goto end;
-       }
-       ret = sscanf(remain[2], "host/%[a-zA-Z.0-9%-]/%s",
-                       viewer_connection->target_hostname,
-                       viewer_connection->session_name);
-       if (ret != 2) {
-               PERR("[error] Format : "
-                       "net://<hostname>/host/<target_hostname>/<session_name>\n");
-               goto end;
+       viewer_connection->target_hostname =
+                       lttng_live_url_parts.target_hostname;
+       lttng_live_url_parts.target_hostname = NULL;
+
+       if (lttng_live_url_parts.session_name) {
+               viewer_connection->session_name =
+                               lttng_live_url_parts.session_name;
+               lttng_live_url_parts.session_name = NULL;
        }
 
-       PDBG("Connecting to hostname : %s, port : %d, "
+       BT_LOGD("Connecting to hostname : %s, port : %d, "
                        "target hostname : %s, session name : %s, "
-                       "proto : IPv%d\n",
-                       viewer_connection->relay_hostname,
+                       "proto : %s",
+                       viewer_connection->relay_hostname->str,
                        viewer_connection->port,
-                       viewer_connection->target_hostname,
-                       viewer_connection->session_name, proto);
+                       viewer_connection->target_hostname == NULL ?
+                               "<none>" : viewer_connection->target_hostname->str,
+                       viewer_connection->session_name == NULL ?
+                               "<none>" : viewer_connection->session_name->str,
+                       lttng_live_url_parts.proto->str);
        ret = 0;
 
 end:
+       bt_common_destroy_lttng_live_url_parts(&lttng_live_url_parts);
        return ret;
 }
 
-static int lttng_live_handshake(struct bt_live_viewer_connection *viewer_connection)
+static
+int lttng_live_handshake(struct live_viewer_connection *viewer_connection)
 {
        struct lttng_viewer_cmd cmd;
        struct lttng_viewer_connect connect;
+       const size_t cmd_buf_len = sizeof(cmd) + sizeof(connect);
+       char cmd_buf[cmd_buf_len];
        int ret;
        ssize_t ret_len;
 
@@ -205,38 +178,39 @@ static int lttng_live_handshake(struct bt_live_viewer_connection *viewer_connect
        connect.minor = htobe32(LTTNG_LIVE_MINOR);
        connect.type = htobe32(LTTNG_VIEWER_CLIENT_COMMAND);
 
-       ret_len = lttng_live_send(viewer_connection->control_sock, &cmd, sizeof(cmd));
-       if (ret_len < 0) {
-               PERR("Error sending cmd: %s\n", strerror(errno));
+       /*
+        * Merge the cmd and connection request to prevent a write-write
+        * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
+        * second write to be performed quickly in presence of Nagle's algorithm
+        */
+       memcpy(cmd_buf, &cmd, sizeof(cmd));
+       memcpy(cmd_buf + sizeof(cmd), &connect, sizeof(connect));
+       ret_len = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
+       if (ret_len == BT_SOCKET_ERROR) {
+               BT_LOGE("Error sending version: %s", bt_socket_errormsg());
                goto error;
        }
-       assert(ret_len == sizeof(cmd));
 
-       ret_len = lttng_live_send(viewer_connection->control_sock, &connect, sizeof(connect));
-       if (ret_len < 0) {
-               PERR("Error sending version: %s\n", strerror(errno));
-               goto error;
-       }
-       assert(ret_len == sizeof(connect));
+       BT_ASSERT(ret_len == cmd_buf_len);
 
-       ret_len = lttng_live_recv(viewer_connection->control_sock, &connect, sizeof(connect));
+       ret_len = lttng_live_recv(viewer_connection, &connect, sizeof(connect));
        if (ret_len == 0) {
-               PERR("Remote side has closed connection\n");
+               BT_LOGI("Remote side has closed connection");
                goto error;
        }
-       if (ret_len < 0) {
-               PERR("[error] Error receiving version: %s", strerror(errno));
+       if (ret_len == BT_SOCKET_ERROR) {
+               BT_LOGE("Error receiving version: %s", bt_socket_errormsg());
                goto error;
        }
-       assert(ret_len == sizeof(connect));
+       BT_ASSERT(ret_len == sizeof(connect));
 
-       PDBG("Received viewer session ID : %" PRIu64 "\n",
-                       be64toh(connect.viewer_session_id));
-       PDBG("Relayd version : %u.%u\n", be32toh(connect.major),
+       BT_LOGD("Received viewer session ID : %" PRIu64,
+                       (uint64_t) be64toh(connect.viewer_session_id));
+       BT_LOGD("Relayd version : %u.%u", be32toh(connect.major),
                        be32toh(connect.minor));
 
        if (LTTNG_LIVE_MAJOR != be32toh(connect.major)) {
-               PERR("Incompatible lttng-relayd protocol\n");
+               BT_LOGE("Incompatible lttng-relayd protocol");
                goto error;
        }
        /* Use the smallest protocol version implemented. */
@@ -250,11 +224,12 @@ static int lttng_live_handshake(struct bt_live_viewer_connection *viewer_connect
        return ret;
 
 error:
-       PERR("Unable to establish connection\n");
+       BT_LOGE("Unable to establish connection");
        return -1;
 }
 
-static int lttng_live_connect_viewer(struct bt_live_viewer_connection *viewer_connection)
+static
+int lttng_live_connect_viewer(struct live_viewer_connection *viewer_connection)
 {
        struct hostent *host;
        struct sockaddr_in server_addr;
@@ -264,15 +239,15 @@ static int lttng_live_connect_viewer(struct bt_live_viewer_connection *viewer_co
                goto error;
        }
 
-       host = gethostbyname(viewer_connection->relay_hostname);
+       host = gethostbyname(viewer_connection->relay_hostname->str);
        if (!host) {
-               PERR("[error] Cannot lookup hostname %s\n",
-                       viewer_connection->relay_hostname);
+               BT_LOGE("Cannot lookup hostname %s",
+                       viewer_connection->relay_hostname->str);
                goto error;
        }
 
-       if ((viewer_connection->control_sock = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
-               PERR("[error] Socket creation failed: %s\n", strerror(errno));
+       if ((viewer_connection->control_sock = socket(AF_INET, SOCK_STREAM, 0)) == BT_INVALID_SOCKET) {
+               BT_LOGE("Socket creation failed: %s", bt_socket_errormsg());
                goto error;
        }
 
@@ -282,8 +257,8 @@ static int lttng_live_connect_viewer(struct bt_live_viewer_connection *viewer_co
        memset(&(server_addr.sin_zero), 0, 8);
 
        if (connect(viewer_connection->control_sock, (struct sockaddr *) &server_addr,
-                               sizeof(struct sockaddr)) == -1) {
-               PERR("[error] Connection failed: %s\n", strerror(errno));
+                               sizeof(struct sockaddr)) == BT_SOCKET_ERROR) {
+               BT_LOGE("Connection failed: %s", bt_socket_errormsg());
                goto error;
        }
        if (lttng_live_handshake(viewer_connection)) {
@@ -295,79 +270,80 @@ static int lttng_live_connect_viewer(struct bt_live_viewer_connection *viewer_co
        return ret;
 
 error:
-       if (viewer_connection->control_sock >= 0) {
-               if (close(viewer_connection->control_sock)) {
-                       PERR("Close: %s", strerror(errno));
+       if (viewer_connection->control_sock != BT_INVALID_SOCKET) {
+               if (bt_socket_close(viewer_connection->control_sock) == BT_SOCKET_ERROR) {
+                       BT_LOGE("Close: %s", bt_socket_errormsg());
                }
        }
-       viewer_connection->control_sock = -1;
+       viewer_connection->control_sock = BT_INVALID_SOCKET;
        return -1;
 }
 
-static void lttng_live_disconnect_viewer(struct bt_live_viewer_connection *viewer_connection)
+static
+void lttng_live_disconnect_viewer(
+               struct live_viewer_connection *viewer_connection)
 {
-       if (viewer_connection->control_sock < 0) {
+       if (viewer_connection->control_sock == BT_INVALID_SOCKET) {
                return;
        }
-       if (close(viewer_connection->control_sock)) {
-               PERR("Close: %s", strerror(errno));
-               viewer_connection->control_sock = -1;
+       if (bt_socket_close(viewer_connection->control_sock) == BT_SOCKET_ERROR) {
+               BT_LOGE("Close: %s", bt_socket_errormsg());
+               viewer_connection->control_sock = BT_INVALID_SOCKET;
        }
 }
 
-static void connection_release(struct bt_object *obj)
+static
+void connection_release(bt_object *obj)
 {
-       struct bt_live_viewer_connection *conn =
-               container_of(obj, struct bt_live_viewer_connection, obj);
+       struct live_viewer_connection *conn =
+               container_of(obj, struct live_viewer_connection, obj);
 
-       bt_live_viewer_connection_destroy(conn);
+       live_viewer_connection_destroy(conn);
 }
 
 static
-enum bt_value_status list_update_session(struct bt_value *results,
+int list_update_session(bt_value *results,
                const struct lttng_viewer_session *session,
                bool *_found)
 {
-       enum bt_value_status ret = BT_VALUE_STATUS_OK;
-       struct bt_value *map = NULL;
-       struct bt_value *hostname = NULL;
-       struct bt_value *session_name = NULL;
-       struct bt_value *btval = NULL;
+       int ret = 0;
+       bt_value *map = NULL;
+       bt_value *hostname = NULL;
+       bt_value *session_name = NULL;
+       bt_value *btval = NULL;
        int i, len;
        bool found = false;
 
-       len = bt_value_array_size(results);
+       len = bt_value_array_get_size(results);
        if (len < 0) {
-               ret = BT_VALUE_STATUS_ERROR;
+               BT_LOGE_STR("Error getting size of array.");
+               ret = -1;
                goto end;
        }
        for (i = 0; i < len; i++) {
                const char *hostname_str = NULL;
                const char *session_name_str = NULL;
 
-               map = bt_value_array_get(results, (size_t) i);
+               map = bt_value_array_borrow_element_by_index(results, (size_t) i);
                if (!map) {
-                       ret = BT_VALUE_STATUS_ERROR;
+                       BT_LOGE_STR("Error borrowing map.");
+                       ret = -1;
                        goto end;
                }
-               hostname = bt_value_map_get(map, "target-hostname");
+               hostname = bt_value_map_borrow_entry_value(map, "target-hostname");
                if (!hostname) {
-                       ret = BT_VALUE_STATUS_ERROR;
+                       BT_LOGE_STR("Error borrowing \"target-hostname\" entry.");
+                       ret = -1;
                        goto end;
                }
-               session_name = bt_value_map_get(map, "session-name");
+               session_name = bt_value_map_borrow_entry_value(map, "session-name");
                if (!session_name) {
-                       ret = BT_VALUE_STATUS_ERROR;
-                       goto end;
-               }
-               ret = bt_value_string_get(hostname, &hostname_str);
-               if (ret != BT_VALUE_STATUS_OK) {
-                       goto end;
-               }
-               ret = bt_value_string_get(session_name, &session_name_str);
-               if (ret != BT_VALUE_STATUS_OK) {
+                       BT_LOGE_STR("Error borrowing \"session-name\" entry.");
+                       ret = -1;
                        goto end;
                }
+               hostname_str = bt_value_string_get(hostname);
+               session_name_str = bt_value_string_get(session_name);
 
                if (!strcmp(session->hostname, hostname_str)
                                && !strcmp(session->session_name,
@@ -378,65 +354,46 @@ enum bt_value_status list_update_session(struct bt_value *results,
 
                        found = true;
 
-                       btval = bt_value_map_get(map, "stream-count");
+                       btval = bt_value_map_borrow_entry_value(map, "stream-count");
                        if (!btval) {
-                               ret = BT_VALUE_STATUS_ERROR;
-                               goto end;
-                       }
-                       ret = bt_value_integer_get(btval, &val);
-                       if (ret != BT_VALUE_STATUS_OK) {
+                               BT_LOGE_STR("Error borrowing \"stream-count\" entry.");
+                               ret = -1;
                                goto end;
                        }
+                       val = bt_value_integer_get(btval);
                        /* sum */
                        val += streams;
-                       ret = bt_value_integer_set(btval, val);
-                       if (ret != BT_VALUE_STATUS_OK) {
-                               goto end;
-                       }
-                       BT_PUT(btval);
+                       bt_value_integer_set(btval, val);
 
-                       btval = bt_value_map_get(map, "client-count");
+                       btval = bt_value_map_borrow_entry_value(map, "client-count");
                        if (!btval) {
-                               ret = BT_VALUE_STATUS_ERROR;
-                               goto end;
-                       }
-                       ret = bt_value_integer_get(btval, &val);
-                       if (ret != BT_VALUE_STATUS_OK) {
+                               BT_LOGE_STR("Error borrowing \"client-count\" entry.");
+                               ret = -1;
                                goto end;
                        }
+                       val = bt_value_integer_get(btval);
                        /* max */
                        val = max_t(int64_t, clients, val);
-                       ret = bt_value_integer_set(btval, val);
-                       if (ret != BT_VALUE_STATUS_OK) {
-                               goto end;
-                       }
-                       BT_PUT(btval);
+                       bt_value_integer_set(btval, val);
                }
 
-               BT_PUT(hostname);
-               BT_PUT(session_name);
-               BT_PUT(map);
-
                if (found) {
                        break;
                }
        }
 end:
-       BT_PUT(btval);
-       BT_PUT(hostname);
-       BT_PUT(session_name);
-       BT_PUT(map);
        *_found = found;
        return ret;
 }
 
 static
-enum bt_value_status list_append_session(struct bt_value *results,
+int list_append_session(bt_value *results,
                GString *base_url,
                const struct lttng_viewer_session *session)
 {
-       enum bt_value_status ret = BT_VALUE_STATUS_OK;
-       struct bt_value *map = NULL;
+       int ret = 0;
+       bt_value_status ret_status;
+       bt_value *map = NULL;
        GString *url = NULL;
        bool found = false;
 
@@ -445,18 +402,20 @@ enum bt_value_status list_append_session(struct bt_value *results,
         * and do max of client counts.
         */
        ret = list_update_session(results, session, &found);
-       if (ret != BT_VALUE_STATUS_OK || found) {
+       if (ret || found) {
                goto end;
        }
 
        map = bt_value_map_create();
        if (!map) {
-               ret = BT_VALUE_STATUS_ERROR;
+               BT_LOGE_STR("Error creating map value.");
+               ret = -1;
                goto end;
        }
 
        if (base_url->len < 1) {
-               ret = BT_VALUE_STATUS_ERROR;
+               BT_LOGE_STR("Error: base_url length smaller than 1.");
+               ret = -1;
                goto end;
        }
        /*
@@ -469,8 +428,10 @@ enum bt_value_status list_append_session(struct bt_value *results,
        g_string_append_c(url, '/');
        g_string_append(url, session->session_name);
 
-       ret = bt_value_map_insert_string(map, "url", url->str);
-       if (ret != BT_VALUE_STATUS_OK) {
+       ret_status = bt_value_map_insert_string_entry(map, "url", url->str);
+       if (ret_status != BT_VALUE_STATUS_OK) {
+               BT_LOGE_STR("Error inserting \"url\" entry.");
+               ret = -1;
                goto end;
        }
 
@@ -478,9 +439,11 @@ enum bt_value_status list_append_session(struct bt_value *results,
         * key = "target-hostname",
         * value = <string>,
         */
-       ret = bt_value_map_insert_string(map, "target-hostname",
+       ret_status = bt_value_map_insert_string_entry(map, "target-hostname",
                session->hostname);
-       if (ret != BT_VALUE_STATUS_OK) {
+       if (ret_status != BT_VALUE_STATUS_OK) {
+               BT_LOGE_STR("Error inserting \"target-hostname\" entry.");
+               ret = -1;
                goto end;
        }
 
@@ -488,9 +451,11 @@ enum bt_value_status list_append_session(struct bt_value *results,
         * key = "session-name",
         * value = <string>,
         */
-       ret = bt_value_map_insert_string(map, "session-name",
+       ret_status = bt_value_map_insert_string_entry(map, "session-name",
                session->session_name);
-       if (ret != BT_VALUE_STATUS_OK) {
+       if (ret_status != BT_VALUE_STATUS_OK) {
+               BT_LOGE_STR("Error inserting \"session-name\" entry.");
+               ret = -1;
                goto end;
        }
 
@@ -501,9 +466,11 @@ enum bt_value_status list_append_session(struct bt_value *results,
        {
                uint32_t live_timer = be32toh(session->live_timer);
 
-               ret = bt_value_map_insert_integer(map, "timer-us",
+               ret_status = bt_value_map_insert_integer_entry(map, "timer-us",
                        live_timer);
-               if (ret != BT_VALUE_STATUS_OK) {
+               if (ret_status != BT_VALUE_STATUS_OK) {
+                       BT_LOGE_STR("Error inserting \"timer-us\" entry.");
+                       ret = -1;
                        goto end;
                }
        }
@@ -515,14 +482,15 @@ enum bt_value_status list_append_session(struct bt_value *results,
        {
                uint32_t streams = be32toh(session->streams);
 
-               ret = bt_value_map_insert_integer(map, "stream-count",
+               ret_status = bt_value_map_insert_integer_entry(map, "stream-count",
                        streams);
-               if (ret != BT_VALUE_STATUS_OK) {
+               if (ret_status != BT_VALUE_STATUS_OK) {
+                       BT_LOGE_STR("Error inserting \"stream-count\" entry.");
+                       ret = -1;
                        goto end;
                }
        }
 
-
        /*
         * key = "client-count",
         * value = <integer>,
@@ -530,19 +498,26 @@ enum bt_value_status list_append_session(struct bt_value *results,
        {
                uint32_t clients = be32toh(session->clients);
 
-               ret = bt_value_map_insert_integer(map, "client-count",
+               ret_status = bt_value_map_insert_integer_entry(map, "client-count",
                        clients);
-               if (ret != BT_VALUE_STATUS_OK) {
+               if (ret_status != BT_VALUE_STATUS_OK) {
+                       BT_LOGE_STR("Error inserting \"client-count\" entry.");
+                       ret = -1;
                        goto end;
                }
        }
 
-       ret = bt_value_array_append(results, map);
+       ret_status = bt_value_array_append_element(results, map);
+       if (ret_status != BT_VALUE_STATUS_OK) {
+               BT_LOGE_STR("Error appending map to results.");
+               ret = -1;
+       }
+
 end:
        if (url) {
-               g_string_free(url, TRUE);
+               g_string_free(url, true);
        }
-       BT_PUT(map);
+       BT_VALUE_PUT_REF_AND_RESET(map);
        return ret;
 }
 
@@ -583,9 +558,12 @@ end:
  */
 
 BT_HIDDEN
-struct bt_value *bt_live_viewer_connection_list_sessions(struct bt_live_viewer_connection *viewer_connection)
+bt_query_status live_viewer_connection_list_sessions(
+               struct live_viewer_connection *viewer_connection,
+               const bt_value **user_result)
 {
-       struct bt_value *results = NULL;
+       bt_query_status status = BT_QUERY_STATUS_OK;
+       bt_value *result = NULL;
        struct lttng_viewer_cmd cmd;
        struct lttng_viewer_list_sessions list;
        uint32_t i, sessions_count;
@@ -595,9 +573,10 @@ struct bt_value *bt_live_viewer_connection_list_sessions(struct bt_live_viewer_c
                goto error;
        }
 
-       results = bt_value_array_create();
-       if (!results) {
-               fprintf(stderr, "Error creating array\n");
+       result = bt_value_array_create();
+       if (!result) {
+               BT_LOGE("Error creating array");
+               status = BT_QUERY_STATUS_NOMEM;
                goto error;
        }
 
@@ -605,56 +584,63 @@ struct bt_value *bt_live_viewer_connection_list_sessions(struct bt_live_viewer_c
        cmd.data_size = htobe64((uint64_t) 0);
        cmd.cmd_version = htobe32(0);
 
-       ret_len = lttng_live_send(viewer_connection->control_sock, &cmd, sizeof(cmd));
-       if (ret_len < 0) {
-               fprintf(stderr, "Error sending cmd: %s\n", strerror(errno));
+       ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
+       if (ret_len == BT_SOCKET_ERROR) {
+               BT_LOGE("Error sending cmd: %s", bt_socket_errormsg());
+               status = BT_QUERY_STATUS_ERROR;
                goto error;
        }
-       assert(ret_len == sizeof(cmd));
+       BT_ASSERT(ret_len == sizeof(cmd));
 
-       ret_len = lttng_live_recv(viewer_connection->control_sock, &list, sizeof(list));
+       ret_len = lttng_live_recv(viewer_connection, &list, sizeof(list));
        if (ret_len == 0) {
-               fprintf(stderr, "Remote side has closed connection\n");
+               BT_LOGI("Remote side has closed connection");
+               status = BT_QUERY_STATUS_ERROR;
                goto error;
        }
-       if (ret_len < 0) {
-               fprintf(stderr, "Error receiving session list: %s\n", strerror(errno));
+       if (ret_len == BT_SOCKET_ERROR) {
+               BT_LOGE("Error receiving session list: %s", bt_socket_errormsg());
+               status = BT_QUERY_STATUS_ERROR;
                goto error;
        }
-       assert(ret_len == sizeof(list));
+       BT_ASSERT(ret_len == sizeof(list));
 
        sessions_count = be32toh(list.sessions_count);
        for (i = 0; i < sessions_count; i++) {
                struct lttng_viewer_session lsession;
 
-               ret_len = lttng_live_recv(viewer_connection->control_sock,
-                               &lsession, sizeof(lsession));
+               ret_len = lttng_live_recv(viewer_connection, &lsession,
+                       sizeof(lsession));
                if (ret_len == 0) {
-                       fprintf(stderr, "Remote side has closed connection\n");
+                       BT_LOGI("Remote side has closed connection");
+                       status = BT_QUERY_STATUS_ERROR;
                        goto error;
                }
-               if (ret_len < 0) {
-                       fprintf(stderr, "Error receiving session: %s\n", strerror(errno));
+               if (ret_len == BT_SOCKET_ERROR) {
+                       BT_LOGE("Error receiving session: %s", bt_socket_errormsg());
+                       status = BT_QUERY_STATUS_ERROR;
                        goto error;
                }
-               assert(ret_len == sizeof(lsession));
+               BT_ASSERT(ret_len == sizeof(lsession));
                lsession.hostname[LTTNG_VIEWER_HOST_NAME_MAX - 1] = '\0';
                lsession.session_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0';
-               if (list_append_session(results,
-                               viewer_connection->url, &lsession)
-                               != BT_VALUE_STATUS_OK) {
+               if (list_append_session(result, viewer_connection->url,
+                               &lsession)) {
+                       status = BT_QUERY_STATUS_ERROR;
                        goto error;
                }
        }
+
+       *user_result = result;
        goto end;
 error:
-       BT_PUT(results);
+       BT_VALUE_PUT_REF_AND_RESET(result);
 end:
-       return results;
+       return status;
 }
 
 static
-int lttng_live_query_session_ids(struct lttng_live_component *lttng_live)
+int lttng_live_query_session_ids(struct lttng_live_msg_iter *lttng_live_msg_iter)
 {
        struct lttng_viewer_cmd cmd;
        struct lttng_viewer_list_sessions list;
@@ -662,54 +648,59 @@ int lttng_live_query_session_ids(struct lttng_live_component *lttng_live)
        uint32_t i, sessions_count;
        ssize_t ret_len;
        uint64_t session_id;
-       struct bt_live_viewer_connection *viewer_connection =
-                       lttng_live->viewer_connection;
+       struct live_viewer_connection *viewer_connection =
+                       lttng_live_msg_iter->viewer_connection;
 
        cmd.cmd = htobe32(LTTNG_VIEWER_LIST_SESSIONS);
        cmd.data_size = htobe64((uint64_t) 0);
        cmd.cmd_version = htobe32(0);
 
-       ret_len = lttng_live_send(viewer_connection->control_sock, &cmd, sizeof(cmd));
-       if (ret_len < 0) {
-               PERR("Error sending cmd: %s\n", strerror(errno));
+       ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
+       if (ret_len == BT_SOCKET_ERROR) {
+               BT_LOGE("Error sending cmd: %s", bt_socket_errormsg());
                goto error;
        }
-       assert(ret_len == sizeof(cmd));
+       BT_ASSERT(ret_len == sizeof(cmd));
 
-       ret_len = lttng_live_recv(viewer_connection->control_sock, &list, sizeof(list));
+       ret_len = lttng_live_recv(viewer_connection, &list, sizeof(list));
        if (ret_len == 0) {
-               PERR("Remote side has closed connection\n");
+               BT_LOGI("Remote side has closed connection");
                goto error;
        }
-       if (ret_len < 0) {
-               PERR("Error receiving session list: %s\n", strerror(errno));
+       if (ret_len == BT_SOCKET_ERROR) {
+               BT_LOGE("Error receiving session list: %s", bt_socket_errormsg());
                goto error;
        }
-       assert(ret_len == sizeof(list));
+       BT_ASSERT(ret_len == sizeof(list));
 
        sessions_count = be32toh(list.sessions_count);
        for (i = 0; i < sessions_count; i++) {
-               ret_len = lttng_live_recv(viewer_connection->control_sock,
+               ret_len = lttng_live_recv(viewer_connection,
                                &lsession, sizeof(lsession));
                if (ret_len == 0) {
-                       PERR("Remote side has closed connection\n");
+                       BT_LOGI("Remote side has closed connection");
                        goto error;
                }
-               if (ret_len < 0) {
-                       PERR("Error receiving session: %s\n", strerror(errno));
+               if (ret_len == BT_SOCKET_ERROR) {
+                       BT_LOGE("Error receiving session: %s", bt_socket_errormsg());
                        goto error;
                }
-               assert(ret_len == sizeof(lsession));
+               BT_ASSERT(ret_len == sizeof(lsession));
                lsession.hostname[LTTNG_VIEWER_HOST_NAME_MAX - 1] = '\0';
                lsession.session_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0';
                session_id = be64toh(lsession.id);
 
+               BT_LOGD("Adding session %" PRIu64 " hostname: %s session_name: %s",
+                       session_id, lsession.hostname, lsession.session_name);
+
                if ((strncmp(lsession.session_name,
-                       viewer_connection->session_name,
-                       MAXNAMLEN) == 0) && (strncmp(lsession.hostname,
-                               viewer_connection->target_hostname,
-                               MAXNAMLEN) == 0)) {
-                       if (lttng_live_add_session(lttng_live, session_id)) {
+                       viewer_connection->session_name->str,
+                       LTTNG_VIEWER_NAME_MAX) == 0) && (strncmp(lsession.hostname,
+                               viewer_connection->target_hostname->str,
+                               LTTNG_VIEWER_HOST_NAME_MAX) == 0)) {
+                       if (lttng_live_add_session(lttng_live_msg_iter, session_id,
+                                       lsession.hostname,
+                                       lsession.session_name)) {
                                goto error;
                        }
                }
@@ -718,46 +709,47 @@ int lttng_live_query_session_ids(struct lttng_live_component *lttng_live)
        return 0;
 
 error:
-       PERR("Unable to query session ids\n");
+       BT_LOGE("Unable to query session ids");
        return -1;
 }
 
 BT_HIDDEN
-int lttng_live_create_viewer_session(struct lttng_live_component *lttng_live)
+int lttng_live_create_viewer_session(
+               struct lttng_live_msg_iter *lttng_live_msg_iter)
 {
        struct lttng_viewer_cmd cmd;
        struct lttng_viewer_create_session_response resp;
        ssize_t ret_len;
-       struct bt_live_viewer_connection *viewer_connection =
-                       lttng_live->viewer_connection;
+       struct live_viewer_connection *viewer_connection =
+                       lttng_live_msg_iter->viewer_connection;
 
        cmd.cmd = htobe32(LTTNG_VIEWER_CREATE_SESSION);
        cmd.data_size = htobe64((uint64_t) 0);
        cmd.cmd_version = htobe32(0);
 
-       ret_len = lttng_live_send(viewer_connection->control_sock, &cmd, sizeof(cmd));
-       if (ret_len < 0) {
-               PERR("Error sending cmd: %s\n", strerror(errno));
+       ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
+       if (ret_len == BT_SOCKET_ERROR) {
+               BT_LOGE("Error sending cmd: %s", bt_socket_errormsg());
                goto error;
        }
-       assert(ret_len == sizeof(cmd));
+       BT_ASSERT(ret_len == sizeof(cmd));
 
-       ret_len = lttng_live_recv(viewer_connection->control_sock, &resp, sizeof(resp));
+       ret_len = lttng_live_recv(viewer_connection, &resp, sizeof(resp));
        if (ret_len == 0) {
-               PERR("Remote side has closed connection\n");
+               BT_LOGI("Remote side has closed connection");
                goto error;
        }
-       if (ret_len < 0) {
-               PERR("Error receiving create session reply: %s\n", strerror(errno));
+       if (ret_len == BT_SOCKET_ERROR) {
+               BT_LOGE("Error receiving create session reply: %s", bt_socket_errormsg());
                goto error;
        }
-       assert(ret_len == sizeof(resp));
+       BT_ASSERT(ret_len == sizeof(resp));
 
        if (be32toh(resp.status) != LTTNG_VIEWER_CREATE_SESSION_OK) {
-               PERR("Error creating viewer session\n");
+               BT_LOGE("Error creating viewer session");
                goto error;
        }
-       if (lttng_live_query_session_ids(lttng_live)) {
+       if (lttng_live_query_session_ids(lttng_live_msg_iter)) {
                goto error;
        }
 
@@ -773,51 +765,53 @@ int receive_streams(struct lttng_live_session *session,
 {
        ssize_t ret_len;
        uint32_t i;
-       struct lttng_live_component *lttng_live = session->lttng_live;
-       struct bt_live_viewer_connection *viewer_connection =
-                       lttng_live->viewer_connection;
+       struct lttng_live_msg_iter *lttng_live_msg_iter =
+                       session->lttng_live_msg_iter;
+       struct live_viewer_connection *viewer_connection =
+                       lttng_live_msg_iter->viewer_connection;
 
-       PDBG("Getting %" PRIu32 " new streams:\n", stream_count);
+       BT_LOGD("Getting %" PRIu32 " new streams:", stream_count);
        for (i = 0; i < stream_count; i++) {
                struct lttng_viewer_stream stream;
                struct lttng_live_stream_iterator *live_stream;
                uint64_t stream_id;
                uint64_t ctf_trace_id;
 
-               ret_len = lttng_live_recv(viewer_connection->control_sock, &stream, sizeof(stream));
+               ret_len = lttng_live_recv(viewer_connection, &stream, sizeof(stream));
                if (ret_len == 0) {
-                       PERR("Remote side has closed connection\n");
+                       BT_LOGI("Remote side has closed connection");
                        goto error;
                }
-               if (ret_len < 0) {
-                       PERR("Error receiving stream\n");
+               if (ret_len == BT_SOCKET_ERROR) {
+                       BT_LOGE("Error receiving stream");
                        goto error;
                }
-               assert(ret_len == sizeof(stream));
+               BT_ASSERT(ret_len == sizeof(stream));
                stream.path_name[LTTNG_VIEWER_PATH_MAX - 1] = '\0';
                stream.channel_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0';
                stream_id = be64toh(stream.id);
                ctf_trace_id = be64toh(stream.ctf_trace_id);
 
                if (stream.metadata_flag) {
-                       PDBG("    metadata stream %" PRIu64 " : %s/%s\n",
+                       BT_LOGD("    metadata stream %" PRIu64 " : %s/%s",
                                        stream_id, stream.path_name,
                                        stream.channel_name);
                        if (lttng_live_metadata_create_stream(session,
-                                       ctf_trace_id, stream_id)) {
-                               PERR("Error creating metadata stream\n");
+                                       ctf_trace_id, stream_id,
+                                       stream.path_name)) {
+                               BT_LOGE("Error creating metadata stream");
 
                                goto error;
                        }
-                       session->lazy_stream_notif_init = true;
+                       session->lazy_stream_msg_init = true;
                } else {
-                       PDBG("    stream %" PRIu64 " : %s/%s\n",
+                       BT_LOGD("    stream %" PRIu64 " : %s/%s",
                                        stream_id, stream.path_name,
                                        stream.channel_name);
                        live_stream = lttng_live_stream_iterator_create(session,
                                ctf_trace_id, stream_id);
                        if (!live_stream) {
-                               PERR("Error creating stream\n");
+                               BT_LOGE("Error creating streamn");
                                goto error;
                        }
                }
@@ -835,15 +829,13 @@ int lttng_live_attach_session(struct lttng_live_session *session)
        struct lttng_viewer_attach_session_request rq;
        struct lttng_viewer_attach_session_response rp;
        ssize_t ret_len;
-       struct lttng_live_component *lttng_live = session->lttng_live;
-       struct bt_live_viewer_connection *viewer_connection =
-                       lttng_live->viewer_connection;
+       struct lttng_live_msg_iter *lttng_live_msg_iter = session->lttng_live_msg_iter;
+       struct live_viewer_connection *viewer_connection =
+                       lttng_live_msg_iter->viewer_connection;
        uint64_t session_id = session->id;
        uint32_t streams_count;
-
-       if (session->attached) {
-               return 0;
-       }
+       const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
+       char cmd_buf[cmd_buf_len];
 
        cmd.cmd = htobe32(LTTNG_VIEWER_ATTACH_SESSION);
        cmd.data_size = htobe64((uint64_t) sizeof(rq));
@@ -855,49 +847,49 @@ int lttng_live_attach_session(struct lttng_live_session *session)
        // rq.seek = htobe32(LTTNG_VIEWER_SEEK_BEGINNING);
        rq.seek = htobe32(LTTNG_VIEWER_SEEK_LAST);
 
-       ret_len = lttng_live_send(viewer_connection->control_sock, &cmd, sizeof(cmd));
-       if (ret_len < 0) {
-               PERR("Error sending cmd: %s\n", strerror(errno));
-               goto error;
-       }
-       assert(ret_len == sizeof(cmd));
-
-       ret_len = lttng_live_send(viewer_connection->control_sock, &rq, sizeof(rq));
-       if (ret_len < 0) {
-               PERR("Error sending attach request: %s\n", strerror(errno));
+       /*
+        * Merge the cmd and connection request to prevent a write-write
+        * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
+        * second write to be performed quickly in presence of Nagle's algorithm.
+        */
+       memcpy(cmd_buf, &cmd, sizeof(cmd));
+       memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
+       ret_len = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
+       if (ret_len == BT_SOCKET_ERROR) {
+               BT_LOGE("Error sending attach request: %s", bt_socket_errormsg());
                goto error;
        }
-       assert(ret_len == sizeof(rq));
 
-       ret_len = lttng_live_recv(viewer_connection->control_sock, &rp, sizeof(rp));
+       BT_ASSERT(ret_len == cmd_buf_len);
+       ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
        if (ret_len == 0) {
-               PERR("Remote side has closed connection\n");
+               BT_LOGI("Remote side has closed connection");
                goto error;
        }
-       if (ret_len < 0) {
-               PERR("Error receiving attach response: %s\n", strerror(errno));
+       if (ret_len == BT_SOCKET_ERROR) {
+               BT_LOGE("Error receiving attach response: %s", bt_socket_errormsg());
                goto error;
        }
-       assert(ret_len == sizeof(rp));
+       BT_ASSERT(ret_len == sizeof(rp));
 
        streams_count = be32toh(rp.streams_count);
        switch(be32toh(rp.status)) {
        case LTTNG_VIEWER_ATTACH_OK:
                break;
        case LTTNG_VIEWER_ATTACH_UNK:
-               PERR("Session id %" PRIu64 " is unknown\n", session_id);
+               BT_LOGW("Session id %" PRIu64 " is unknown", session_id);
                goto error;
        case LTTNG_VIEWER_ATTACH_ALREADY:
-               PERR("There is already a viewer attached to this session\n");
+               BT_LOGW("There is already a viewer attached to this session");
                goto error;
        case LTTNG_VIEWER_ATTACH_NOT_LIVE:
-               PERR("Not a live session\n");
+               BT_LOGW("Not a live session");
                goto error;
        case LTTNG_VIEWER_ATTACH_SEEK_ERR:
-               PERR("Wrong seek parameter\n");
+               BT_LOGE("Wrong seek parameter");
                goto error;
        default:
-               PERR("Unknown attach return code %u\n", be32toh(rp.status));
+               BT_LOGE("Unknown attach return code %u", be32toh(rp.status));
                goto error;
        }
 
@@ -922,10 +914,12 @@ int lttng_live_detach_session(struct lttng_live_session *session)
        struct lttng_viewer_detach_session_request rq;
        struct lttng_viewer_detach_session_response rp;
        ssize_t ret_len;
-       struct lttng_live_component *lttng_live = session->lttng_live;
-       struct bt_live_viewer_connection *viewer_connection =
-                       lttng_live->viewer_connection;
+       struct lttng_live_msg_iter *lttng_live_msg_iter = session->lttng_live_msg_iter;
+       struct live_viewer_connection *viewer_connection =
+                       lttng_live_msg_iter->viewer_connection;
        uint64_t session_id = session->id;
+       const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
+       char cmd_buf[cmd_buf_len];
 
        if (!session->attached) {
                return 0;
@@ -938,42 +932,42 @@ int lttng_live_detach_session(struct lttng_live_session *session)
        memset(&rq, 0, sizeof(rq));
        rq.session_id = htobe64(session_id);
 
-       ret_len = lttng_live_send(viewer_connection->control_sock, &cmd, sizeof(cmd));
-       if (ret_len < 0) {
-               PERR("Error sending cmd: %s\n", strerror(errno));
-               goto error;
-       }
-       assert(ret_len == sizeof(cmd));
-
-       ret_len = lttng_live_send(viewer_connection->control_sock, &rq, sizeof(rq));
-       if (ret_len < 0) {
-               PERR("Error sending detach request: %s\n", strerror(errno));
+       /*
+        * Merge the cmd and connection request to prevent a write-write
+        * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
+        * second write to be performed quickly in presence of Nagle's algorithm.
+        */
+       memcpy(cmd_buf, &cmd, sizeof(cmd));
+       memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
+       ret_len = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
+       if (ret_len == BT_SOCKET_ERROR) {
+               BT_LOGE("Error sending detach request: %s", bt_socket_errormsg());
                goto error;
        }
-       assert(ret_len == sizeof(rq));
 
-       ret_len = lttng_live_recv(viewer_connection->control_sock, &rp, sizeof(rp));
+       BT_ASSERT(ret_len == cmd_buf_len);
+       ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
        if (ret_len == 0) {
-               PERR("Remote side has closed connection\n");
+               BT_LOGI("Remote side has closed connection");
                goto error;
        }
-       if (ret_len < 0) {
-               PERR("Error receiving detach response: %s\n", strerror(errno));
+       if (ret_len == BT_SOCKET_ERROR) {
+               BT_LOGE("Error receiving detach response: %s", bt_socket_errormsg());
                goto error;
        }
-       assert(ret_len == sizeof(rp));
+       BT_ASSERT(ret_len == sizeof(rp));
 
        switch(be32toh(rp.status)) {
        case LTTNG_VIEWER_DETACH_SESSION_OK:
                break;
        case LTTNG_VIEWER_DETACH_SESSION_UNK:
-               PERR("Session id %" PRIu64 " is unknown\n", session_id);
+               BT_LOGW("Session id %" PRIu64 " is unknown", session_id);
                goto error;
        case LTTNG_VIEWER_DETACH_SESSION_ERR:
-               PERR("Error detaching session id %" PRIu64 "\n", session_id);
+               BT_LOGW("Error detaching session id %" PRIu64 "", session_id);
                goto error;
        default:
-               PERR("Unknown detach return code %u\n", be32toh(rp.status));
+               BT_LOGE("Unknown detach return code %u", be32toh(rp.status));
                goto error;
        }
 
@@ -997,87 +991,89 @@ ssize_t lttng_live_get_one_metadata_packet(struct lttng_live_trace *trace,
        char *data = NULL;
        ssize_t ret_len;
        struct lttng_live_session *session = trace->session;
-       struct lttng_live_component *lttng_live = session->lttng_live;
+       struct lttng_live_msg_iter *lttng_live_msg_iter = session->lttng_live_msg_iter;
        struct lttng_live_metadata *metadata = trace->metadata;
-       struct bt_live_viewer_connection *viewer_connection =
-                       lttng_live->viewer_connection;
+       struct live_viewer_connection *viewer_connection =
+                       lttng_live_msg_iter->viewer_connection;
+       const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
+       char cmd_buf[cmd_buf_len];
 
        rq.stream_id = htobe64(metadata->stream_id);
        cmd.cmd = htobe32(LTTNG_VIEWER_GET_METADATA);
        cmd.data_size = htobe64((uint64_t) sizeof(rq));
        cmd.cmd_version = htobe32(0);
 
-       ret_len = lttng_live_send(viewer_connection->control_sock, &cmd, sizeof(cmd));
-       if (ret_len < 0) {
-               PERR("Error sending cmd: %s\n", strerror(errno));
-               goto error;
-       }
-       assert(ret_len == sizeof(cmd));
-
-       ret_len = lttng_live_send(viewer_connection->control_sock, &rq, sizeof(rq));
-       if (ret_len < 0) {
-               PERR("Error sending get_metadata request: %s\n", strerror(errno));
+       /*
+        * Merge the cmd and connection request to prevent a write-write
+        * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
+        * second write to be performed quickly in presence of Nagle's algorithm.
+        */
+       memcpy(cmd_buf, &cmd, sizeof(cmd));
+       memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
+       ret_len = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
+       if (ret_len == BT_SOCKET_ERROR) {
+               BT_LOGE("Error sending get_metadata request: %s", bt_socket_errormsg());
                goto error;
        }
-       assert(ret_len == sizeof(rq));
 
-       ret_len = lttng_live_recv(viewer_connection->control_sock, &rp, sizeof(rp));
+       BT_ASSERT(ret_len == cmd_buf_len);
+       ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
        if (ret_len == 0) {
-               PERR("Remote side has closed connection\n");
+               BT_LOGI("Remote side has closed connection");
                goto error;
        }
-       if (ret_len < 0) {
-               PERR("Error receiving get_metadata response: %s\n", strerror(errno));
+       if (ret_len == BT_SOCKET_ERROR) {
+               BT_LOGE("Error receiving get_metadata response: %s", bt_socket_errormsg());
                goto error;
        }
-       assert(ret_len == sizeof(rp));
+       BT_ASSERT(ret_len == sizeof(rp));
 
        switch (be32toh(rp.status)) {
                case LTTNG_VIEWER_METADATA_OK:
-                       PDBG("get_metadata : OK\n");
+                       BT_LOGD("get_metadata : OK");
                        break;
                case LTTNG_VIEWER_NO_NEW_METADATA:
-                       PDBG("get_metadata : NO NEW\n");
+                       BT_LOGD("get_metadata : NO NEW");
                        ret = 0;
                        goto end;
                case LTTNG_VIEWER_METADATA_ERR:
-                       PDBG("get_metadata : ERR\n");
+                       BT_LOGD("get_metadata : ERR");
                        goto error;
                default:
-                       PDBG("get_metadata : UNKNOWN\n");
+                       BT_LOGD("get_metadata : UNKNOWN");
                        goto error;
        }
 
        len = be64toh(rp.len);
-       PDBG("Writing %" PRIu64" bytes to metadata\n", len);
+       BT_LOGD("Writing %" PRIu64" bytes to metadata", len);
        if (len <= 0) {
                goto error;
        }
 
        data = zmalloc(len);
        if (!data) {
-               PERR("relay data zmalloc: %s", strerror(errno));
+               BT_LOGE("relay data zmalloc: %s", strerror(errno));
                goto error;
        }
-       ret_len = lttng_live_recv(viewer_connection->control_sock, data, len);
+       ret_len = lttng_live_recv(viewer_connection, data, len);
        if (ret_len == 0) {
-               PERR("[error] Remote side has closed connection\n");
+               BT_LOGI("Remote side has closed connection");
                goto error_free_data;
        }
-       if (ret_len < 0) {
-               PERR("[error] Error receiving trace packet: %s", strerror(errno));
+       if (ret_len == BT_SOCKET_ERROR) {
+               BT_LOGE("Error receiving trace packet: %s", bt_socket_errormsg());
                goto error_free_data;
        }
-       assert(ret_len == len);
+       BT_ASSERT(ret_len == len);
 
        do {
                ret_len = fwrite(data, 1, len, fp);
        } while (ret_len < 0 && errno == EINTR);
        if (ret_len < 0) {
-               PERR("[error] Writing in the metadata fp\n");
+               BT_LOGE("Writing in the metadata fp");
                goto error_free_data;
        }
-       assert(ret_len == len);
+       BT_ASSERT(ret_len == len);
        free(data);
        ret = len;
 end:
@@ -1096,8 +1092,8 @@ static
 void lttng_index_to_packet_index(struct lttng_viewer_index *lindex,
                struct packet_index *pindex)
 {
-       assert(lindex);
-       assert(pindex);
+       BT_ASSERT(lindex);
+       BT_ASSERT(pindex);
 
        pindex->offset = be64toh(lindex->offset);
        pindex->packet_size = be64toh(lindex->packet_size);
@@ -1108,7 +1104,8 @@ void lttng_index_to_packet_index(struct lttng_viewer_index *lindex,
 }
 
 BT_HIDDEN
-enum bt_ctf_lttng_live_iterator_status lttng_live_get_next_index(struct lttng_live_component *lttng_live,
+enum lttng_live_iterator_status lttng_live_get_next_index(
+               struct lttng_live_msg_iter *lttng_live_msg_iter,
                struct lttng_live_stream_iterator *stream,
                struct packet_index *index)
 {
@@ -1117,43 +1114,50 @@ enum bt_ctf_lttng_live_iterator_status lttng_live_get_next_index(struct lttng_li
        ssize_t ret_len;
        struct lttng_viewer_index rp;
        uint32_t flags, status;
-       enum bt_ctf_lttng_live_iterator_status retstatus =
-                       BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
-       struct bt_live_viewer_connection *viewer_connection =
-                       lttng_live->viewer_connection;
+       enum lttng_live_iterator_status retstatus =
+                       LTTNG_LIVE_ITERATOR_STATUS_OK;
+       struct live_viewer_connection *viewer_connection =
+                       lttng_live_msg_iter->viewer_connection;
        struct lttng_live_trace *trace = stream->trace;
+       const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
+       char cmd_buf[cmd_buf_len];
+       struct lttng_live_component *lttng_live =
+               lttng_live_msg_iter->lttng_live_comp;
 
        cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEXT_INDEX);
        cmd.data_size = htobe64((uint64_t) sizeof(rq));
        cmd.cmd_version = htobe32(0);
 
+
        memset(&rq, 0, sizeof(rq));
        rq.stream_id = htobe64(stream->viewer_stream_id);
 
-       ret_len = lttng_live_send(viewer_connection->control_sock, &cmd, sizeof(cmd));
-       if (ret_len < 0) {
-               PERR("Error sending cmd: %s\n", strerror(errno));
-               goto error;
-       }
-       assert(ret_len == sizeof(cmd));
-
-       ret_len = lttng_live_send(viewer_connection->control_sock, &rq, sizeof(rq));
-       if (ret_len < 0) {
-               PERR("Error sending get_next_index request: %s\n", strerror(errno));
+       /*
+        * Merge the cmd and connection request to prevent a write-write
+        * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
+        * second write to be performed quickly in presence of Nagle's algorithm.
+        */
+       memcpy(cmd_buf, &cmd, sizeof(cmd));
+       memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
+       ret_len = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
+       if (ret_len == BT_SOCKET_ERROR) {
+               BT_LOGE("Error sending get_next_index request: %s",
+                               bt_socket_errormsg());
                goto error;
        }
-       assert(ret_len == sizeof(rq));
 
-       ret_len = lttng_live_recv(viewer_connection->control_sock, &rp, sizeof(rp));
+       BT_ASSERT(ret_len == cmd_buf_len);
+       ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
        if (ret_len == 0) {
-               PERR("Remote side has closed connection\n");
+               BT_LOGI("Remote side has closed connection");
                goto error;
        }
-       if (ret_len < 0) {
-               PERR("Error receiving get_next_index response: %s\n", strerror(errno));
+       if (ret_len == BT_SOCKET_ERROR) {
+               BT_LOGE("Error receiving get_next_index response: %s",
+                               bt_socket_errormsg());
                goto error;
        }
-       assert(ret_len == sizeof(rp));
+       BT_ASSERT(ret_len == sizeof(rp));
 
        flags = be32toh(rp.flags);
        status = be32toh(rp.status);
@@ -1163,13 +1167,13 @@ enum bt_ctf_lttng_live_iterator_status lttng_live_get_next_index(struct lttng_li
        {
                uint64_t ctf_stream_class_id;
 
-               PDBG("get_next_index: inactive\n");
+               BT_LOGD("get_next_index: inactive");
                memset(index, 0, sizeof(struct packet_index));
                index->ts_cycles.timestamp_end = be64toh(rp.timestamp_end);
-               stream->current_inactivity_timestamp = index->ts_cycles.timestamp_end;
+               stream->current_inactivity_ts = index->ts_cycles.timestamp_end;
                ctf_stream_class_id = be64toh(rp.stream_id);
                if (stream->ctf_stream_class_id != -1ULL) {
-                       assert(stream->ctf_stream_class_id ==
+                       BT_ASSERT(stream->ctf_stream_class_id ==
                                ctf_stream_class_id);
                } else {
                        stream->ctf_stream_class_id = ctf_stream_class_id;
@@ -1181,50 +1185,48 @@ enum bt_ctf_lttng_live_iterator_status lttng_live_get_next_index(struct lttng_li
        {
                uint64_t ctf_stream_class_id;
 
-               PDBG("get_next_index: OK\n");
+               BT_LOGD("get_next_index: OK");
                lttng_index_to_packet_index(&rp, index);
                ctf_stream_class_id = be64toh(rp.stream_id);
                if (stream->ctf_stream_class_id != -1ULL) {
-                       assert(stream->ctf_stream_class_id ==
+                       BT_ASSERT(stream->ctf_stream_class_id ==
                                ctf_stream_class_id);
                } else {
                        stream->ctf_stream_class_id = ctf_stream_class_id;
                }
 
                stream->state = LTTNG_LIVE_STREAM_ACTIVE_DATA;
-               stream->current_packet_end_timestamp =
-                       index->ts_cycles.timestamp_end;
 
                if (flags & LTTNG_VIEWER_FLAG_NEW_METADATA) {
-                       PDBG("get_next_index: new metadata needed\n");
+                       BT_LOGD("get_next_index: new metadata needed");
                        trace->new_metadata_needed = true;
                }
                if (flags & LTTNG_VIEWER_FLAG_NEW_STREAM) {
-                       PDBG("get_next_index: new streams needed\n");
-                       lttng_live_need_new_streams(lttng_live);
+                       BT_LOGD("get_next_index: new streams needed");
+                       lttng_live_need_new_streams(lttng_live_msg_iter);
                }
                break;
        }
        case LTTNG_VIEWER_INDEX_RETRY:
-               PDBG("get_next_index: retry\n");
+               BT_LOGD("get_next_index: retry");
                memset(index, 0, sizeof(struct packet_index));
-               retstatus = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
+               retstatus = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
                stream->state = LTTNG_LIVE_STREAM_ACTIVE_NO_DATA;
                goto end;
        case LTTNG_VIEWER_INDEX_HUP:
-               PDBG("get_next_index: stream hung up\n");
+               BT_LOGD("get_next_index: stream hung up");
                memset(index, 0, sizeof(struct packet_index));
                index->offset = EOF;
-               retstatus = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END;
+               retstatus = LTTNG_LIVE_ITERATOR_STATUS_END;
                stream->state = LTTNG_LIVE_STREAM_EOF;
                break;
        case LTTNG_VIEWER_INDEX_ERR:
-               PERR("get_next_index: error\n");
+               BT_LOGE("get_next_index: error");
                memset(index, 0, sizeof(struct packet_index));
                stream->state = LTTNG_LIVE_STREAM_ACTIVE_NO_DATA;
                goto error;
        default:
-               PERR("get_next_index: unkwown value\n");
+               BT_LOGE("get_next_index: unknown value");
                memset(index, 0, sizeof(struct packet_index));
                stream->state = LTTNG_LIVE_STREAM_ACTIVE_NO_DATA;
                goto error;
@@ -1233,26 +1235,35 @@ end:
        return retstatus;
 
 error:
-       retstatus = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
+       if (lttng_live_is_canceled(lttng_live)) {
+               retstatus = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
+       } else {
+               retstatus = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
+       }
        return retstatus;
 }
 
 BT_HIDDEN
-enum bt_ctf_notif_iter_medium_status lttng_live_get_stream_bytes(struct lttng_live_component *lttng_live,
-               struct lttng_live_stream_iterator *stream, uint8_t *buf, uint64_t offset,
-               uint64_t req_len, uint64_t *recv_len)
+enum bt_msg_iter_medium_status lttng_live_get_stream_bytes(
+               struct lttng_live_msg_iter *lttng_live_msg_iter,
+               struct lttng_live_stream_iterator *stream, uint8_t *buf,
+               uint64_t offset, uint64_t req_len, uint64_t *recv_len)
 {
-       enum bt_ctf_notif_iter_medium_status retstatus = BT_CTF_NOTIF_ITER_MEDIUM_STATUS_OK;
+       enum bt_msg_iter_medium_status retstatus = BT_MSG_ITER_MEDIUM_STATUS_OK;
        struct lttng_viewer_cmd cmd;
        struct lttng_viewer_get_packet rq;
        struct lttng_viewer_trace_packet rp;
        ssize_t ret_len;
        uint32_t flags, status;
-       struct bt_live_viewer_connection *viewer_connection =
-                       lttng_live->viewer_connection;
+       struct live_viewer_connection *viewer_connection =
+                       lttng_live_msg_iter->viewer_connection;
        struct lttng_live_trace *trace = stream->trace;
+       const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
+       char cmd_buf[cmd_buf_len];
+       struct lttng_live_component *lttng_live =
+               lttng_live_msg_iter->lttng_live_comp;
 
-       PDBG("lttng_live_get_stream_bytes: offset=%" PRIu64 ", req_len=%" PRIu64 "\n",
+       BT_LOGD("lttng_live_get_stream_bytes: offset=%" PRIu64 ", req_len=%" PRIu64,
                        offset, req_len);
        cmd.cmd = htobe32(LTTNG_VIEWER_GET_PACKET);
        cmd.data_size = htobe64((uint64_t) sizeof(rq));
@@ -1263,32 +1274,32 @@ enum bt_ctf_notif_iter_medium_status lttng_live_get_stream_bytes(struct lttng_li
        rq.offset = htobe64(offset);
        rq.len = htobe32(req_len);
 
-       ret_len = lttng_live_send(viewer_connection->control_sock, &cmd, sizeof(cmd));
-       if (ret_len < 0) {
-               PERR("Error sending cmd: %s\n", strerror(errno));
-               goto error;
-       }
-       assert(ret_len == sizeof(cmd));
-
-       ret_len = lttng_live_send(viewer_connection->control_sock, &rq, sizeof(rq));
-       if (ret_len < 0) {
-               PERR("Error sending get_data request: %s\n", strerror(errno));
+       /*
+        * Merge the cmd and connection request to prevent a write-write
+        * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
+        * second write to be performed quickly in presence of Nagle's algorithm.
+        */
+       memcpy(cmd_buf, &cmd, sizeof(cmd));
+       memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
+       ret_len = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
+       if (ret_len == BT_SOCKET_ERROR) {
+               BT_LOGE("Error sending get_data request: %s", bt_socket_errormsg());
                goto error;
        }
-       assert(ret_len == sizeof(rq));
 
-       ret_len = lttng_live_recv(viewer_connection->control_sock, &rp, sizeof(rp));
+       BT_ASSERT(ret_len == cmd_buf_len);
+       ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
        if (ret_len == 0) {
-               PERR("Remote side has closed connection\n");
+               BT_LOGI("Remote side has closed connection");
                goto error;
        }
-       if (ret_len < 0) {
-               PERR("Error receiving get_data response: %s\n", strerror(errno));
+       if (ret_len == BT_SOCKET_ERROR) {
+               BT_LOGE("Error receiving get_data response: %s", bt_socket_errormsg());
                goto error;
        }
        if (ret_len != sizeof(rp)) {
-               PERR("[error] get_data_packet: expected %zu"
-                               ", received %zd\n", sizeof(rp),
+               BT_LOGE("get_data_packet: expected %zu"
+                               ", received %zd", sizeof(rp),
                                ret_len);
                goto error;
        }
@@ -1299,34 +1310,34 @@ enum bt_ctf_notif_iter_medium_status lttng_live_get_stream_bytes(struct lttng_li
        switch (status) {
        case LTTNG_VIEWER_GET_PACKET_OK:
                req_len = be32toh(rp.len);
-               PDBG("get_data_packet: Ok, packet size : %" PRIu64 "\n", req_len);
+               BT_LOGD("get_data_packet: Ok, packet size : %" PRIu64 "", req_len);
                break;
        case LTTNG_VIEWER_GET_PACKET_RETRY:
                /* Unimplemented by relay daemon */
-               PDBG("get_data_packet: retry\n");
-               retstatus = BT_CTF_NOTIF_ITER_MEDIUM_STATUS_AGAIN;
+               BT_LOGD("get_data_packet: retry");
+               retstatus = BT_MSG_ITER_MEDIUM_STATUS_AGAIN;
                goto end;
        case LTTNG_VIEWER_GET_PACKET_ERR:
                if (flags & LTTNG_VIEWER_FLAG_NEW_METADATA) {
-                       PDBG("get_data_packet: new metadata needed, try again later\n");
+                       BT_LOGD("get_data_packet: new metadata needed, try again later");
                        trace->new_metadata_needed = true;
                }
                if (flags & LTTNG_VIEWER_FLAG_NEW_STREAM) {
-                       PDBG("get_data_packet: new streams needed, try again later\n");
-                       lttng_live_need_new_streams(lttng_live);
+                       BT_LOGD("get_data_packet: new streams needed, try again later");
+                       lttng_live_need_new_streams(lttng_live_msg_iter);
                }
                if (flags & (LTTNG_VIEWER_FLAG_NEW_METADATA
                                | LTTNG_VIEWER_FLAG_NEW_STREAM)) {
-                       retstatus = BT_CTF_NOTIF_ITER_MEDIUM_STATUS_AGAIN;
+                       retstatus = BT_MSG_ITER_MEDIUM_STATUS_AGAIN;
                        goto end;
                }
-               PERR("get_data_packet: error\n");
+               BT_LOGE("get_data_packet: error");
                goto error;
        case LTTNG_VIEWER_GET_PACKET_EOF:
-               retstatus = BT_CTF_NOTIF_ITER_MEDIUM_STATUS_EOF;
+               retstatus = BT_MSG_ITER_MEDIUM_STATUS_EOF;
                goto end;
        default:
-               PDBG("get_data_packet: unknown\n");
+               BT_LOGE("get_data_packet: unknown");
                goto error;
        }
 
@@ -1334,22 +1345,26 @@ enum bt_ctf_notif_iter_medium_status lttng_live_get_stream_bytes(struct lttng_li
                goto error;
        }
 
-       ret_len = lttng_live_recv(viewer_connection->control_sock, buf, req_len);
+       ret_len = lttng_live_recv(viewer_connection, buf, req_len);
        if (ret_len == 0) {
-               PERR("Remote side has closed connection\n");
+               BT_LOGI("Remote side has closed connection");
                goto error;
        }
-       if (ret_len < 0) {
-               PERR("Error receiving trace packet: %s\n", strerror(errno));
+       if (ret_len == BT_SOCKET_ERROR) {
+               BT_LOGE("Error receiving trace packet: %s", bt_socket_errormsg());
                goto error;
        }
-       assert(ret_len == req_len);
+       BT_ASSERT(ret_len == req_len);
        *recv_len = ret_len;
 end:
        return retstatus;
 
 error:
-       retstatus = BT_CTF_NOTIF_ITER_MEDIUM_STATUS_ERROR;
+       if (lttng_live_is_canceled(lttng_live)) {
+               retstatus = BT_MSG_ITER_MEDIUM_STATUS_AGAIN;
+       } else {
+               retstatus = BT_MSG_ITER_MEDIUM_STATUS_ERROR;
+       }
        return retstatus;
 }
 
@@ -1357,22 +1372,27 @@ error:
  * Request new streams for a session.
  */
 BT_HIDDEN
-enum bt_ctf_lttng_live_iterator_status lttng_live_get_new_streams(
+enum lttng_live_iterator_status lttng_live_get_new_streams(
                struct lttng_live_session *session)
 {
-       enum bt_ctf_lttng_live_iterator_status status =
-                       BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
+       enum lttng_live_iterator_status status =
+               LTTNG_LIVE_ITERATOR_STATUS_OK;
        struct lttng_viewer_cmd cmd;
        struct lttng_viewer_new_streams_request rq;
        struct lttng_viewer_new_streams_response rp;
        ssize_t ret_len;
-       struct lttng_live_component *lttng_live = session->lttng_live;
-       struct bt_live_viewer_connection *viewer_connection =
-                       lttng_live->viewer_connection;
+       struct lttng_live_msg_iter *lttng_live_msg_iter =
+               session->lttng_live_msg_iter;
+       struct live_viewer_connection *viewer_connection =
+               lttng_live_msg_iter->viewer_connection;
+       struct lttng_live_component *lttng_live =
+               lttng_live_msg_iter->lttng_live_comp;
        uint32_t streams_count;
+       const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
+       char cmd_buf[cmd_buf_len];
 
        if (!session->new_streams_needed) {
-               return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
+               return LTTNG_LIVE_ITERATOR_STATUS_OK;
        }
 
        cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEW_STREAMS);
@@ -1382,30 +1402,31 @@ enum bt_ctf_lttng_live_iterator_status lttng_live_get_new_streams(
        memset(&rq, 0, sizeof(rq));
        rq.session_id = htobe64(session->id);
 
-       ret_len = lttng_live_send(viewer_connection->control_sock, &cmd, sizeof(cmd));
-       if (ret_len < 0) {
-               PERR("Error sending cmd: %s\n", strerror(errno));
-               goto error;
-       }
-       assert(ret_len == sizeof(cmd));
-
-       ret_len = lttng_live_send(viewer_connection->control_sock, &rq, sizeof(rq));
-       if (ret_len < 0) {
-               PERR("Error sending get_new_streams request: %s\n", strerror(errno));
+       /*
+        * Merge the cmd and connection request to prevent a write-write
+        * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
+        * second write to be performed quickly in presence of Nagle's algorithm.
+        */
+       memcpy(cmd_buf, &cmd, sizeof(cmd));
+       memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
+       ret_len = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
+       if (ret_len == BT_SOCKET_ERROR) {
+               BT_LOGE("Error sending get_new_streams request: %s",
+                               bt_socket_errormsg());
                goto error;
        }
-       assert(ret_len == sizeof(rq));
 
-       ret_len = lttng_live_recv(viewer_connection->control_sock, &rp, sizeof(rp));
+       BT_ASSERT(ret_len == cmd_buf_len);
+       ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
        if (ret_len == 0) {
-               PERR("Remote side has closed connection\n");
+               BT_LOGI("Remote side has closed connection");
                goto error;
        }
-       if (ret_len < 0) {
-               PERR("Error receiving get_new_streams response\n");
+       if (ret_len == BT_SOCKET_ERROR) {
+               BT_LOGE("Error receiving get_new_streams response");
                goto error;
        }
-       assert(ret_len == sizeof(rp));
+       BT_ASSERT(ret_len == sizeof(rp));
 
        streams_count = be32toh(rp.streams_count);
 
@@ -1419,13 +1440,13 @@ enum bt_ctf_lttng_live_iterator_status lttng_live_get_new_streams(
        case LTTNG_VIEWER_NEW_STREAMS_HUP:
                session->new_streams_needed = false;
                session->closed = true;
-               status = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END;
+               status = LTTNG_LIVE_ITERATOR_STATUS_END;
                goto end;
        case LTTNG_VIEWER_NEW_STREAMS_ERR:
-               PERR("get_new_streams error\n");
+               BT_LOGE("get_new_streams error");
                goto error;
        default:
-               PERR("Unknown return code %u\n", be32toh(rp.status));
+               BT_LOGE("Unknown return code %u", be32toh(rp.status));
                goto error;
        }
 
@@ -1436,46 +1457,68 @@ end:
        return status;
 
 error:
-       status = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
+       if (lttng_live_is_canceled(lttng_live)) {
+               status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
+       } else {
+               status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
+       }
        return status;
 }
 
 BT_HIDDEN
-struct bt_live_viewer_connection *
-       bt_live_viewer_connection_create(const char *url, FILE *error_fp)
+struct live_viewer_connection *live_viewer_connection_create(
+               const char *url, bool in_query,
+               struct lttng_live_msg_iter *lttng_live_msg_iter)
 {
-       struct bt_live_viewer_connection *viewer_connection;
+       struct live_viewer_connection *viewer_connection;
+
+       viewer_connection = g_new0(struct live_viewer_connection, 1);
 
-       viewer_connection = g_new0(struct bt_live_viewer_connection, 1);
+       if (bt_socket_init() != 0) {
+               goto error;
+       }
 
-       bt_object_init(&viewer_connection->obj, connection_release);
-       viewer_connection->control_sock = -1;
+       bt_object_init_shared(&viewer_connection->obj, connection_release);
+       viewer_connection->control_sock = BT_INVALID_SOCKET;
        viewer_connection->port = -1;
-       viewer_connection->error_fp = error_fp;
+       viewer_connection->in_query = in_query;
+       viewer_connection->lttng_live_msg_iter = lttng_live_msg_iter;
        viewer_connection->url = g_string_new(url);
        if (!viewer_connection->url) {
                goto error;
        }
 
-       PDBG("Establishing connection to url \"%s\"...\n", url);
+       BT_LOGD("Establishing connection to url \"%s\"...", url);
        if (lttng_live_connect_viewer(viewer_connection)) {
                goto error_report;
        }
-       PDBG("Connection to url \"%s\" is established\n", url);
+       BT_LOGD("Connection to url \"%s\" is established", url);
        return viewer_connection;
 
 error_report:
-       printf_verbose("Failure to establish connection to url \"%s\"\n", url);
+       BT_LOGW("Failure to establish connection to url \"%s\"", url);
 error:
        g_free(viewer_connection);
        return NULL;
 }
 
 BT_HIDDEN
-void bt_live_viewer_connection_destroy(struct bt_live_viewer_connection *viewer_connection)
+void live_viewer_connection_destroy(
+               struct live_viewer_connection *viewer_connection)
 {
-       PDBG("Closing connection to url \"%s\"\n", viewer_connection->url->str);
+       BT_LOGD("Closing connection to url \"%s\"", viewer_connection->url->str);
        lttng_live_disconnect_viewer(viewer_connection);
-       g_string_free(viewer_connection->url, TRUE);
+       g_string_free(viewer_connection->url, true);
+       if (viewer_connection->relay_hostname) {
+               g_string_free(viewer_connection->relay_hostname, true);
+       }
+       if (viewer_connection->target_hostname) {
+               g_string_free(viewer_connection->target_hostname, true);
+       }
+       if (viewer_connection->session_name) {
+               g_string_free(viewer_connection->session_name, true);
+       }
        g_free(viewer_connection);
+
+       bt_socket_fini();
 }
This page took 0.051709 seconds and 4 git commands to generate.