Add internal BT_ASSERT() and BT_ASSERT_PRE() helpers
[babeltrace.git] / plugins / ctf / lttng-live / viewer-connection.c
index 55bc6bb0ecb9068f0cd2dc21b4f9e2b54dacc954..93ad97a5903427a6593e2ef9715c6f601df588a8 100644 (file)
@@ -157,6 +157,8 @@ static int lttng_live_handshake(struct bt_live_viewer_connection *viewer_connect
 {
        struct lttng_viewer_cmd cmd;
        struct lttng_viewer_connect connect;
+       const size_t cmd_buf_len = sizeof(cmd) + sizeof(connect);
+       char cmd_buf[cmd_buf_len];
        int ret;
        ssize_t ret_len;
 
@@ -169,19 +171,20 @@ static int lttng_live_handshake(struct bt_live_viewer_connection *viewer_connect
        connect.minor = htobe32(LTTNG_LIVE_MINOR);
        connect.type = htobe32(LTTNG_VIEWER_CLIENT_COMMAND);
 
-       ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
-       if (ret_len == BT_SOCKET_ERROR) {
-               BT_LOGE("Error sending cmd: %s", bt_socket_errormsg());
-               goto error;
-       }
-       assert(ret_len == sizeof(cmd));
+       /*
+        * Merge the cmd and connection request to prevent a write-write
+        * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
+        * second write to be performed quickly in presence of Nagle's algorithm
+        */
+       memcpy(cmd_buf, &cmd, sizeof(cmd));
+       memcpy(cmd_buf + sizeof(cmd), &connect, sizeof(connect));
 
-       ret_len = lttng_live_send(viewer_connection, &connect, sizeof(connect));
+       ret_len = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
        if (ret_len == BT_SOCKET_ERROR) {
                BT_LOGE("Error sending version: %s", bt_socket_errormsg());
                goto error;
        }
-       assert(ret_len == sizeof(connect));
+       assert(ret_len == cmd_buf_len);
 
        ret_len = lttng_live_recv(viewer_connection, &connect, sizeof(connect));
        if (ret_len == 0) {
@@ -810,6 +813,8 @@ int lttng_live_attach_session(struct lttng_live_session *session)
                        lttng_live->viewer_connection;
        uint64_t session_id = session->id;
        uint32_t streams_count;
+       const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
+       char cmd_buf[cmd_buf_len];
 
        if (session->attached) {
                return 0;
@@ -825,19 +830,20 @@ int lttng_live_attach_session(struct lttng_live_session *session)
        // rq.seek = htobe32(LTTNG_VIEWER_SEEK_BEGINNING);
        rq.seek = htobe32(LTTNG_VIEWER_SEEK_LAST);
 
-       ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
-       if (ret_len == BT_SOCKET_ERROR) {
-               BT_LOGE("Error sending cmd: %s", bt_socket_errormsg());
-               goto error;
-       }
-       assert(ret_len == sizeof(cmd));
+       /*
+        * Merge the cmd and connection request to prevent a write-write
+        * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
+        * second write to be performed quickly in presence of Nagle's algorithm.
+        */
+       memcpy(cmd_buf, &cmd, sizeof(cmd));
+       memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
 
-       ret_len = lttng_live_send(viewer_connection, &rq, sizeof(rq));
+       ret_len = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
        if (ret_len == BT_SOCKET_ERROR) {
                BT_LOGE("Error sending attach request: %s", bt_socket_errormsg());
                goto error;
        }
-       assert(ret_len == sizeof(rq));
+       assert(ret_len == cmd_buf_len);
 
        ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
        if (ret_len == 0) {
@@ -896,6 +902,8 @@ int lttng_live_detach_session(struct lttng_live_session *session)
        struct bt_live_viewer_connection *viewer_connection =
                        lttng_live->viewer_connection;
        uint64_t session_id = session->id;
+       const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
+       char cmd_buf[cmd_buf_len];
 
        if (!session->attached) {
                return 0;
@@ -908,19 +916,20 @@ int lttng_live_detach_session(struct lttng_live_session *session)
        memset(&rq, 0, sizeof(rq));
        rq.session_id = htobe64(session_id);
 
-       ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
-       if (ret_len == BT_SOCKET_ERROR) {
-               BT_LOGE("Error sending cmd: %s", bt_socket_errormsg());
-               goto error;
-       }
-       assert(ret_len == sizeof(cmd));
+       /*
+        * Merge the cmd and connection request to prevent a write-write
+        * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
+        * second write to be performed quickly in presence of Nagle's algorithm.
+        */
+       memcpy(cmd_buf, &cmd, sizeof(cmd));
+       memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
 
-       ret_len = lttng_live_send(viewer_connection, &rq, sizeof(rq));
+       ret_len = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
        if (ret_len == BT_SOCKET_ERROR) {
                BT_LOGE("Error sending detach request: %s", bt_socket_errormsg());
                goto error;
        }
-       assert(ret_len == sizeof(rq));
+       assert(ret_len == cmd_buf_len);
 
        ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
        if (ret_len == 0) {
@@ -971,25 +980,28 @@ ssize_t lttng_live_get_one_metadata_packet(struct lttng_live_trace *trace,
        struct lttng_live_metadata *metadata = trace->metadata;
        struct bt_live_viewer_connection *viewer_connection =
                        lttng_live->viewer_connection;
+       const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
+       char cmd_buf[cmd_buf_len];
 
        rq.stream_id = htobe64(metadata->stream_id);
        cmd.cmd = htobe32(LTTNG_VIEWER_GET_METADATA);
        cmd.data_size = htobe64((uint64_t) sizeof(rq));
        cmd.cmd_version = htobe32(0);
 
-       ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
-       if (ret_len == BT_SOCKET_ERROR) {
-               BT_LOGE("Error sending cmd: %s", bt_socket_errormsg());
-               goto error;
-       }
-       assert(ret_len == sizeof(cmd));
+       /*
+        * Merge the cmd and connection request to prevent a write-write
+        * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
+        * second write to be performed quickly in presence of Nagle's algorithm.
+        */
+       memcpy(cmd_buf, &cmd, sizeof(cmd));
+       memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
 
-       ret_len = lttng_live_send(viewer_connection, &rq, sizeof(rq));
+       ret_len = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
        if (ret_len == BT_SOCKET_ERROR) {
                BT_LOGE("Error sending get_metadata request: %s", bt_socket_errormsg());
                goto error;
        }
-       assert(ret_len == sizeof(rq));
+       assert(ret_len == cmd_buf_len);
 
        ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
        if (ret_len == 0) {
@@ -1078,7 +1090,7 @@ void lttng_index_to_packet_index(struct lttng_viewer_index *lindex,
 }
 
 BT_HIDDEN
-enum bt_ctf_lttng_live_iterator_status lttng_live_get_next_index(struct lttng_live_component *lttng_live,
+enum bt_lttng_live_iterator_status lttng_live_get_next_index(struct lttng_live_component *lttng_live,
                struct lttng_live_stream_iterator *stream,
                struct packet_index *index)
 {
@@ -1087,32 +1099,36 @@ enum bt_ctf_lttng_live_iterator_status lttng_live_get_next_index(struct lttng_li
        ssize_t ret_len;
        struct lttng_viewer_index rp;
        uint32_t flags, status;
-       enum bt_ctf_lttng_live_iterator_status retstatus =
-                       BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
+       enum bt_lttng_live_iterator_status retstatus =
+                       BT_LTTNG_LIVE_ITERATOR_STATUS_OK;
        struct bt_live_viewer_connection *viewer_connection =
                        lttng_live->viewer_connection;
        struct lttng_live_trace *trace = stream->trace;
+       const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
+       char cmd_buf[cmd_buf_len];
 
        cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEXT_INDEX);
        cmd.data_size = htobe64((uint64_t) sizeof(rq));
        cmd.cmd_version = htobe32(0);
 
+
        memset(&rq, 0, sizeof(rq));
        rq.stream_id = htobe64(stream->viewer_stream_id);
 
-       ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
-       if (ret_len == BT_SOCKET_ERROR) {
-               BT_LOGE("Error sending cmd: %s", bt_socket_errormsg());
-               goto error;
-       }
-       assert(ret_len == sizeof(cmd));
+       /*
+        * Merge the cmd and connection request to prevent a write-write
+        * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
+        * second write to be performed quickly in presence of Nagle's algorithm.
+        */
+       memcpy(cmd_buf, &cmd, sizeof(cmd));
+       memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
 
-       ret_len = lttng_live_send(viewer_connection, &rq, sizeof(rq));
+       ret_len = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
        if (ret_len == BT_SOCKET_ERROR) {
                BT_LOGE("Error sending get_next_index request: %s", bt_socket_errormsg());
                goto error;
        }
-       assert(ret_len == sizeof(rq));
+       assert(ret_len == cmd_buf_len);
 
        ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
        if (ret_len == 0) {
@@ -1178,14 +1194,14 @@ enum bt_ctf_lttng_live_iterator_status lttng_live_get_next_index(struct lttng_li
        case LTTNG_VIEWER_INDEX_RETRY:
                BT_LOGD("get_next_index: retry");
                memset(index, 0, sizeof(struct packet_index));
-               retstatus = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
+               retstatus = BT_LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
                stream->state = LTTNG_LIVE_STREAM_ACTIVE_NO_DATA;
                goto end;
        case LTTNG_VIEWER_INDEX_HUP:
                BT_LOGD("get_next_index: stream hung up");
                memset(index, 0, sizeof(struct packet_index));
                index->offset = EOF;
-               retstatus = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END;
+               retstatus = BT_LTTNG_LIVE_ITERATOR_STATUS_END;
                stream->state = LTTNG_LIVE_STREAM_EOF;
                break;
        case LTTNG_VIEWER_INDEX_ERR:
@@ -1204,19 +1220,19 @@ end:
 
 error:
        if (lttng_live_is_canceled(lttng_live)) {
-               retstatus = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
+               retstatus = BT_LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
        } else {
-               retstatus = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
+               retstatus = BT_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
        }
        return retstatus;
 }
 
 BT_HIDDEN
-enum bt_ctf_notif_iter_medium_status lttng_live_get_stream_bytes(struct lttng_live_component *lttng_live,
+enum bt_notif_iter_medium_status lttng_live_get_stream_bytes(struct lttng_live_component *lttng_live,
                struct lttng_live_stream_iterator *stream, uint8_t *buf, uint64_t offset,
                uint64_t req_len, uint64_t *recv_len)
 {
-       enum bt_ctf_notif_iter_medium_status retstatus = BT_CTF_NOTIF_ITER_MEDIUM_STATUS_OK;
+       enum bt_notif_iter_medium_status retstatus = BT_NOTIF_ITER_MEDIUM_STATUS_OK;
        struct lttng_viewer_cmd cmd;
        struct lttng_viewer_get_packet rq;
        struct lttng_viewer_trace_packet rp;
@@ -1225,6 +1241,8 @@ enum bt_ctf_notif_iter_medium_status lttng_live_get_stream_bytes(struct lttng_li
        struct bt_live_viewer_connection *viewer_connection =
                        lttng_live->viewer_connection;
        struct lttng_live_trace *trace = stream->trace;
+       const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
+       char cmd_buf[cmd_buf_len];
 
        BT_LOGD("lttng_live_get_stream_bytes: offset=%" PRIu64 ", req_len=%" PRIu64,
                        offset, req_len);
@@ -1237,19 +1255,20 @@ enum bt_ctf_notif_iter_medium_status lttng_live_get_stream_bytes(struct lttng_li
        rq.offset = htobe64(offset);
        rq.len = htobe32(req_len);
 
-       ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
-       if (ret_len == BT_SOCKET_ERROR) {
-               BT_LOGE("Error sending cmd: %s", bt_socket_errormsg());
-               goto error;
-       }
-       assert(ret_len == sizeof(cmd));
+       /*
+        * Merge the cmd and connection request to prevent a write-write
+        * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
+        * second write to be performed quickly in presence of Nagle's algorithm.
+        */
+       memcpy(cmd_buf, &cmd, sizeof(cmd));
+       memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
 
-       ret_len = lttng_live_send(viewer_connection, &rq, sizeof(rq));
+       ret_len = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
        if (ret_len == BT_SOCKET_ERROR) {
                BT_LOGE("Error sending get_data request: %s", bt_socket_errormsg());
                goto error;
        }
-       assert(ret_len == sizeof(rq));
+       assert(ret_len == cmd_buf_len);
 
        ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
        if (ret_len == 0) {
@@ -1278,7 +1297,7 @@ enum bt_ctf_notif_iter_medium_status lttng_live_get_stream_bytes(struct lttng_li
        case LTTNG_VIEWER_GET_PACKET_RETRY:
                /* Unimplemented by relay daemon */
                BT_LOGD("get_data_packet: retry");
-               retstatus = BT_CTF_NOTIF_ITER_MEDIUM_STATUS_AGAIN;
+               retstatus = BT_NOTIF_ITER_MEDIUM_STATUS_AGAIN;
                goto end;
        case LTTNG_VIEWER_GET_PACKET_ERR:
                if (flags & LTTNG_VIEWER_FLAG_NEW_METADATA) {
@@ -1291,13 +1310,13 @@ enum bt_ctf_notif_iter_medium_status lttng_live_get_stream_bytes(struct lttng_li
                }
                if (flags & (LTTNG_VIEWER_FLAG_NEW_METADATA
                                | LTTNG_VIEWER_FLAG_NEW_STREAM)) {
-                       retstatus = BT_CTF_NOTIF_ITER_MEDIUM_STATUS_AGAIN;
+                       retstatus = BT_NOTIF_ITER_MEDIUM_STATUS_AGAIN;
                        goto end;
                }
                BT_LOGE("get_data_packet: error");
                goto error;
        case LTTNG_VIEWER_GET_PACKET_EOF:
-               retstatus = BT_CTF_NOTIF_ITER_MEDIUM_STATUS_EOF;
+               retstatus = BT_NOTIF_ITER_MEDIUM_STATUS_EOF;
                goto end;
        default:
                BT_LOGE("get_data_packet: unknown");
@@ -1324,9 +1343,9 @@ end:
 
 error:
        if (lttng_live_is_canceled(lttng_live)) {
-               retstatus = BT_CTF_NOTIF_ITER_MEDIUM_STATUS_AGAIN;
+               retstatus = BT_NOTIF_ITER_MEDIUM_STATUS_AGAIN;
        } else {
-               retstatus = BT_CTF_NOTIF_ITER_MEDIUM_STATUS_ERROR;
+               retstatus = BT_NOTIF_ITER_MEDIUM_STATUS_ERROR;
        }
        return retstatus;
 }
@@ -1335,11 +1354,11 @@ error:
  * Request new streams for a session.
  */
 BT_HIDDEN
-enum bt_ctf_lttng_live_iterator_status lttng_live_get_new_streams(
+enum bt_lttng_live_iterator_status lttng_live_get_new_streams(
                struct lttng_live_session *session)
 {
-       enum bt_ctf_lttng_live_iterator_status status =
-                       BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
+       enum bt_lttng_live_iterator_status status =
+                       BT_LTTNG_LIVE_ITERATOR_STATUS_OK;
        struct lttng_viewer_cmd cmd;
        struct lttng_viewer_new_streams_request rq;
        struct lttng_viewer_new_streams_response rp;
@@ -1348,9 +1367,11 @@ enum bt_ctf_lttng_live_iterator_status lttng_live_get_new_streams(
        struct bt_live_viewer_connection *viewer_connection =
                        lttng_live->viewer_connection;
        uint32_t streams_count;
+       const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
+       char cmd_buf[cmd_buf_len];
 
        if (!session->new_streams_needed) {
-               return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
+               return BT_LTTNG_LIVE_ITERATOR_STATUS_OK;
        }
 
        cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEW_STREAMS);
@@ -1360,19 +1381,20 @@ enum bt_ctf_lttng_live_iterator_status lttng_live_get_new_streams(
        memset(&rq, 0, sizeof(rq));
        rq.session_id = htobe64(session->id);
 
-       ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
-       if (ret_len == BT_SOCKET_ERROR) {
-               BT_LOGE("Error sending cmd: %s", bt_socket_errormsg());
-               goto error;
-       }
-       assert(ret_len == sizeof(cmd));
+       /*
+        * Merge the cmd and connection request to prevent a write-write
+        * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
+        * second write to be performed quickly in presence of Nagle's algorithm.
+        */
+       memcpy(cmd_buf, &cmd, sizeof(cmd));
+       memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
 
-       ret_len = lttng_live_send(viewer_connection, &rq, sizeof(rq));
+       ret_len = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
        if (ret_len == BT_SOCKET_ERROR) {
                BT_LOGE("Error sending get_new_streams request: %s", bt_socket_errormsg());
                goto error;
        }
-       assert(ret_len == sizeof(rq));
+       assert(ret_len == cmd_buf_len);
 
        ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
        if (ret_len == 0) {
@@ -1397,7 +1419,7 @@ enum bt_ctf_lttng_live_iterator_status lttng_live_get_new_streams(
        case LTTNG_VIEWER_NEW_STREAMS_HUP:
                session->new_streams_needed = false;
                session->closed = true;
-               status = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END;
+               status = BT_LTTNG_LIVE_ITERATOR_STATUS_END;
                goto end;
        case LTTNG_VIEWER_NEW_STREAMS_ERR:
                BT_LOGE("get_new_streams error");
@@ -1415,9 +1437,9 @@ end:
 
 error:
        if (lttng_live_is_canceled(lttng_live)) {
-               status = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
+               status = BT_LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
        } else {
-               status = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
+               status = BT_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
        }
        return status;
 }
This page took 0.031734 seconds and 4 git commands to generate.