lib: rename "notification" -> "message"
[babeltrace.git] / plugins / ctf / lttng-live / viewer-connection.c
index e66bd1e8aa9c0929b80b97476956af02ebf5a909..b40289780f8a6adb9a9a5a38278ec814ddc74297 100644 (file)
@@ -57,7 +57,7 @@ static ssize_t lttng_live_recv(struct bt_live_viewer_connection *viewer_connecti
        do {
                ret = bt_socket_recv(sock, buf + copied, to_copy, 0);
                if (ret > 0) {
-                       assert(ret <= to_copy);
+                       BT_ASSERT(ret <= to_copy);
                        copied += ret;
                        to_copy -= ret;
                }
@@ -157,6 +157,8 @@ static int lttng_live_handshake(struct bt_live_viewer_connection *viewer_connect
 {
        struct lttng_viewer_cmd cmd;
        struct lttng_viewer_connect connect;
+       const size_t cmd_buf_len = sizeof(cmd) + sizeof(connect);
+       char cmd_buf[cmd_buf_len];
        int ret;
        ssize_t ret_len;
 
@@ -169,19 +171,20 @@ static int lttng_live_handshake(struct bt_live_viewer_connection *viewer_connect
        connect.minor = htobe32(LTTNG_LIVE_MINOR);
        connect.type = htobe32(LTTNG_VIEWER_CLIENT_COMMAND);
 
-       ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
-       if (ret_len == BT_SOCKET_ERROR) {
-               BT_LOGE("Error sending cmd: %s", bt_socket_errormsg());
-               goto error;
-       }
-       assert(ret_len == sizeof(cmd));
-
-       ret_len = lttng_live_send(viewer_connection, &connect, sizeof(connect));
+       /*
+        * Merge the cmd and connection request to prevent a write-write
+        * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
+        * second write to be performed quickly in presence of Nagle's algorithm
+        */
+       memcpy(cmd_buf, &cmd, sizeof(cmd));
+       memcpy(cmd_buf + sizeof(cmd), &connect, sizeof(connect));
+       ret_len = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
        if (ret_len == BT_SOCKET_ERROR) {
                BT_LOGE("Error sending version: %s", bt_socket_errormsg());
                goto error;
        }
-       assert(ret_len == sizeof(connect));
+
+       BT_ASSERT(ret_len == cmd_buf_len);
 
        ret_len = lttng_live_recv(viewer_connection, &connect, sizeof(connect));
        if (ret_len == 0) {
@@ -192,7 +195,7 @@ static int lttng_live_handshake(struct bt_live_viewer_connection *viewer_connect
                BT_LOGE("Error receiving version: %s", bt_socket_errormsg());
                goto error;
        }
-       assert(ret_len == sizeof(connect));
+       BT_ASSERT(ret_len == sizeof(connect));
 
        BT_LOGD("Received viewer session ID : %" PRIu64,
                        (uint64_t) be64toh(connect.viewer_session_id));
@@ -279,7 +282,7 @@ static void lttng_live_disconnect_viewer(struct bt_live_viewer_connection *viewe
        }
 }
 
-static void connection_release(struct bt_object *obj)
+static void connection_release(bt_object *obj)
 {
        struct bt_live_viewer_connection *conn =
                container_of(obj, struct bt_live_viewer_connection, obj);
@@ -288,19 +291,19 @@ static void connection_release(struct bt_object *obj)
 }
 
 static
-enum bt_value_status list_update_session(struct bt_value *results,
+enum bt_value_status list_update_session(bt_value *results,
                const struct lttng_viewer_session *session,
                bool *_found)
 {
        enum bt_value_status ret = BT_VALUE_STATUS_OK;
-       struct bt_value *map = NULL;
-       struct bt_value *hostname = NULL;
-       struct bt_value *session_name = NULL;
-       struct bt_value *btval = NULL;
+       bt_value *map = NULL;
+       bt_value *hostname = NULL;
+       bt_value *session_name = NULL;
+       bt_value *btval = NULL;
        int i, len;
        bool found = false;
 
-       len = bt_value_array_size(results);
+       len = bt_value_array_get_size(results);
        if (len < 0) {
                ret = BT_VALUE_STATUS_ERROR;
                goto end;
@@ -324,14 +327,8 @@ enum bt_value_status list_update_session(struct bt_value *results,
                        ret = BT_VALUE_STATUS_ERROR;
                        goto end;
                }
-               ret = bt_value_string_get(hostname, &hostname_str);
-               if (ret != BT_VALUE_STATUS_OK) {
-                       goto end;
-               }
-               ret = bt_value_string_get(session_name, &session_name_str);
-               if (ret != BT_VALUE_STATUS_OK) {
-                       goto end;
-               }
+               hostname_str = bt_value_string_get(hostname);
+               session_name_str = bt_value_string_get(session_name);
 
                if (!strcmp(session->hostname, hostname_str)
                                && !strcmp(session->session_name,
@@ -347,60 +344,54 @@ enum bt_value_status list_update_session(struct bt_value *results,
                                ret = BT_VALUE_STATUS_ERROR;
                                goto end;
                        }
-                       ret = bt_value_integer_get(btval, &val);
-                       if (ret != BT_VALUE_STATUS_OK) {
-                               goto end;
-                       }
+                       val = bt_value_integer_get(btval);
                        /* sum */
                        val += streams;
-                       ret = bt_value_integer_set(btval, val);
+                       ret = bt_private_integer_bool_set(btval, val);
                        if (ret != BT_VALUE_STATUS_OK) {
                                goto end;
                        }
-                       BT_PUT(btval);
+                       BT_VALUE_PUT_REF_AND_RESET(btval);
 
                        btval = bt_value_map_get(map, "client-count");
                        if (!btval) {
                                ret = BT_VALUE_STATUS_ERROR;
                                goto end;
                        }
-                       ret = bt_value_integer_get(btval, &val);
-                       if (ret != BT_VALUE_STATUS_OK) {
-                               goto end;
-                       }
+                       val = bt_value_integer_get(btval);
                        /* max */
                        val = max_t(int64_t, clients, val);
-                       ret = bt_value_integer_set(btval, val);
+                       ret = bt_private_integer_bool_set(btval, val);
                        if (ret != BT_VALUE_STATUS_OK) {
                                goto end;
                        }
-                       BT_PUT(btval);
+                       BT_VALUE_PUT_REF_AND_RESET(btval);
                }
 
-               BT_PUT(hostname);
-               BT_PUT(session_name);
-               BT_PUT(map);
+               BT_VALUE_PUT_REF_AND_RESET(hostname);
+               BT_VALUE_PUT_REF_AND_RESET(session_name);
+               BT_VALUE_PUT_REF_AND_RESET(map);
 
                if (found) {
                        break;
                }
        }
 end:
-       BT_PUT(btval);
-       BT_PUT(hostname);
-       BT_PUT(session_name);
-       BT_PUT(map);
+       BT_VALUE_PUT_REF_AND_RESET(btval);
+       BT_VALUE_PUT_REF_AND_RESET(hostname);
+       BT_VALUE_PUT_REF_AND_RESET(session_name);
+       BT_VALUE_PUT_REF_AND_RESET(map);
        *_found = found;
        return ret;
 }
 
 static
-enum bt_value_status list_append_session(struct bt_value *results,
+enum bt_value_status list_append_session(bt_value *results,
                GString *base_url,
                const struct lttng_viewer_session *session)
 {
        enum bt_value_status ret = BT_VALUE_STATUS_OK;
-       struct bt_value *map = NULL;
+       bt_value *map = NULL;
        GString *url = NULL;
        bool found = false;
 
@@ -413,7 +404,7 @@ enum bt_value_status list_append_session(struct bt_value *results,
                goto end;
        }
 
-       map = bt_value_map_create();
+       map = bt_private_value_map_create();
        if (!map) {
                ret = BT_VALUE_STATUS_ERROR;
                goto end;
@@ -433,7 +424,7 @@ enum bt_value_status list_append_session(struct bt_value *results,
        g_string_append_c(url, '/');
        g_string_append(url, session->session_name);
 
-       ret = bt_value_map_insert_string(map, "url", url->str);
+       ret = bt_private_value_map_insert_string_entry(map, "url", url->str);
        if (ret != BT_VALUE_STATUS_OK) {
                goto end;
        }
@@ -442,7 +433,7 @@ enum bt_value_status list_append_session(struct bt_value *results,
         * key = "target-hostname",
         * value = <string>,
         */
-       ret = bt_value_map_insert_string(map, "target-hostname",
+       ret = bt_private_value_map_insert_string_entry(map, "target-hostname",
                session->hostname);
        if (ret != BT_VALUE_STATUS_OK) {
                goto end;
@@ -452,7 +443,7 @@ enum bt_value_status list_append_session(struct bt_value *results,
         * key = "session-name",
         * value = <string>,
         */
-       ret = bt_value_map_insert_string(map, "session-name",
+       ret = bt_private_value_map_insert_string_entry(map, "session-name",
                session->session_name);
        if (ret != BT_VALUE_STATUS_OK) {
                goto end;
@@ -465,7 +456,7 @@ enum bt_value_status list_append_session(struct bt_value *results,
        {
                uint32_t live_timer = be32toh(session->live_timer);
 
-               ret = bt_value_map_insert_integer(map, "timer-us",
+               ret = bt_private_value_map_insert_integer_entry(map, "timer-us",
                        live_timer);
                if (ret != BT_VALUE_STATUS_OK) {
                        goto end;
@@ -479,7 +470,7 @@ enum bt_value_status list_append_session(struct bt_value *results,
        {
                uint32_t streams = be32toh(session->streams);
 
-               ret = bt_value_map_insert_integer(map, "stream-count",
+               ret = bt_private_value_map_insert_integer_entry(map, "stream-count",
                        streams);
                if (ret != BT_VALUE_STATUS_OK) {
                        goto end;
@@ -494,19 +485,19 @@ enum bt_value_status list_append_session(struct bt_value *results,
        {
                uint32_t clients = be32toh(session->clients);
 
-               ret = bt_value_map_insert_integer(map, "client-count",
+               ret = bt_private_value_map_insert_integer_entry(map, "client-count",
                        clients);
                if (ret != BT_VALUE_STATUS_OK) {
                        goto end;
                }
        }
 
-       ret = bt_value_array_append(results, map);
+       ret = bt_private_value_array_append_element(results, map);
 end:
        if (url) {
                g_string_free(url, TRUE);
        }
-       BT_PUT(map);
+       BT_VALUE_PUT_REF_AND_RESET(map);
        return ret;
 }
 
@@ -547,9 +538,9 @@ end:
  */
 
 BT_HIDDEN
-struct bt_value *bt_live_viewer_connection_list_sessions(struct bt_live_viewer_connection *viewer_connection)
+bt_value *bt_live_viewer_connection_list_sessions(struct bt_live_viewer_connection *viewer_connection)
 {
-       struct bt_value *results = NULL;
+       bt_value *results = NULL;
        struct lttng_viewer_cmd cmd;
        struct lttng_viewer_list_sessions list;
        uint32_t i, sessions_count;
@@ -559,7 +550,7 @@ struct bt_value *bt_live_viewer_connection_list_sessions(struct bt_live_viewer_c
                goto error;
        }
 
-       results = bt_value_array_create();
+       results = bt_private_value_array_create();
        if (!results) {
                BT_LOGE("Error creating array");
                goto error;
@@ -574,7 +565,7 @@ struct bt_value *bt_live_viewer_connection_list_sessions(struct bt_live_viewer_c
                BT_LOGE("Error sending cmd: %s", bt_socket_errormsg());
                goto error;
        }
-       assert(ret_len == sizeof(cmd));
+       BT_ASSERT(ret_len == sizeof(cmd));
 
        ret_len = lttng_live_recv(viewer_connection, &list, sizeof(list));
        if (ret_len == 0) {
@@ -585,7 +576,7 @@ struct bt_value *bt_live_viewer_connection_list_sessions(struct bt_live_viewer_c
                BT_LOGE("Error receiving session list: %s", bt_socket_errormsg());
                goto error;
        }
-       assert(ret_len == sizeof(list));
+       BT_ASSERT(ret_len == sizeof(list));
 
        sessions_count = be32toh(list.sessions_count);
        for (i = 0; i < sessions_count; i++) {
@@ -601,7 +592,7 @@ struct bt_value *bt_live_viewer_connection_list_sessions(struct bt_live_viewer_c
                        BT_LOGE("Error receiving session: %s", bt_socket_errormsg());
                        goto error;
                }
-               assert(ret_len == sizeof(lsession));
+               BT_ASSERT(ret_len == sizeof(lsession));
                lsession.hostname[LTTNG_VIEWER_HOST_NAME_MAX - 1] = '\0';
                lsession.session_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0';
                if (list_append_session(results,
@@ -612,7 +603,7 @@ struct bt_value *bt_live_viewer_connection_list_sessions(struct bt_live_viewer_c
        }
        goto end;
 error:
-       BT_PUT(results);
+       BT_VALUE_PUT_REF_AND_RESET(results);
 end:
        return results;
 }
@@ -638,7 +629,7 @@ int lttng_live_query_session_ids(struct lttng_live_component *lttng_live)
                BT_LOGE("Error sending cmd: %s", bt_socket_errormsg());
                goto error;
        }
-       assert(ret_len == sizeof(cmd));
+       BT_ASSERT(ret_len == sizeof(cmd));
 
        ret_len = lttng_live_recv(viewer_connection, &list, sizeof(list));
        if (ret_len == 0) {
@@ -649,7 +640,7 @@ int lttng_live_query_session_ids(struct lttng_live_component *lttng_live)
                BT_LOGE("Error receiving session list: %s", bt_socket_errormsg());
                goto error;
        }
-       assert(ret_len == sizeof(list));
+       BT_ASSERT(ret_len == sizeof(list));
 
        sessions_count = be32toh(list.sessions_count);
        for (i = 0; i < sessions_count; i++) {
@@ -663,7 +654,7 @@ int lttng_live_query_session_ids(struct lttng_live_component *lttng_live)
                        BT_LOGE("Error receiving session: %s", bt_socket_errormsg());
                        goto error;
                }
-               assert(ret_len == sizeof(lsession));
+               BT_ASSERT(ret_len == sizeof(lsession));
                lsession.hostname[LTTNG_VIEWER_HOST_NAME_MAX - 1] = '\0';
                lsession.session_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0';
                session_id = be64toh(lsession.id);
@@ -709,7 +700,7 @@ int lttng_live_create_viewer_session(struct lttng_live_component *lttng_live)
                BT_LOGE("Error sending cmd: %s", bt_socket_errormsg());
                goto error;
        }
-       assert(ret_len == sizeof(cmd));
+       BT_ASSERT(ret_len == sizeof(cmd));
 
        ret_len = lttng_live_recv(viewer_connection, &resp, sizeof(resp));
        if (ret_len == 0) {
@@ -720,7 +711,7 @@ int lttng_live_create_viewer_session(struct lttng_live_component *lttng_live)
                BT_LOGE("Error receiving create session reply: %s", bt_socket_errormsg());
                goto error;
        }
-       assert(ret_len == sizeof(resp));
+       BT_ASSERT(ret_len == sizeof(resp));
 
        if (be32toh(resp.status) != LTTNG_VIEWER_CREATE_SESSION_OK) {
                BT_LOGE("Error creating viewer session");
@@ -762,7 +753,7 @@ int receive_streams(struct lttng_live_session *session,
                        BT_LOGE("Error receiving stream");
                        goto error;
                }
-               assert(ret_len == sizeof(stream));
+               BT_ASSERT(ret_len == sizeof(stream));
                stream.path_name[LTTNG_VIEWER_PATH_MAX - 1] = '\0';
                stream.channel_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0';
                stream_id = be64toh(stream.id);
@@ -779,7 +770,7 @@ int receive_streams(struct lttng_live_session *session,
 
                                goto error;
                        }
-                       session->lazy_stream_notif_init = true;
+                       session->lazy_stream_msg_init = true;
                } else {
                        BT_LOGD("    stream %" PRIu64 " : %s/%s",
                                        stream_id, stream.path_name,
@@ -810,6 +801,8 @@ int lttng_live_attach_session(struct lttng_live_session *session)
                        lttng_live->viewer_connection;
        uint64_t session_id = session->id;
        uint32_t streams_count;
+       const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
+       char cmd_buf[cmd_buf_len];
 
        if (session->attached) {
                return 0;
@@ -825,20 +818,20 @@ int lttng_live_attach_session(struct lttng_live_session *session)
        // rq.seek = htobe32(LTTNG_VIEWER_SEEK_BEGINNING);
        rq.seek = htobe32(LTTNG_VIEWER_SEEK_LAST);
 
-       ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
-       if (ret_len == BT_SOCKET_ERROR) {
-               BT_LOGE("Error sending cmd: %s", bt_socket_errormsg());
-               goto error;
-       }
-       assert(ret_len == sizeof(cmd));
-
-       ret_len = lttng_live_send(viewer_connection, &rq, sizeof(rq));
+       /*
+        * Merge the cmd and connection request to prevent a write-write
+        * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
+        * second write to be performed quickly in presence of Nagle's algorithm.
+        */
+       memcpy(cmd_buf, &cmd, sizeof(cmd));
+       memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
+       ret_len = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
        if (ret_len == BT_SOCKET_ERROR) {
                BT_LOGE("Error sending attach request: %s", bt_socket_errormsg());
                goto error;
        }
-       assert(ret_len == sizeof(rq));
 
+       BT_ASSERT(ret_len == cmd_buf_len);
        ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
        if (ret_len == 0) {
                BT_LOGI("Remote side has closed connection");
@@ -848,7 +841,7 @@ int lttng_live_attach_session(struct lttng_live_session *session)
                BT_LOGE("Error receiving attach response: %s", bt_socket_errormsg());
                goto error;
        }
-       assert(ret_len == sizeof(rp));
+       BT_ASSERT(ret_len == sizeof(rp));
 
        streams_count = be32toh(rp.streams_count);
        switch(be32toh(rp.status)) {
@@ -896,6 +889,8 @@ int lttng_live_detach_session(struct lttng_live_session *session)
        struct bt_live_viewer_connection *viewer_connection =
                        lttng_live->viewer_connection;
        uint64_t session_id = session->id;
+       const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
+       char cmd_buf[cmd_buf_len];
 
        if (!session->attached) {
                return 0;
@@ -908,20 +903,20 @@ int lttng_live_detach_session(struct lttng_live_session *session)
        memset(&rq, 0, sizeof(rq));
        rq.session_id = htobe64(session_id);
 
-       ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
-       if (ret_len == BT_SOCKET_ERROR) {
-               BT_LOGE("Error sending cmd: %s", bt_socket_errormsg());
-               goto error;
-       }
-       assert(ret_len == sizeof(cmd));
-
-       ret_len = lttng_live_send(viewer_connection, &rq, sizeof(rq));
+       /*
+        * Merge the cmd and connection request to prevent a write-write
+        * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
+        * second write to be performed quickly in presence of Nagle's algorithm.
+        */
+       memcpy(cmd_buf, &cmd, sizeof(cmd));
+       memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
+       ret_len = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
        if (ret_len == BT_SOCKET_ERROR) {
                BT_LOGE("Error sending detach request: %s", bt_socket_errormsg());
                goto error;
        }
-       assert(ret_len == sizeof(rq));
 
+       BT_ASSERT(ret_len == cmd_buf_len);
        ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
        if (ret_len == 0) {
                BT_LOGI("Remote side has closed connection");
@@ -931,7 +926,7 @@ int lttng_live_detach_session(struct lttng_live_session *session)
                BT_LOGE("Error receiving detach response: %s", bt_socket_errormsg());
                goto error;
        }
-       assert(ret_len == sizeof(rp));
+       BT_ASSERT(ret_len == sizeof(rp));
 
        switch(be32toh(rp.status)) {
        case LTTNG_VIEWER_DETACH_SESSION_OK:
@@ -971,26 +966,28 @@ ssize_t lttng_live_get_one_metadata_packet(struct lttng_live_trace *trace,
        struct lttng_live_metadata *metadata = trace->metadata;
        struct bt_live_viewer_connection *viewer_connection =
                        lttng_live->viewer_connection;
+       const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
+       char cmd_buf[cmd_buf_len];
 
        rq.stream_id = htobe64(metadata->stream_id);
        cmd.cmd = htobe32(LTTNG_VIEWER_GET_METADATA);
        cmd.data_size = htobe64((uint64_t) sizeof(rq));
        cmd.cmd_version = htobe32(0);
 
-       ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
-       if (ret_len == BT_SOCKET_ERROR) {
-               BT_LOGE("Error sending cmd: %s", bt_socket_errormsg());
-               goto error;
-       }
-       assert(ret_len == sizeof(cmd));
-
-       ret_len = lttng_live_send(viewer_connection, &rq, sizeof(rq));
+       /*
+        * Merge the cmd and connection request to prevent a write-write
+        * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
+        * second write to be performed quickly in presence of Nagle's algorithm.
+        */
+       memcpy(cmd_buf, &cmd, sizeof(cmd));
+       memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
+       ret_len = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
        if (ret_len == BT_SOCKET_ERROR) {
                BT_LOGE("Error sending get_metadata request: %s", bt_socket_errormsg());
                goto error;
        }
-       assert(ret_len == sizeof(rq));
 
+       BT_ASSERT(ret_len == cmd_buf_len);
        ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
        if (ret_len == 0) {
                BT_LOGI("Remote side has closed connection");
@@ -1000,7 +997,7 @@ ssize_t lttng_live_get_one_metadata_packet(struct lttng_live_trace *trace,
                BT_LOGE("Error receiving get_metadata response: %s", bt_socket_errormsg());
                goto error;
        }
-       assert(ret_len == sizeof(rp));
+       BT_ASSERT(ret_len == sizeof(rp));
 
        switch (be32toh(rp.status)) {
                case LTTNG_VIEWER_METADATA_OK:
@@ -1038,7 +1035,7 @@ ssize_t lttng_live_get_one_metadata_packet(struct lttng_live_trace *trace,
                BT_LOGE("Error receiving trace packet: %s", bt_socket_errormsg());
                goto error_free_data;
        }
-       assert(ret_len == len);
+       BT_ASSERT(ret_len == len);
 
        do {
                ret_len = fwrite(data, 1, len, fp);
@@ -1047,7 +1044,7 @@ ssize_t lttng_live_get_one_metadata_packet(struct lttng_live_trace *trace,
                BT_LOGE("Writing in the metadata fp");
                goto error_free_data;
        }
-       assert(ret_len == len);
+       BT_ASSERT(ret_len == len);
        free(data);
        ret = len;
 end:
@@ -1066,8 +1063,8 @@ static
 void lttng_index_to_packet_index(struct lttng_viewer_index *lindex,
                struct packet_index *pindex)
 {
-       assert(lindex);
-       assert(pindex);
+       BT_ASSERT(lindex);
+       BT_ASSERT(pindex);
 
        pindex->offset = be64toh(lindex->offset);
        pindex->packet_size = be64toh(lindex->packet_size);
@@ -1092,28 +1089,31 @@ enum bt_lttng_live_iterator_status lttng_live_get_next_index(struct lttng_live_c
        struct bt_live_viewer_connection *viewer_connection =
                        lttng_live->viewer_connection;
        struct lttng_live_trace *trace = stream->trace;
+       const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
+       char cmd_buf[cmd_buf_len];
 
        cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEXT_INDEX);
        cmd.data_size = htobe64((uint64_t) sizeof(rq));
        cmd.cmd_version = htobe32(0);
 
+
        memset(&rq, 0, sizeof(rq));
        rq.stream_id = htobe64(stream->viewer_stream_id);
 
-       ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
-       if (ret_len == BT_SOCKET_ERROR) {
-               BT_LOGE("Error sending cmd: %s", bt_socket_errormsg());
-               goto error;
-       }
-       assert(ret_len == sizeof(cmd));
-
-       ret_len = lttng_live_send(viewer_connection, &rq, sizeof(rq));
+       /*
+        * Merge the cmd and connection request to prevent a write-write
+        * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
+        * second write to be performed quickly in presence of Nagle's algorithm.
+        */
+       memcpy(cmd_buf, &cmd, sizeof(cmd));
+       memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
+       ret_len = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
        if (ret_len == BT_SOCKET_ERROR) {
                BT_LOGE("Error sending get_next_index request: %s", bt_socket_errormsg());
                goto error;
        }
-       assert(ret_len == sizeof(rq));
 
+       BT_ASSERT(ret_len == cmd_buf_len);
        ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
        if (ret_len == 0) {
                BT_LOGI("Remote side has closed connection");
@@ -1123,7 +1123,7 @@ enum bt_lttng_live_iterator_status lttng_live_get_next_index(struct lttng_live_c
                BT_LOGE("Error receiving get_next_index response: %s", bt_socket_errormsg());
                goto error;
        }
-       assert(ret_len == sizeof(rp));
+       BT_ASSERT(ret_len == sizeof(rp));
 
        flags = be32toh(rp.flags);
        status = be32toh(rp.status);
@@ -1139,7 +1139,7 @@ enum bt_lttng_live_iterator_status lttng_live_get_next_index(struct lttng_live_c
                stream->current_inactivity_timestamp = index->ts_cycles.timestamp_end;
                ctf_stream_class_id = be64toh(rp.stream_id);
                if (stream->ctf_stream_class_id != -1ULL) {
-                       assert(stream->ctf_stream_class_id ==
+                       BT_ASSERT(stream->ctf_stream_class_id ==
                                ctf_stream_class_id);
                } else {
                        stream->ctf_stream_class_id = ctf_stream_class_id;
@@ -1155,7 +1155,7 @@ enum bt_lttng_live_iterator_status lttng_live_get_next_index(struct lttng_live_c
                lttng_index_to_packet_index(&rp, index);
                ctf_stream_class_id = be64toh(rp.stream_id);
                if (stream->ctf_stream_class_id != -1ULL) {
-                       assert(stream->ctf_stream_class_id ==
+                       BT_ASSERT(stream->ctf_stream_class_id ==
                                ctf_stream_class_id);
                } else {
                        stream->ctf_stream_class_id = ctf_stream_class_id;
@@ -1212,11 +1212,11 @@ error:
 }
 
 BT_HIDDEN
-enum bt_notif_iter_medium_status lttng_live_get_stream_bytes(struct lttng_live_component *lttng_live,
+enum bt_msg_iter_medium_status lttng_live_get_stream_bytes(struct lttng_live_component *lttng_live,
                struct lttng_live_stream_iterator *stream, uint8_t *buf, uint64_t offset,
                uint64_t req_len, uint64_t *recv_len)
 {
-       enum bt_notif_iter_medium_status retstatus = BT_NOTIF_ITER_MEDIUM_STATUS_OK;
+       enum bt_msg_iter_medium_status retstatus = BT_MSG_ITER_MEDIUM_STATUS_OK;
        struct lttng_viewer_cmd cmd;
        struct lttng_viewer_get_packet rq;
        struct lttng_viewer_trace_packet rp;
@@ -1225,6 +1225,8 @@ enum bt_notif_iter_medium_status lttng_live_get_stream_bytes(struct lttng_live_c
        struct bt_live_viewer_connection *viewer_connection =
                        lttng_live->viewer_connection;
        struct lttng_live_trace *trace = stream->trace;
+       const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
+       char cmd_buf[cmd_buf_len];
 
        BT_LOGD("lttng_live_get_stream_bytes: offset=%" PRIu64 ", req_len=%" PRIu64,
                        offset, req_len);
@@ -1237,20 +1239,20 @@ enum bt_notif_iter_medium_status lttng_live_get_stream_bytes(struct lttng_live_c
        rq.offset = htobe64(offset);
        rq.len = htobe32(req_len);
 
-       ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
-       if (ret_len == BT_SOCKET_ERROR) {
-               BT_LOGE("Error sending cmd: %s", bt_socket_errormsg());
-               goto error;
-       }
-       assert(ret_len == sizeof(cmd));
-
-       ret_len = lttng_live_send(viewer_connection, &rq, sizeof(rq));
+       /*
+        * Merge the cmd and connection request to prevent a write-write
+        * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
+        * second write to be performed quickly in presence of Nagle's algorithm.
+        */
+       memcpy(cmd_buf, &cmd, sizeof(cmd));
+       memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
+       ret_len = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
        if (ret_len == BT_SOCKET_ERROR) {
                BT_LOGE("Error sending get_data request: %s", bt_socket_errormsg());
                goto error;
        }
-       assert(ret_len == sizeof(rq));
 
+       BT_ASSERT(ret_len == cmd_buf_len);
        ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
        if (ret_len == 0) {
                BT_LOGI("Remote side has closed connection");
@@ -1278,7 +1280,7 @@ enum bt_notif_iter_medium_status lttng_live_get_stream_bytes(struct lttng_live_c
        case LTTNG_VIEWER_GET_PACKET_RETRY:
                /* Unimplemented by relay daemon */
                BT_LOGD("get_data_packet: retry");
-               retstatus = BT_NOTIF_ITER_MEDIUM_STATUS_AGAIN;
+               retstatus = BT_MSG_ITER_MEDIUM_STATUS_AGAIN;
                goto end;
        case LTTNG_VIEWER_GET_PACKET_ERR:
                if (flags & LTTNG_VIEWER_FLAG_NEW_METADATA) {
@@ -1291,13 +1293,13 @@ enum bt_notif_iter_medium_status lttng_live_get_stream_bytes(struct lttng_live_c
                }
                if (flags & (LTTNG_VIEWER_FLAG_NEW_METADATA
                                | LTTNG_VIEWER_FLAG_NEW_STREAM)) {
-                       retstatus = BT_NOTIF_ITER_MEDIUM_STATUS_AGAIN;
+                       retstatus = BT_MSG_ITER_MEDIUM_STATUS_AGAIN;
                        goto end;
                }
                BT_LOGE("get_data_packet: error");
                goto error;
        case LTTNG_VIEWER_GET_PACKET_EOF:
-               retstatus = BT_NOTIF_ITER_MEDIUM_STATUS_EOF;
+               retstatus = BT_MSG_ITER_MEDIUM_STATUS_EOF;
                goto end;
        default:
                BT_LOGE("get_data_packet: unknown");
@@ -1317,16 +1319,16 @@ enum bt_notif_iter_medium_status lttng_live_get_stream_bytes(struct lttng_live_c
                BT_LOGE("Error receiving trace packet: %s", bt_socket_errormsg());
                goto error;
        }
-       assert(ret_len == req_len);
+       BT_ASSERT(ret_len == req_len);
        *recv_len = ret_len;
 end:
        return retstatus;
 
 error:
        if (lttng_live_is_canceled(lttng_live)) {
-               retstatus = BT_NOTIF_ITER_MEDIUM_STATUS_AGAIN;
+               retstatus = BT_MSG_ITER_MEDIUM_STATUS_AGAIN;
        } else {
-               retstatus = BT_NOTIF_ITER_MEDIUM_STATUS_ERROR;
+               retstatus = BT_MSG_ITER_MEDIUM_STATUS_ERROR;
        }
        return retstatus;
 }
@@ -1348,6 +1350,8 @@ enum bt_lttng_live_iterator_status lttng_live_get_new_streams(
        struct bt_live_viewer_connection *viewer_connection =
                        lttng_live->viewer_connection;
        uint32_t streams_count;
+       const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
+       char cmd_buf[cmd_buf_len];
 
        if (!session->new_streams_needed) {
                return BT_LTTNG_LIVE_ITERATOR_STATUS_OK;
@@ -1360,20 +1364,20 @@ enum bt_lttng_live_iterator_status lttng_live_get_new_streams(
        memset(&rq, 0, sizeof(rq));
        rq.session_id = htobe64(session->id);
 
-       ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
-       if (ret_len == BT_SOCKET_ERROR) {
-               BT_LOGE("Error sending cmd: %s", bt_socket_errormsg());
-               goto error;
-       }
-       assert(ret_len == sizeof(cmd));
-
-       ret_len = lttng_live_send(viewer_connection, &rq, sizeof(rq));
+       /*
+        * Merge the cmd and connection request to prevent a write-write
+        * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
+        * second write to be performed quickly in presence of Nagle's algorithm.
+        */
+       memcpy(cmd_buf, &cmd, sizeof(cmd));
+       memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
+       ret_len = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
        if (ret_len == BT_SOCKET_ERROR) {
                BT_LOGE("Error sending get_new_streams request: %s", bt_socket_errormsg());
                goto error;
        }
-       assert(ret_len == sizeof(rq));
 
+       BT_ASSERT(ret_len == cmd_buf_len);
        ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
        if (ret_len == 0) {
                BT_LOGI("Remote side has closed connection");
@@ -1383,7 +1387,7 @@ enum bt_lttng_live_iterator_status lttng_live_get_new_streams(
                BT_LOGE("Error receiving get_new_streams response");
                goto error;
        }
-       assert(ret_len == sizeof(rp));
+       BT_ASSERT(ret_len == sizeof(rp));
 
        streams_count = be32toh(rp.streams_count);
 
This page took 0.036833 seconds and 4 git commands to generate.