X-Git-Url: http://git.efficios.com/?a=blobdiff_plain;f=plugins%2Fctf%2Flttng-live%2Fviewer-connection.c;h=f371095ce5eed5ccf87945993a58fe457cad8cc4;hb=601b0d3c9a6bf91274d0f01ccdec7fecfe3ed310;hp=81f043facbb566208a0aec62ab686f31de3679fd;hpb=06994c7113d1a2fb4019b5db624f49f21d8aa089;p=babeltrace.git diff --git a/plugins/ctf/lttng-live/viewer-connection.c b/plugins/ctf/lttng-live/viewer-connection.c index 81f043fa..f371095c 100644 --- a/plugins/ctf/lttng-live/viewer-connection.c +++ b/plugins/ctf/lttng-live/viewer-connection.c @@ -30,17 +30,14 @@ #include #include #include -#include #include -#include -#include #include -#include -#include +#include +#include #include #include -#include +#include #include "lttng-live-internal.h" #include "viewer-connection.h" @@ -55,17 +52,17 @@ static ssize_t lttng_live_recv(struct bt_live_viewer_connection *viewer_connecti size_t copied = 0, to_copy = len; struct lttng_live_component *lttng_live = viewer_connection->lttng_live; - int fd = viewer_connection->control_sock; + BT_SOCKET sock = viewer_connection->control_sock; do { - ret = recv(fd, buf + copied, to_copy, 0); + ret = bt_socket_recv(sock, buf + copied, to_copy, 0); if (ret > 0) { - assert(ret <= to_copy); + BT_ASSERT(ret <= to_copy); copied += ret; to_copy -= ret; } - if (ret < 0 && errno == EINTR) { - if (lttng_live && bt_graph_is_canceled(lttng_live->graph)) { + if (ret == BT_SOCKET_ERROR && bt_socket_interrupted()) { + if (lttng_live_is_canceled(lttng_live)) { break; } else { continue; @@ -74,7 +71,7 @@ static ssize_t lttng_live_recv(struct bt_live_viewer_connection *viewer_connecti } while (ret > 0 && to_copy > 0); if (ret > 0) ret = copied; - /* ret = 0 means orderly shutdown, ret < 0 is error. */ + /* ret = 0 means orderly shutdown, ret == BT_SOCKET_ERROR is error. */ return ret; } @@ -83,13 +80,13 @@ static ssize_t lttng_live_send(struct bt_live_viewer_connection *viewer_connecti { struct lttng_live_component *lttng_live = viewer_connection->lttng_live; - int fd = viewer_connection->control_sock; + BT_SOCKET sock = viewer_connection->control_sock; ssize_t ret; for (;;) { - ret = bt_send_nosigpipe(fd, buf, len); - if (ret < 0 && errno == EINTR) { - if (lttng_live && bt_graph_is_canceled(lttng_live->graph)) { + ret = bt_socket_send_nosigpipe(sock, buf, len); + if (ret == BT_SOCKET_ERROR && bt_socket_interrupted()) { + if (lttng_live_is_canceled(lttng_live)) { break; } else { continue; @@ -160,6 +157,8 @@ static int lttng_live_handshake(struct bt_live_viewer_connection *viewer_connect { struct lttng_viewer_cmd cmd; struct lttng_viewer_connect connect; + const size_t cmd_buf_len = sizeof(cmd) + sizeof(connect); + char cmd_buf[cmd_buf_len]; int ret; ssize_t ret_len; @@ -172,33 +171,34 @@ static int lttng_live_handshake(struct bt_live_viewer_connection *viewer_connect connect.minor = htobe32(LTTNG_LIVE_MINOR); connect.type = htobe32(LTTNG_VIEWER_CLIENT_COMMAND); - ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd)); - if (ret_len < 0) { - BT_LOGE("Error sending cmd: %s", strerror(errno)); + /* + * Merge the cmd and connection request to prevent a write-write + * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the + * second write to be performed quickly in presence of Nagle's algorithm + */ + memcpy(cmd_buf, &cmd, sizeof(cmd)); + memcpy(cmd_buf + sizeof(cmd), &connect, sizeof(connect)); + ret_len = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len); + if (ret_len == BT_SOCKET_ERROR) { + BT_LOGE("Error sending version: %s", bt_socket_errormsg()); goto error; } - assert(ret_len == sizeof(cmd)); - ret_len = lttng_live_send(viewer_connection, &connect, sizeof(connect)); - if (ret_len < 0) { - BT_LOGE("Error sending version: %s", strerror(errno)); - goto error; - } - assert(ret_len == sizeof(connect)); + BT_ASSERT(ret_len == cmd_buf_len); ret_len = lttng_live_recv(viewer_connection, &connect, sizeof(connect)); if (ret_len == 0) { BT_LOGI("Remote side has closed connection"); goto error; } - if (ret_len < 0) { - BT_LOGE("Error receiving version: %s", strerror(errno)); + if (ret_len == BT_SOCKET_ERROR) { + BT_LOGE("Error receiving version: %s", bt_socket_errormsg()); goto error; } - assert(ret_len == sizeof(connect)); + BT_ASSERT(ret_len == sizeof(connect)); BT_LOGD("Received viewer session ID : %" PRIu64, - be64toh(connect.viewer_session_id)); + (uint64_t) be64toh(connect.viewer_session_id)); BT_LOGD("Relayd version : %u.%u", be32toh(connect.major), be32toh(connect.minor)); @@ -238,8 +238,8 @@ static int lttng_live_connect_viewer(struct bt_live_viewer_connection *viewer_co goto error; } - if ((viewer_connection->control_sock = socket(AF_INET, SOCK_STREAM, 0)) == -1) { - BT_LOGE("Socket creation failed: %s", strerror(errno)); + if ((viewer_connection->control_sock = socket(AF_INET, SOCK_STREAM, 0)) == BT_INVALID_SOCKET) { + BT_LOGE("Socket creation failed: %s", bt_socket_errormsg()); goto error; } @@ -249,8 +249,8 @@ static int lttng_live_connect_viewer(struct bt_live_viewer_connection *viewer_co memset(&(server_addr.sin_zero), 0, 8); if (connect(viewer_connection->control_sock, (struct sockaddr *) &server_addr, - sizeof(struct sockaddr)) == -1) { - BT_LOGE("Connection failed: %s", strerror(errno)); + sizeof(struct sockaddr)) == BT_SOCKET_ERROR) { + BT_LOGE("Connection failed: %s", bt_socket_errormsg()); goto error; } if (lttng_live_handshake(viewer_connection)) { @@ -262,23 +262,23 @@ static int lttng_live_connect_viewer(struct bt_live_viewer_connection *viewer_co return ret; error: - if (viewer_connection->control_sock >= 0) { - if (close(viewer_connection->control_sock)) { - BT_LOGE("Close: %s", strerror(errno)); + if (viewer_connection->control_sock != BT_INVALID_SOCKET) { + if (bt_socket_close(viewer_connection->control_sock) == BT_SOCKET_ERROR) { + BT_LOGE("Close: %s", bt_socket_errormsg()); } } - viewer_connection->control_sock = -1; + viewer_connection->control_sock = BT_INVALID_SOCKET; return -1; } static void lttng_live_disconnect_viewer(struct bt_live_viewer_connection *viewer_connection) { - if (viewer_connection->control_sock < 0) { + if (viewer_connection->control_sock == BT_INVALID_SOCKET) { return; } - if (close(viewer_connection->control_sock)) { - BT_LOGE("Close: %s", strerror(errno)); - viewer_connection->control_sock = -1; + if (bt_socket_close(viewer_connection->control_sock) == BT_SOCKET_ERROR) { + BT_LOGE("Close: %s", bt_socket_errormsg()); + viewer_connection->control_sock = BT_INVALID_SOCKET; } } @@ -303,7 +303,7 @@ enum bt_value_status list_update_session(struct bt_value *results, int i, len; bool found = false; - len = bt_value_array_size(results); + len = bt_value_array_get_size(results); if (len < 0) { ret = BT_VALUE_STATUS_ERROR; goto end; @@ -327,14 +327,8 @@ enum bt_value_status list_update_session(struct bt_value *results, ret = BT_VALUE_STATUS_ERROR; goto end; } - ret = bt_value_string_get(hostname, &hostname_str); - if (ret != BT_VALUE_STATUS_OK) { - goto end; - } - ret = bt_value_string_get(session_name, &session_name_str); - if (ret != BT_VALUE_STATUS_OK) { - goto end; - } + hostname_str = bt_value_string_get(hostname); + session_name_str = bt_value_string_get(session_name); if (!strcmp(session->hostname, hostname_str) && !strcmp(session->session_name, @@ -350,49 +344,43 @@ enum bt_value_status list_update_session(struct bt_value *results, ret = BT_VALUE_STATUS_ERROR; goto end; } - ret = bt_value_integer_get(btval, &val); - if (ret != BT_VALUE_STATUS_OK) { - goto end; - } + val = bt_value_integer_get(btval); /* sum */ val += streams; - ret = bt_value_integer_set(btval, val); + ret = bt_private_integer_bool_set(btval, val); if (ret != BT_VALUE_STATUS_OK) { goto end; } - BT_PUT(btval); + BT_OBJECT_PUT_REF_AND_RESET(btval); btval = bt_value_map_get(map, "client-count"); if (!btval) { ret = BT_VALUE_STATUS_ERROR; goto end; } - ret = bt_value_integer_get(btval, &val); - if (ret != BT_VALUE_STATUS_OK) { - goto end; - } + val = bt_value_integer_get(btval); /* max */ val = max_t(int64_t, clients, val); - ret = bt_value_integer_set(btval, val); + ret = bt_private_integer_bool_set(btval, val); if (ret != BT_VALUE_STATUS_OK) { goto end; } - BT_PUT(btval); + BT_OBJECT_PUT_REF_AND_RESET(btval); } - BT_PUT(hostname); - BT_PUT(session_name); - BT_PUT(map); + BT_OBJECT_PUT_REF_AND_RESET(hostname); + BT_OBJECT_PUT_REF_AND_RESET(session_name); + BT_OBJECT_PUT_REF_AND_RESET(map); if (found) { break; } } end: - BT_PUT(btval); - BT_PUT(hostname); - BT_PUT(session_name); - BT_PUT(map); + BT_OBJECT_PUT_REF_AND_RESET(btval); + BT_OBJECT_PUT_REF_AND_RESET(hostname); + BT_OBJECT_PUT_REF_AND_RESET(session_name); + BT_OBJECT_PUT_REF_AND_RESET(map); *_found = found; return ret; } @@ -416,7 +404,7 @@ enum bt_value_status list_append_session(struct bt_value *results, goto end; } - map = bt_value_map_create(); + map = bt_private_value_map_create(); if (!map) { ret = BT_VALUE_STATUS_ERROR; goto end; @@ -436,7 +424,7 @@ enum bt_value_status list_append_session(struct bt_value *results, g_string_append_c(url, '/'); g_string_append(url, session->session_name); - ret = bt_value_map_insert_string(map, "url", url->str); + ret = bt_private_value_map_insert_string_entry(map, "url", url->str); if (ret != BT_VALUE_STATUS_OK) { goto end; } @@ -445,7 +433,7 @@ enum bt_value_status list_append_session(struct bt_value *results, * key = "target-hostname", * value = , */ - ret = bt_value_map_insert_string(map, "target-hostname", + ret = bt_private_value_map_insert_string_entry(map, "target-hostname", session->hostname); if (ret != BT_VALUE_STATUS_OK) { goto end; @@ -455,7 +443,7 @@ enum bt_value_status list_append_session(struct bt_value *results, * key = "session-name", * value = , */ - ret = bt_value_map_insert_string(map, "session-name", + ret = bt_private_value_map_insert_string_entry(map, "session-name", session->session_name); if (ret != BT_VALUE_STATUS_OK) { goto end; @@ -468,7 +456,7 @@ enum bt_value_status list_append_session(struct bt_value *results, { uint32_t live_timer = be32toh(session->live_timer); - ret = bt_value_map_insert_integer(map, "timer-us", + ret = bt_private_value_map_insert_integer_entry(map, "timer-us", live_timer); if (ret != BT_VALUE_STATUS_OK) { goto end; @@ -482,7 +470,7 @@ enum bt_value_status list_append_session(struct bt_value *results, { uint32_t streams = be32toh(session->streams); - ret = bt_value_map_insert_integer(map, "stream-count", + ret = bt_private_value_map_insert_integer_entry(map, "stream-count", streams); if (ret != BT_VALUE_STATUS_OK) { goto end; @@ -497,19 +485,19 @@ enum bt_value_status list_append_session(struct bt_value *results, { uint32_t clients = be32toh(session->clients); - ret = bt_value_map_insert_integer(map, "client-count", + ret = bt_private_value_map_insert_integer_entry(map, "client-count", clients); if (ret != BT_VALUE_STATUS_OK) { goto end; } } - ret = bt_value_array_append(results, map); + ret = bt_private_value_array_append_element(results, map); end: if (url) { g_string_free(url, TRUE); } - BT_PUT(map); + BT_OBJECT_PUT_REF_AND_RESET(map); return ret; } @@ -562,7 +550,7 @@ struct bt_value *bt_live_viewer_connection_list_sessions(struct bt_live_viewer_c goto error; } - results = bt_value_array_create(); + results = bt_private_value_array_create(); if (!results) { BT_LOGE("Error creating array"); goto error; @@ -573,22 +561,22 @@ struct bt_value *bt_live_viewer_connection_list_sessions(struct bt_live_viewer_c cmd.cmd_version = htobe32(0); ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd)); - if (ret_len < 0) { - BT_LOGE("Error sending cmd: %s", strerror(errno)); + if (ret_len == BT_SOCKET_ERROR) { + BT_LOGE("Error sending cmd: %s", bt_socket_errormsg()); goto error; } - assert(ret_len == sizeof(cmd)); + BT_ASSERT(ret_len == sizeof(cmd)); ret_len = lttng_live_recv(viewer_connection, &list, sizeof(list)); if (ret_len == 0) { BT_LOGI("Remote side has closed connection"); goto error; } - if (ret_len < 0) { - BT_LOGE("Error receiving session list: %s", strerror(errno)); + if (ret_len == BT_SOCKET_ERROR) { + BT_LOGE("Error receiving session list: %s", bt_socket_errormsg()); goto error; } - assert(ret_len == sizeof(list)); + BT_ASSERT(ret_len == sizeof(list)); sessions_count = be32toh(list.sessions_count); for (i = 0; i < sessions_count; i++) { @@ -600,11 +588,11 @@ struct bt_value *bt_live_viewer_connection_list_sessions(struct bt_live_viewer_c BT_LOGI("Remote side has closed connection"); goto error; } - if (ret_len < 0) { - BT_LOGE("Error receiving session: %s", strerror(errno)); + if (ret_len == BT_SOCKET_ERROR) { + BT_LOGE("Error receiving session: %s", bt_socket_errormsg()); goto error; } - assert(ret_len == sizeof(lsession)); + BT_ASSERT(ret_len == sizeof(lsession)); lsession.hostname[LTTNG_VIEWER_HOST_NAME_MAX - 1] = '\0'; lsession.session_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0'; if (list_append_session(results, @@ -615,7 +603,7 @@ struct bt_value *bt_live_viewer_connection_list_sessions(struct bt_live_viewer_c } goto end; error: - BT_PUT(results); + BT_OBJECT_PUT_REF_AND_RESET(results); end: return results; } @@ -637,22 +625,22 @@ int lttng_live_query_session_ids(struct lttng_live_component *lttng_live) cmd.cmd_version = htobe32(0); ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd)); - if (ret_len < 0) { - BT_LOGE("Error sending cmd: %s", strerror(errno)); + if (ret_len == BT_SOCKET_ERROR) { + BT_LOGE("Error sending cmd: %s", bt_socket_errormsg()); goto error; } - assert(ret_len == sizeof(cmd)); + BT_ASSERT(ret_len == sizeof(cmd)); ret_len = lttng_live_recv(viewer_connection, &list, sizeof(list)); if (ret_len == 0) { BT_LOGI("Remote side has closed connection"); goto error; } - if (ret_len < 0) { - BT_LOGE("Error receiving session list: %s", strerror(errno)); + if (ret_len == BT_SOCKET_ERROR) { + BT_LOGE("Error receiving session list: %s", bt_socket_errormsg()); goto error; } - assert(ret_len == sizeof(list)); + BT_ASSERT(ret_len == sizeof(list)); sessions_count = be32toh(list.sessions_count); for (i = 0; i < sessions_count; i++) { @@ -662,11 +650,11 @@ int lttng_live_query_session_ids(struct lttng_live_component *lttng_live) BT_LOGI("Remote side has closed connection"); goto error; } - if (ret_len < 0) { - BT_LOGE("Error receiving session: %s", strerror(errno)); + if (ret_len == BT_SOCKET_ERROR) { + BT_LOGE("Error receiving session: %s", bt_socket_errormsg()); goto error; } - assert(ret_len == sizeof(lsession)); + BT_ASSERT(ret_len == sizeof(lsession)); lsession.hostname[LTTNG_VIEWER_HOST_NAME_MAX - 1] = '\0'; lsession.session_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0'; session_id = be64toh(lsession.id); @@ -676,9 +664,9 @@ int lttng_live_query_session_ids(struct lttng_live_component *lttng_live) if ((strncmp(lsession.session_name, viewer_connection->session_name->str, - MAXNAMLEN) == 0) && (strncmp(lsession.hostname, + LTTNG_VIEWER_NAME_MAX) == 0) && (strncmp(lsession.hostname, viewer_connection->target_hostname->str, - MAXNAMLEN) == 0)) { + LTTNG_VIEWER_HOST_NAME_MAX) == 0)) { if (lttng_live_add_session(lttng_live, session_id, lsession.hostname, lsession.session_name)) { @@ -708,22 +696,22 @@ int lttng_live_create_viewer_session(struct lttng_live_component *lttng_live) cmd.cmd_version = htobe32(0); ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd)); - if (ret_len < 0) { - BT_LOGE("Error sending cmd: %s", strerror(errno)); + if (ret_len == BT_SOCKET_ERROR) { + BT_LOGE("Error sending cmd: %s", bt_socket_errormsg()); goto error; } - assert(ret_len == sizeof(cmd)); + BT_ASSERT(ret_len == sizeof(cmd)); ret_len = lttng_live_recv(viewer_connection, &resp, sizeof(resp)); if (ret_len == 0) { BT_LOGI("Remote side has closed connection"); goto error; } - if (ret_len < 0) { - BT_LOGE("Error receiving create session reply: %s", strerror(errno)); + if (ret_len == BT_SOCKET_ERROR) { + BT_LOGE("Error receiving create session reply: %s", bt_socket_errormsg()); goto error; } - assert(ret_len == sizeof(resp)); + BT_ASSERT(ret_len == sizeof(resp)); if (be32toh(resp.status) != LTTNG_VIEWER_CREATE_SESSION_OK) { BT_LOGE("Error creating viewer session"); @@ -761,11 +749,11 @@ int receive_streams(struct lttng_live_session *session, BT_LOGI("Remote side has closed connection"); goto error; } - if (ret_len < 0) { + if (ret_len == BT_SOCKET_ERROR) { BT_LOGE("Error receiving stream"); goto error; } - assert(ret_len == sizeof(stream)); + BT_ASSERT(ret_len == sizeof(stream)); stream.path_name[LTTNG_VIEWER_PATH_MAX - 1] = '\0'; stream.channel_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0'; stream_id = be64toh(stream.id); @@ -813,6 +801,8 @@ int lttng_live_attach_session(struct lttng_live_session *session) lttng_live->viewer_connection; uint64_t session_id = session->id; uint32_t streams_count; + const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq); + char cmd_buf[cmd_buf_len]; if (session->attached) { return 0; @@ -828,30 +818,30 @@ int lttng_live_attach_session(struct lttng_live_session *session) // rq.seek = htobe32(LTTNG_VIEWER_SEEK_BEGINNING); rq.seek = htobe32(LTTNG_VIEWER_SEEK_LAST); - ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd)); - if (ret_len < 0) { - BT_LOGE("Error sending cmd: %s", strerror(errno)); - goto error; - } - assert(ret_len == sizeof(cmd)); - - ret_len = lttng_live_send(viewer_connection, &rq, sizeof(rq)); - if (ret_len < 0) { - BT_LOGE("Error sending attach request: %s", strerror(errno)); + /* + * Merge the cmd and connection request to prevent a write-write + * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the + * second write to be performed quickly in presence of Nagle's algorithm. + */ + memcpy(cmd_buf, &cmd, sizeof(cmd)); + memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq)); + ret_len = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len); + if (ret_len == BT_SOCKET_ERROR) { + BT_LOGE("Error sending attach request: %s", bt_socket_errormsg()); goto error; } - assert(ret_len == sizeof(rq)); + BT_ASSERT(ret_len == cmd_buf_len); ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp)); if (ret_len == 0) { BT_LOGI("Remote side has closed connection"); goto error; } - if (ret_len < 0) { - BT_LOGE("Error receiving attach response: %s", strerror(errno)); + if (ret_len == BT_SOCKET_ERROR) { + BT_LOGE("Error receiving attach response: %s", bt_socket_errormsg()); goto error; } - assert(ret_len == sizeof(rp)); + BT_ASSERT(ret_len == sizeof(rp)); streams_count = be32toh(rp.streams_count); switch(be32toh(rp.status)) { @@ -899,6 +889,8 @@ int lttng_live_detach_session(struct lttng_live_session *session) struct bt_live_viewer_connection *viewer_connection = lttng_live->viewer_connection; uint64_t session_id = session->id; + const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq); + char cmd_buf[cmd_buf_len]; if (!session->attached) { return 0; @@ -911,30 +903,30 @@ int lttng_live_detach_session(struct lttng_live_session *session) memset(&rq, 0, sizeof(rq)); rq.session_id = htobe64(session_id); - ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd)); - if (ret_len < 0) { - BT_LOGE("Error sending cmd: %s", strerror(errno)); - goto error; - } - assert(ret_len == sizeof(cmd)); - - ret_len = lttng_live_send(viewer_connection, &rq, sizeof(rq)); - if (ret_len < 0) { - BT_LOGE("Error sending detach request: %s", strerror(errno)); + /* + * Merge the cmd and connection request to prevent a write-write + * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the + * second write to be performed quickly in presence of Nagle's algorithm. + */ + memcpy(cmd_buf, &cmd, sizeof(cmd)); + memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq)); + ret_len = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len); + if (ret_len == BT_SOCKET_ERROR) { + BT_LOGE("Error sending detach request: %s", bt_socket_errormsg()); goto error; } - assert(ret_len == sizeof(rq)); + BT_ASSERT(ret_len == cmd_buf_len); ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp)); if (ret_len == 0) { BT_LOGI("Remote side has closed connection"); goto error; } - if (ret_len < 0) { - BT_LOGE("Error receiving detach response: %s", strerror(errno)); + if (ret_len == BT_SOCKET_ERROR) { + BT_LOGE("Error receiving detach response: %s", bt_socket_errormsg()); goto error; } - assert(ret_len == sizeof(rp)); + BT_ASSERT(ret_len == sizeof(rp)); switch(be32toh(rp.status)) { case LTTNG_VIEWER_DETACH_SESSION_OK: @@ -974,36 +966,38 @@ ssize_t lttng_live_get_one_metadata_packet(struct lttng_live_trace *trace, struct lttng_live_metadata *metadata = trace->metadata; struct bt_live_viewer_connection *viewer_connection = lttng_live->viewer_connection; + const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq); + char cmd_buf[cmd_buf_len]; rq.stream_id = htobe64(metadata->stream_id); cmd.cmd = htobe32(LTTNG_VIEWER_GET_METADATA); cmd.data_size = htobe64((uint64_t) sizeof(rq)); cmd.cmd_version = htobe32(0); - ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd)); - if (ret_len < 0) { - BT_LOGE("Error sending cmd: %s", strerror(errno)); - goto error; - } - assert(ret_len == sizeof(cmd)); - - ret_len = lttng_live_send(viewer_connection, &rq, sizeof(rq)); - if (ret_len < 0) { - BT_LOGE("Error sending get_metadata request: %s", strerror(errno)); + /* + * Merge the cmd and connection request to prevent a write-write + * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the + * second write to be performed quickly in presence of Nagle's algorithm. + */ + memcpy(cmd_buf, &cmd, sizeof(cmd)); + memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq)); + ret_len = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len); + if (ret_len == BT_SOCKET_ERROR) { + BT_LOGE("Error sending get_metadata request: %s", bt_socket_errormsg()); goto error; } - assert(ret_len == sizeof(rq)); + BT_ASSERT(ret_len == cmd_buf_len); ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp)); if (ret_len == 0) { BT_LOGI("Remote side has closed connection"); goto error; } - if (ret_len < 0) { - BT_LOGE("Error receiving get_metadata response: %s", strerror(errno)); + if (ret_len == BT_SOCKET_ERROR) { + BT_LOGE("Error receiving get_metadata response: %s", bt_socket_errormsg()); goto error; } - assert(ret_len == sizeof(rp)); + BT_ASSERT(ret_len == sizeof(rp)); switch (be32toh(rp.status)) { case LTTNG_VIEWER_METADATA_OK: @@ -1037,11 +1031,11 @@ ssize_t lttng_live_get_one_metadata_packet(struct lttng_live_trace *trace, BT_LOGI("Remote side has closed connection"); goto error_free_data; } - if (ret_len < 0) { - BT_LOGE("Error receiving trace packet: %s", strerror(errno)); + if (ret_len == BT_SOCKET_ERROR) { + BT_LOGE("Error receiving trace packet: %s", bt_socket_errormsg()); goto error_free_data; } - assert(ret_len == len); + BT_ASSERT(ret_len == len); do { ret_len = fwrite(data, 1, len, fp); @@ -1050,7 +1044,7 @@ ssize_t lttng_live_get_one_metadata_packet(struct lttng_live_trace *trace, BT_LOGE("Writing in the metadata fp"); goto error_free_data; } - assert(ret_len == len); + BT_ASSERT(ret_len == len); free(data); ret = len; end: @@ -1069,8 +1063,8 @@ static void lttng_index_to_packet_index(struct lttng_viewer_index *lindex, struct packet_index *pindex) { - assert(lindex); - assert(pindex); + BT_ASSERT(lindex); + BT_ASSERT(pindex); pindex->offset = be64toh(lindex->offset); pindex->packet_size = be64toh(lindex->packet_size); @@ -1081,7 +1075,7 @@ void lttng_index_to_packet_index(struct lttng_viewer_index *lindex, } BT_HIDDEN -enum bt_ctf_lttng_live_iterator_status lttng_live_get_next_index(struct lttng_live_component *lttng_live, +enum bt_lttng_live_iterator_status lttng_live_get_next_index(struct lttng_live_component *lttng_live, struct lttng_live_stream_iterator *stream, struct packet_index *index) { @@ -1090,43 +1084,46 @@ enum bt_ctf_lttng_live_iterator_status lttng_live_get_next_index(struct lttng_li ssize_t ret_len; struct lttng_viewer_index rp; uint32_t flags, status; - enum bt_ctf_lttng_live_iterator_status retstatus = - BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK; + enum bt_lttng_live_iterator_status retstatus = + BT_LTTNG_LIVE_ITERATOR_STATUS_OK; struct bt_live_viewer_connection *viewer_connection = lttng_live->viewer_connection; struct lttng_live_trace *trace = stream->trace; + const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq); + char cmd_buf[cmd_buf_len]; cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEXT_INDEX); cmd.data_size = htobe64((uint64_t) sizeof(rq)); cmd.cmd_version = htobe32(0); + memset(&rq, 0, sizeof(rq)); rq.stream_id = htobe64(stream->viewer_stream_id); - ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd)); - if (ret_len < 0) { - BT_LOGE("Error sending cmd: %s", strerror(errno)); - goto error; - } - assert(ret_len == sizeof(cmd)); - - ret_len = lttng_live_send(viewer_connection, &rq, sizeof(rq)); - if (ret_len < 0) { - BT_LOGE("Error sending get_next_index request: %s", strerror(errno)); + /* + * Merge the cmd and connection request to prevent a write-write + * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the + * second write to be performed quickly in presence of Nagle's algorithm. + */ + memcpy(cmd_buf, &cmd, sizeof(cmd)); + memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq)); + ret_len = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len); + if (ret_len == BT_SOCKET_ERROR) { + BT_LOGE("Error sending get_next_index request: %s", bt_socket_errormsg()); goto error; } - assert(ret_len == sizeof(rq)); + BT_ASSERT(ret_len == cmd_buf_len); ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp)); if (ret_len == 0) { BT_LOGI("Remote side has closed connection"); goto error; } - if (ret_len < 0) { - BT_LOGE("Error receiving get_next_index response: %s", strerror(errno)); + if (ret_len == BT_SOCKET_ERROR) { + BT_LOGE("Error receiving get_next_index response: %s", bt_socket_errormsg()); goto error; } - assert(ret_len == sizeof(rp)); + BT_ASSERT(ret_len == sizeof(rp)); flags = be32toh(rp.flags); status = be32toh(rp.status); @@ -1142,7 +1139,7 @@ enum bt_ctf_lttng_live_iterator_status lttng_live_get_next_index(struct lttng_li stream->current_inactivity_timestamp = index->ts_cycles.timestamp_end; ctf_stream_class_id = be64toh(rp.stream_id); if (stream->ctf_stream_class_id != -1ULL) { - assert(stream->ctf_stream_class_id == + BT_ASSERT(stream->ctf_stream_class_id == ctf_stream_class_id); } else { stream->ctf_stream_class_id = ctf_stream_class_id; @@ -1158,7 +1155,7 @@ enum bt_ctf_lttng_live_iterator_status lttng_live_get_next_index(struct lttng_li lttng_index_to_packet_index(&rp, index); ctf_stream_class_id = be64toh(rp.stream_id); if (stream->ctf_stream_class_id != -1ULL) { - assert(stream->ctf_stream_class_id == + BT_ASSERT(stream->ctf_stream_class_id == ctf_stream_class_id); } else { stream->ctf_stream_class_id = ctf_stream_class_id; @@ -1181,14 +1178,14 @@ enum bt_ctf_lttng_live_iterator_status lttng_live_get_next_index(struct lttng_li case LTTNG_VIEWER_INDEX_RETRY: BT_LOGD("get_next_index: retry"); memset(index, 0, sizeof(struct packet_index)); - retstatus = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN; + retstatus = BT_LTTNG_LIVE_ITERATOR_STATUS_AGAIN; stream->state = LTTNG_LIVE_STREAM_ACTIVE_NO_DATA; goto end; case LTTNG_VIEWER_INDEX_HUP: BT_LOGD("get_next_index: stream hung up"); memset(index, 0, sizeof(struct packet_index)); index->offset = EOF; - retstatus = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END; + retstatus = BT_LTTNG_LIVE_ITERATOR_STATUS_END; stream->state = LTTNG_LIVE_STREAM_EOF; break; case LTTNG_VIEWER_INDEX_ERR: @@ -1206,20 +1203,20 @@ end: return retstatus; error: - if (bt_graph_is_canceled(lttng_live->graph)) { - retstatus = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN; + if (lttng_live_is_canceled(lttng_live)) { + retstatus = BT_LTTNG_LIVE_ITERATOR_STATUS_AGAIN; } else { - retstatus = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR; + retstatus = BT_LTTNG_LIVE_ITERATOR_STATUS_ERROR; } return retstatus; } BT_HIDDEN -enum bt_ctf_notif_iter_medium_status lttng_live_get_stream_bytes(struct lttng_live_component *lttng_live, +enum bt_notif_iter_medium_status lttng_live_get_stream_bytes(struct lttng_live_component *lttng_live, struct lttng_live_stream_iterator *stream, uint8_t *buf, uint64_t offset, uint64_t req_len, uint64_t *recv_len) { - enum bt_ctf_notif_iter_medium_status retstatus = BT_CTF_NOTIF_ITER_MEDIUM_STATUS_OK; + enum bt_notif_iter_medium_status retstatus = BT_NOTIF_ITER_MEDIUM_STATUS_OK; struct lttng_viewer_cmd cmd; struct lttng_viewer_get_packet rq; struct lttng_viewer_trace_packet rp; @@ -1228,6 +1225,8 @@ enum bt_ctf_notif_iter_medium_status lttng_live_get_stream_bytes(struct lttng_li struct bt_live_viewer_connection *viewer_connection = lttng_live->viewer_connection; struct lttng_live_trace *trace = stream->trace; + const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq); + char cmd_buf[cmd_buf_len]; BT_LOGD("lttng_live_get_stream_bytes: offset=%" PRIu64 ", req_len=%" PRIu64, offset, req_len); @@ -1240,27 +1239,27 @@ enum bt_ctf_notif_iter_medium_status lttng_live_get_stream_bytes(struct lttng_li rq.offset = htobe64(offset); rq.len = htobe32(req_len); - ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd)); - if (ret_len < 0) { - BT_LOGE("Error sending cmd: %s", strerror(errno)); - goto error; - } - assert(ret_len == sizeof(cmd)); - - ret_len = lttng_live_send(viewer_connection, &rq, sizeof(rq)); - if (ret_len < 0) { - BT_LOGE("Error sending get_data request: %s", strerror(errno)); + /* + * Merge the cmd and connection request to prevent a write-write + * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the + * second write to be performed quickly in presence of Nagle's algorithm. + */ + memcpy(cmd_buf, &cmd, sizeof(cmd)); + memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq)); + ret_len = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len); + if (ret_len == BT_SOCKET_ERROR) { + BT_LOGE("Error sending get_data request: %s", bt_socket_errormsg()); goto error; } - assert(ret_len == sizeof(rq)); + BT_ASSERT(ret_len == cmd_buf_len); ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp)); if (ret_len == 0) { BT_LOGI("Remote side has closed connection"); goto error; } - if (ret_len < 0) { - BT_LOGE("Error receiving get_data response: %s", strerror(errno)); + if (ret_len == BT_SOCKET_ERROR) { + BT_LOGE("Error receiving get_data response: %s", bt_socket_errormsg()); goto error; } if (ret_len != sizeof(rp)) { @@ -1281,7 +1280,7 @@ enum bt_ctf_notif_iter_medium_status lttng_live_get_stream_bytes(struct lttng_li case LTTNG_VIEWER_GET_PACKET_RETRY: /* Unimplemented by relay daemon */ BT_LOGD("get_data_packet: retry"); - retstatus = BT_CTF_NOTIF_ITER_MEDIUM_STATUS_AGAIN; + retstatus = BT_NOTIF_ITER_MEDIUM_STATUS_AGAIN; goto end; case LTTNG_VIEWER_GET_PACKET_ERR: if (flags & LTTNG_VIEWER_FLAG_NEW_METADATA) { @@ -1294,13 +1293,13 @@ enum bt_ctf_notif_iter_medium_status lttng_live_get_stream_bytes(struct lttng_li } if (flags & (LTTNG_VIEWER_FLAG_NEW_METADATA | LTTNG_VIEWER_FLAG_NEW_STREAM)) { - retstatus = BT_CTF_NOTIF_ITER_MEDIUM_STATUS_AGAIN; + retstatus = BT_NOTIF_ITER_MEDIUM_STATUS_AGAIN; goto end; } BT_LOGE("get_data_packet: error"); goto error; case LTTNG_VIEWER_GET_PACKET_EOF: - retstatus = BT_CTF_NOTIF_ITER_MEDIUM_STATUS_EOF; + retstatus = BT_NOTIF_ITER_MEDIUM_STATUS_EOF; goto end; default: BT_LOGE("get_data_packet: unknown"); @@ -1316,20 +1315,20 @@ enum bt_ctf_notif_iter_medium_status lttng_live_get_stream_bytes(struct lttng_li BT_LOGI("Remote side has closed connection"); goto error; } - if (ret_len < 0) { - BT_LOGE("Error receiving trace packet: %s", strerror(errno)); + if (ret_len == BT_SOCKET_ERROR) { + BT_LOGE("Error receiving trace packet: %s", bt_socket_errormsg()); goto error; } - assert(ret_len == req_len); + BT_ASSERT(ret_len == req_len); *recv_len = ret_len; end: return retstatus; error: - if (bt_graph_is_canceled(lttng_live->graph)) { - retstatus = BT_CTF_NOTIF_ITER_MEDIUM_STATUS_AGAIN; + if (lttng_live_is_canceled(lttng_live)) { + retstatus = BT_NOTIF_ITER_MEDIUM_STATUS_AGAIN; } else { - retstatus = BT_CTF_NOTIF_ITER_MEDIUM_STATUS_ERROR; + retstatus = BT_NOTIF_ITER_MEDIUM_STATUS_ERROR; } return retstatus; } @@ -1338,11 +1337,11 @@ error: * Request new streams for a session. */ BT_HIDDEN -enum bt_ctf_lttng_live_iterator_status lttng_live_get_new_streams( +enum bt_lttng_live_iterator_status lttng_live_get_new_streams( struct lttng_live_session *session) { - enum bt_ctf_lttng_live_iterator_status status = - BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK; + enum bt_lttng_live_iterator_status status = + BT_LTTNG_LIVE_ITERATOR_STATUS_OK; struct lttng_viewer_cmd cmd; struct lttng_viewer_new_streams_request rq; struct lttng_viewer_new_streams_response rp; @@ -1351,9 +1350,11 @@ enum bt_ctf_lttng_live_iterator_status lttng_live_get_new_streams( struct bt_live_viewer_connection *viewer_connection = lttng_live->viewer_connection; uint32_t streams_count; + const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq); + char cmd_buf[cmd_buf_len]; if (!session->new_streams_needed) { - return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK; + return BT_LTTNG_LIVE_ITERATOR_STATUS_OK; } cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEW_STREAMS); @@ -1363,30 +1364,30 @@ enum bt_ctf_lttng_live_iterator_status lttng_live_get_new_streams( memset(&rq, 0, sizeof(rq)); rq.session_id = htobe64(session->id); - ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd)); - if (ret_len < 0) { - BT_LOGE("Error sending cmd: %s", strerror(errno)); - goto error; - } - assert(ret_len == sizeof(cmd)); - - ret_len = lttng_live_send(viewer_connection, &rq, sizeof(rq)); - if (ret_len < 0) { - BT_LOGE("Error sending get_new_streams request: %s", strerror(errno)); + /* + * Merge the cmd and connection request to prevent a write-write + * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the + * second write to be performed quickly in presence of Nagle's algorithm. + */ + memcpy(cmd_buf, &cmd, sizeof(cmd)); + memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq)); + ret_len = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len); + if (ret_len == BT_SOCKET_ERROR) { + BT_LOGE("Error sending get_new_streams request: %s", bt_socket_errormsg()); goto error; } - assert(ret_len == sizeof(rq)); + BT_ASSERT(ret_len == cmd_buf_len); ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp)); if (ret_len == 0) { BT_LOGI("Remote side has closed connection"); goto error; } - if (ret_len < 0) { + if (ret_len == BT_SOCKET_ERROR) { BT_LOGE("Error receiving get_new_streams response"); goto error; } - assert(ret_len == sizeof(rp)); + BT_ASSERT(ret_len == sizeof(rp)); streams_count = be32toh(rp.streams_count); @@ -1400,7 +1401,7 @@ enum bt_ctf_lttng_live_iterator_status lttng_live_get_new_streams( case LTTNG_VIEWER_NEW_STREAMS_HUP: session->new_streams_needed = false; session->closed = true; - status = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END; + status = BT_LTTNG_LIVE_ITERATOR_STATUS_END; goto end; case LTTNG_VIEWER_NEW_STREAMS_ERR: BT_LOGE("get_new_streams error"); @@ -1417,10 +1418,10 @@ end: return status; error: - if (bt_graph_is_canceled(lttng_live->graph)) { - status = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN; + if (lttng_live_is_canceled(lttng_live)) { + status = BT_LTTNG_LIVE_ITERATOR_STATUS_AGAIN; } else { - status = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR; + status = BT_LTTNG_LIVE_ITERATOR_STATUS_ERROR; } return status; } @@ -1434,8 +1435,12 @@ struct bt_live_viewer_connection * viewer_connection = g_new0(struct bt_live_viewer_connection, 1); + if (bt_socket_init() != 0) { + goto error; + } + bt_object_init(&viewer_connection->obj, connection_release); - viewer_connection->control_sock = -1; + viewer_connection->control_sock = BT_INVALID_SOCKET; viewer_connection->port = -1; viewer_connection->lttng_live = lttng_live; viewer_connection->url = g_string_new(url); @@ -1473,4 +1478,6 @@ void bt_live_viewer_connection_destroy(struct bt_live_viewer_connection *viewer_ g_string_free(viewer_connection->session_name, TRUE); } g_free(viewer_connection); + + bt_socket_fini(); }