X-Git-Url: http://git.efficios.com/?a=blobdiff_plain;f=plugins%2Fctf%2Flttng-live%2Fviewer-connection.c;h=b37db12ec46e2f9a0f464658b71e7e63a0440692;hb=bb18709be19ebc5b1bd9264cdbd3dd20939bdd05;hp=e5d1fc13e6d5a7c366b734c55d56f35a60a19604;hpb=087bc0603db40d8cd5f4fddf933f3ded4829369e;p=babeltrace.git diff --git a/plugins/ctf/lttng-live/viewer-connection.c b/plugins/ctf/lttng-live/viewer-connection.c index e5d1fc13..b37db12e 100644 --- a/plugins/ctf/lttng-live/viewer-connection.c +++ b/plugins/ctf/lttng-live/viewer-connection.c @@ -1,4 +1,5 @@ /* + * Copyright 2019 - Francis Deslauriers * Copyright 2016 - Mathieu Desnoyers * * Permission is hereby granted, free of charge, to any person obtaining a copy @@ -20,6 +21,9 @@ * SOFTWARE. */ +#define BT_LOG_TAG "PLUGIN-CTF-LTTNG-LIVE-SRC-VIEWER" +#include "logging.h" + #include #include #include @@ -27,169 +31,141 @@ #include #include #include -#include #include -#include -#include #include -#include -#include +#include +#include #include +#include +#include -#define BT_LOG_TAG "PLUGIN-CTF-LTTNG-LIVE-VIEWER" - -#include "lttng-live-internal.h" +#include "lttng-live.h" #include "viewer-connection.h" #include "lttng-viewer-abi.h" #include "data-stream.h" #include "metadata.h" -static ssize_t lttng_live_recv(int fd, void *buf, size_t len) +static +ssize_t lttng_live_recv(struct live_viewer_connection *viewer_connection, + void *buf, size_t len) { ssize_t ret; size_t copied = 0, to_copy = len; + struct lttng_live_msg_iter *lttng_live_msg_iter = + viewer_connection->lttng_live_msg_iter; + BT_SOCKET sock = viewer_connection->control_sock; do { - ret = recv(fd, buf + copied, to_copy, 0); + ret = bt_socket_recv(sock, buf + copied, to_copy, 0); if (ret > 0) { - assert(ret <= to_copy); + BT_ASSERT(ret <= to_copy); copied += ret; to_copy -= ret; } - } while ((ret > 0 && to_copy > 0) - || (ret < 0 && errno == EINTR)); + if (ret == BT_SOCKET_ERROR && bt_socket_interrupted()) { + if (!viewer_connection->in_query && + lttng_live_is_canceled(lttng_live_msg_iter->lttng_live_comp)) { + break; + } else { + continue; + } + } + } while (ret > 0 && to_copy > 0); if (ret > 0) ret = copied; - /* ret = 0 means orderly shutdown, ret < 0 is error. */ + /* ret = 0 means orderly shutdown, ret == BT_SOCKET_ERROR is error. */ return ret; } -static ssize_t lttng_live_send(int fd, const void *buf, size_t len) +static +ssize_t lttng_live_send(struct live_viewer_connection *viewer_connection, + const void *buf, size_t len) { + struct lttng_live_msg_iter *lttng_live_msg_iter = + viewer_connection->lttng_live_msg_iter; + BT_SOCKET sock = viewer_connection->control_sock; ssize_t ret; - do { - ret = bt_send_nosigpipe(fd, buf, len); - } while (ret < 0 && errno == EINTR); + for (;;) { + ret = bt_socket_send_nosigpipe(sock, buf, len); + if (ret == BT_SOCKET_ERROR && bt_socket_interrupted()) { + if (!viewer_connection->in_query && + lttng_live_is_canceled(lttng_live_msg_iter->lttng_live_comp)) { + break; + } else { + continue; + } + } else { + break; + } + } return ret; } -/* - * hostname parameter needs to hold MAXNAMLEN chars. - */ -static int parse_url(struct bt_live_viewer_connection *viewer_connection) +static +int parse_url(struct live_viewer_connection *viewer_connection) { - char remain[3][MAXNAMLEN]; - int ret = -1, proto, proto_offset = 0; + char error_buf[256] = { 0 }; + struct bt_common_lttng_live_url_parts lttng_live_url_parts = { 0 }; + int ret = -1; const char *path = viewer_connection->url->str; - size_t path_len; if (!path) { goto end; } - path_len = strlen(path); /* not accounting \0 */ - /* - * Since sscanf API does not allow easily checking string length - * against a size defined by a macro. Test it beforehand on the - * input. We know the output is always <= than the input length. - */ - if (path_len >= MAXNAMLEN) { + lttng_live_url_parts = bt_common_parse_lttng_live_url(path, + error_buf, sizeof(error_buf)); + if (!lttng_live_url_parts.proto) { + BT_LOGW("Invalid LTTng live URL format: %s", error_buf); goto end; } - ret = sscanf(path, "net%d://", &proto); - if (ret < 1) { - proto = 4; - /* net:// */ - proto_offset = strlen("net://"); - } else { - /* net4:// or net6:// */ - proto_offset = strlen("netX://"); - } - if (proto_offset > path_len) { - goto end; - } - if (proto == 6) { - BT_LOGW("IPv6 is currently unsupported by lttng-live"); - goto end; - } - /* TODO : parse for IPv6 as well */ - /* Parse the hostname or IP */ - ret = sscanf(&path[proto_offset], "%[a-zA-Z.0-9%-]%s", - viewer_connection->relay_hostname, remain[0]); - if (ret == 2) { - /* Optional port number */ - switch (remain[0][0]) { - case ':': - ret = sscanf(remain[0], ":%d%s", &viewer_connection->port, remain[1]); - /* Optional session ID with port number */ - if (ret == 2) { - ret = sscanf(remain[1], "/%s", remain[2]); - /* Accept 0 or 1 (optional) */ - if (ret < 0) { - goto end; - } - } else if (ret == 0) { - BT_LOGW("Missing port number after delimitor ':'"); - ret = -1; - goto end; - } - break; - case '/': - /* Optional session ID */ - ret = sscanf(remain[0], "/%s", remain[2]); - /* Accept 0 or 1 (optional) */ - if (ret < 0) { - goto end; - } - break; - default: - BT_LOGW("wrong delimitor : %c", remain[0][0]); - ret = -1; - goto end; - } - } - if (viewer_connection->port < 0) { + viewer_connection->relay_hostname = + lttng_live_url_parts.hostname; + lttng_live_url_parts.hostname = NULL; + + if (lttng_live_url_parts.port >= 0) { + viewer_connection->port = lttng_live_url_parts.port; + } else { viewer_connection->port = LTTNG_DEFAULT_NETWORK_VIEWER_PORT; } - if (strlen(remain[2]) == 0) { - BT_LOGD("Connecting to hostname : %s, port : %d, " - "proto : IPv%d", - viewer_connection->relay_hostname, - viewer_connection->port, - proto); - ret = 0; - goto end; - } - ret = sscanf(remain[2], "host/%[a-zA-Z.0-9%-]/%s", - viewer_connection->target_hostname, - viewer_connection->session_name); - if (ret != 2) { - BT_LOGW("Format : " - "net:///host//"); - goto end; + viewer_connection->target_hostname = + lttng_live_url_parts.target_hostname; + lttng_live_url_parts.target_hostname = NULL; + + if (lttng_live_url_parts.session_name) { + viewer_connection->session_name = + lttng_live_url_parts.session_name; + lttng_live_url_parts.session_name = NULL; } BT_LOGD("Connecting to hostname : %s, port : %d, " "target hostname : %s, session name : %s, " - "proto : IPv%d", - viewer_connection->relay_hostname, + "proto : %s", + viewer_connection->relay_hostname->str, viewer_connection->port, - viewer_connection->target_hostname, - viewer_connection->session_name, proto); + viewer_connection->target_hostname == NULL ? + "" : viewer_connection->target_hostname->str, + viewer_connection->session_name == NULL ? + "" : viewer_connection->session_name->str, + lttng_live_url_parts.proto->str); ret = 0; end: + bt_common_destroy_lttng_live_url_parts(<tng_live_url_parts); return ret; } -static int lttng_live_handshake(struct bt_live_viewer_connection *viewer_connection) +static +int lttng_live_handshake(struct live_viewer_connection *viewer_connection) { struct lttng_viewer_cmd cmd; struct lttng_viewer_connect connect; + const size_t cmd_buf_len = sizeof(cmd) + sizeof(connect); + char cmd_buf[cmd_buf_len]; int ret; ssize_t ret_len; @@ -202,33 +178,34 @@ static int lttng_live_handshake(struct bt_live_viewer_connection *viewer_connect connect.minor = htobe32(LTTNG_LIVE_MINOR); connect.type = htobe32(LTTNG_VIEWER_CLIENT_COMMAND); - ret_len = lttng_live_send(viewer_connection->control_sock, &cmd, sizeof(cmd)); - if (ret_len < 0) { - BT_LOGE("Error sending cmd: %s", strerror(errno)); + /* + * Merge the cmd and connection request to prevent a write-write + * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the + * second write to be performed quickly in presence of Nagle's algorithm + */ + memcpy(cmd_buf, &cmd, sizeof(cmd)); + memcpy(cmd_buf + sizeof(cmd), &connect, sizeof(connect)); + ret_len = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len); + if (ret_len == BT_SOCKET_ERROR) { + BT_LOGE("Error sending version: %s", bt_socket_errormsg()); goto error; } - assert(ret_len == sizeof(cmd)); - ret_len = lttng_live_send(viewer_connection->control_sock, &connect, sizeof(connect)); - if (ret_len < 0) { - BT_LOGE("Error sending version: %s", strerror(errno)); - goto error; - } - assert(ret_len == sizeof(connect)); + BT_ASSERT(ret_len == cmd_buf_len); - ret_len = lttng_live_recv(viewer_connection->control_sock, &connect, sizeof(connect)); + ret_len = lttng_live_recv(viewer_connection, &connect, sizeof(connect)); if (ret_len == 0) { BT_LOGI("Remote side has closed connection"); goto error; } - if (ret_len < 0) { - BT_LOGE("Error receiving version: %s", strerror(errno)); + if (ret_len == BT_SOCKET_ERROR) { + BT_LOGE("Error receiving version: %s", bt_socket_errormsg()); goto error; } - assert(ret_len == sizeof(connect)); + BT_ASSERT(ret_len == sizeof(connect)); BT_LOGD("Received viewer session ID : %" PRIu64, - be64toh(connect.viewer_session_id)); + (uint64_t) be64toh(connect.viewer_session_id)); BT_LOGD("Relayd version : %u.%u", be32toh(connect.major), be32toh(connect.minor)); @@ -251,7 +228,8 @@ error: return -1; } -static int lttng_live_connect_viewer(struct bt_live_viewer_connection *viewer_connection) +static +int lttng_live_connect_viewer(struct live_viewer_connection *viewer_connection) { struct hostent *host; struct sockaddr_in server_addr; @@ -261,15 +239,15 @@ static int lttng_live_connect_viewer(struct bt_live_viewer_connection *viewer_co goto error; } - host = gethostbyname(viewer_connection->relay_hostname); + host = gethostbyname(viewer_connection->relay_hostname->str); if (!host) { BT_LOGE("Cannot lookup hostname %s", - viewer_connection->relay_hostname); + viewer_connection->relay_hostname->str); goto error; } - if ((viewer_connection->control_sock = socket(AF_INET, SOCK_STREAM, 0)) == -1) { - BT_LOGE("Socket creation failed: %s", strerror(errno)); + if ((viewer_connection->control_sock = socket(AF_INET, SOCK_STREAM, 0)) == BT_INVALID_SOCKET) { + BT_LOGE("Socket creation failed: %s", bt_socket_errormsg()); goto error; } @@ -279,8 +257,8 @@ static int lttng_live_connect_viewer(struct bt_live_viewer_connection *viewer_co memset(&(server_addr.sin_zero), 0, 8); if (connect(viewer_connection->control_sock, (struct sockaddr *) &server_addr, - sizeof(struct sockaddr)) == -1) { - BT_LOGE("Connection failed: %s", strerror(errno)); + sizeof(struct sockaddr)) == BT_SOCKET_ERROR) { + BT_LOGE("Connection failed: %s", bt_socket_errormsg()); goto error; } if (lttng_live_handshake(viewer_connection)) { @@ -292,79 +270,80 @@ static int lttng_live_connect_viewer(struct bt_live_viewer_connection *viewer_co return ret; error: - if (viewer_connection->control_sock >= 0) { - if (close(viewer_connection->control_sock)) { - BT_LOGE("Close: %s", strerror(errno)); + if (viewer_connection->control_sock != BT_INVALID_SOCKET) { + if (bt_socket_close(viewer_connection->control_sock) == BT_SOCKET_ERROR) { + BT_LOGE("Close: %s", bt_socket_errormsg()); } } - viewer_connection->control_sock = -1; + viewer_connection->control_sock = BT_INVALID_SOCKET; return -1; } -static void lttng_live_disconnect_viewer(struct bt_live_viewer_connection *viewer_connection) +static +void lttng_live_disconnect_viewer( + struct live_viewer_connection *viewer_connection) { - if (viewer_connection->control_sock < 0) { + if (viewer_connection->control_sock == BT_INVALID_SOCKET) { return; } - if (close(viewer_connection->control_sock)) { - BT_LOGE("Close: %s", strerror(errno)); - viewer_connection->control_sock = -1; + if (bt_socket_close(viewer_connection->control_sock) == BT_SOCKET_ERROR) { + BT_LOGE("Close: %s", bt_socket_errormsg()); + viewer_connection->control_sock = BT_INVALID_SOCKET; } } -static void connection_release(struct bt_object *obj) +static +void connection_release(bt_object *obj) { - struct bt_live_viewer_connection *conn = - container_of(obj, struct bt_live_viewer_connection, obj); + struct live_viewer_connection *conn = + container_of(obj, struct live_viewer_connection, obj); - bt_live_viewer_connection_destroy(conn); + live_viewer_connection_destroy(conn); } static -enum bt_value_status list_update_session(struct bt_value *results, +int list_update_session(bt_value *results, const struct lttng_viewer_session *session, bool *_found) { - enum bt_value_status ret = BT_VALUE_STATUS_OK; - struct bt_value *map = NULL; - struct bt_value *hostname = NULL; - struct bt_value *session_name = NULL; - struct bt_value *btval = NULL; + int ret = 0; + bt_value *map = NULL; + bt_value *hostname = NULL; + bt_value *session_name = NULL; + bt_value *btval = NULL; int i, len; bool found = false; - len = bt_value_array_size(results); + len = bt_value_array_get_size(results); if (len < 0) { - ret = BT_VALUE_STATUS_ERROR; + BT_LOGE_STR("Error getting size of array."); + ret = -1; goto end; } for (i = 0; i < len; i++) { const char *hostname_str = NULL; const char *session_name_str = NULL; - map = bt_value_array_get(results, (size_t) i); + map = bt_value_array_borrow_element_by_index(results, (size_t) i); if (!map) { - ret = BT_VALUE_STATUS_ERROR; + BT_LOGE_STR("Error borrowing map."); + ret = -1; goto end; } - hostname = bt_value_map_get(map, "target-hostname"); + hostname = bt_value_map_borrow_entry_value(map, "target-hostname"); if (!hostname) { - ret = BT_VALUE_STATUS_ERROR; + BT_LOGE_STR("Error borrowing \"target-hostname\" entry."); + ret = -1; goto end; } - session_name = bt_value_map_get(map, "session-name"); + session_name = bt_value_map_borrow_entry_value(map, "session-name"); if (!session_name) { - ret = BT_VALUE_STATUS_ERROR; - goto end; - } - ret = bt_value_string_get(hostname, &hostname_str); - if (ret != BT_VALUE_STATUS_OK) { - goto end; - } - ret = bt_value_string_get(session_name, &session_name_str); - if (ret != BT_VALUE_STATUS_OK) { + BT_LOGE_STR("Error borrowing \"session-name\" entry."); + ret = -1; goto end; } + hostname_str = bt_value_string_get(hostname); + session_name_str = bt_value_string_get(session_name); if (!strcmp(session->hostname, hostname_str) && !strcmp(session->session_name, @@ -375,65 +354,46 @@ enum bt_value_status list_update_session(struct bt_value *results, found = true; - btval = bt_value_map_get(map, "stream-count"); + btval = bt_value_map_borrow_entry_value(map, "stream-count"); if (!btval) { - ret = BT_VALUE_STATUS_ERROR; - goto end; - } - ret = bt_value_integer_get(btval, &val); - if (ret != BT_VALUE_STATUS_OK) { + BT_LOGE_STR("Error borrowing \"stream-count\" entry."); + ret = -1; goto end; } + val = bt_value_integer_get(btval); /* sum */ val += streams; - ret = bt_value_integer_set(btval, val); - if (ret != BT_VALUE_STATUS_OK) { - goto end; - } - BT_PUT(btval); + bt_value_integer_set(btval, val); - btval = bt_value_map_get(map, "client-count"); + btval = bt_value_map_borrow_entry_value(map, "client-count"); if (!btval) { - ret = BT_VALUE_STATUS_ERROR; - goto end; - } - ret = bt_value_integer_get(btval, &val); - if (ret != BT_VALUE_STATUS_OK) { + BT_LOGE_STR("Error borrowing \"client-count\" entry."); + ret = -1; goto end; } + val = bt_value_integer_get(btval); /* max */ val = max_t(int64_t, clients, val); - ret = bt_value_integer_set(btval, val); - if (ret != BT_VALUE_STATUS_OK) { - goto end; - } - BT_PUT(btval); + bt_value_integer_set(btval, val); } - BT_PUT(hostname); - BT_PUT(session_name); - BT_PUT(map); - if (found) { break; } } end: - BT_PUT(btval); - BT_PUT(hostname); - BT_PUT(session_name); - BT_PUT(map); *_found = found; return ret; } static -enum bt_value_status list_append_session(struct bt_value *results, +int list_append_session(bt_value *results, GString *base_url, const struct lttng_viewer_session *session) { - enum bt_value_status ret = BT_VALUE_STATUS_OK; - struct bt_value *map = NULL; + int ret = 0; + bt_value_status ret_status; + bt_value *map = NULL; GString *url = NULL; bool found = false; @@ -442,18 +402,20 @@ enum bt_value_status list_append_session(struct bt_value *results, * and do max of client counts. */ ret = list_update_session(results, session, &found); - if (ret != BT_VALUE_STATUS_OK || found) { + if (ret || found) { goto end; } map = bt_value_map_create(); if (!map) { - ret = BT_VALUE_STATUS_ERROR; + BT_LOGE_STR("Error creating map value."); + ret = -1; goto end; } if (base_url->len < 1) { - ret = BT_VALUE_STATUS_ERROR; + BT_LOGE_STR("Error: base_url length smaller than 1."); + ret = -1; goto end; } /* @@ -466,8 +428,10 @@ enum bt_value_status list_append_session(struct bt_value *results, g_string_append_c(url, '/'); g_string_append(url, session->session_name); - ret = bt_value_map_insert_string(map, "url", url->str); - if (ret != BT_VALUE_STATUS_OK) { + ret_status = bt_value_map_insert_string_entry(map, "url", url->str); + if (ret_status != BT_VALUE_STATUS_OK) { + BT_LOGE_STR("Error inserting \"url\" entry."); + ret = -1; goto end; } @@ -475,9 +439,11 @@ enum bt_value_status list_append_session(struct bt_value *results, * key = "target-hostname", * value = , */ - ret = bt_value_map_insert_string(map, "target-hostname", + ret_status = bt_value_map_insert_string_entry(map, "target-hostname", session->hostname); - if (ret != BT_VALUE_STATUS_OK) { + if (ret_status != BT_VALUE_STATUS_OK) { + BT_LOGE_STR("Error inserting \"target-hostname\" entry."); + ret = -1; goto end; } @@ -485,9 +451,11 @@ enum bt_value_status list_append_session(struct bt_value *results, * key = "session-name", * value = , */ - ret = bt_value_map_insert_string(map, "session-name", + ret_status = bt_value_map_insert_string_entry(map, "session-name", session->session_name); - if (ret != BT_VALUE_STATUS_OK) { + if (ret_status != BT_VALUE_STATUS_OK) { + BT_LOGE_STR("Error inserting \"session-name\" entry."); + ret = -1; goto end; } @@ -498,9 +466,11 @@ enum bt_value_status list_append_session(struct bt_value *results, { uint32_t live_timer = be32toh(session->live_timer); - ret = bt_value_map_insert_integer(map, "timer-us", + ret_status = bt_value_map_insert_integer_entry(map, "timer-us", live_timer); - if (ret != BT_VALUE_STATUS_OK) { + if (ret_status != BT_VALUE_STATUS_OK) { + BT_LOGE_STR("Error inserting \"timer-us\" entry."); + ret = -1; goto end; } } @@ -512,14 +482,15 @@ enum bt_value_status list_append_session(struct bt_value *results, { uint32_t streams = be32toh(session->streams); - ret = bt_value_map_insert_integer(map, "stream-count", + ret_status = bt_value_map_insert_integer_entry(map, "stream-count", streams); - if (ret != BT_VALUE_STATUS_OK) { + if (ret_status != BT_VALUE_STATUS_OK) { + BT_LOGE_STR("Error inserting \"stream-count\" entry."); + ret = -1; goto end; } } - /* * key = "client-count", * value = , @@ -527,19 +498,26 @@ enum bt_value_status list_append_session(struct bt_value *results, { uint32_t clients = be32toh(session->clients); - ret = bt_value_map_insert_integer(map, "client-count", + ret_status = bt_value_map_insert_integer_entry(map, "client-count", clients); - if (ret != BT_VALUE_STATUS_OK) { + if (ret_status != BT_VALUE_STATUS_OK) { + BT_LOGE_STR("Error inserting \"client-count\" entry."); + ret = -1; goto end; } } - ret = bt_value_array_append(results, map); + ret_status = bt_value_array_append_element(results, map); + if (ret_status != BT_VALUE_STATUS_OK) { + BT_LOGE_STR("Error appending map to results."); + ret = -1; + } + end: if (url) { - g_string_free(url, TRUE); + g_string_free(url, true); } - BT_PUT(map); + BT_VALUE_PUT_REF_AND_RESET(map); return ret; } @@ -580,9 +558,12 @@ end: */ BT_HIDDEN -struct bt_value *bt_live_viewer_connection_list_sessions(struct bt_live_viewer_connection *viewer_connection) +bt_query_status live_viewer_connection_list_sessions( + struct live_viewer_connection *viewer_connection, + const bt_value **user_result) { - struct bt_value *results = NULL; + bt_query_status status = BT_QUERY_STATUS_OK; + bt_value *result = NULL; struct lttng_viewer_cmd cmd; struct lttng_viewer_list_sessions list; uint32_t i, sessions_count; @@ -592,9 +573,10 @@ struct bt_value *bt_live_viewer_connection_list_sessions(struct bt_live_viewer_c goto error; } - results = bt_value_array_create(); - if (!results) { + result = bt_value_array_create(); + if (!result) { BT_LOGE("Error creating array"); + status = BT_QUERY_STATUS_NOMEM; goto error; } @@ -602,56 +584,63 @@ struct bt_value *bt_live_viewer_connection_list_sessions(struct bt_live_viewer_c cmd.data_size = htobe64((uint64_t) 0); cmd.cmd_version = htobe32(0); - ret_len = lttng_live_send(viewer_connection->control_sock, &cmd, sizeof(cmd)); - if (ret_len < 0) { - BT_LOGE("Error sending cmd: %s", strerror(errno)); + ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd)); + if (ret_len == BT_SOCKET_ERROR) { + BT_LOGE("Error sending cmd: %s", bt_socket_errormsg()); + status = BT_QUERY_STATUS_ERROR; goto error; } - assert(ret_len == sizeof(cmd)); + BT_ASSERT(ret_len == sizeof(cmd)); - ret_len = lttng_live_recv(viewer_connection->control_sock, &list, sizeof(list)); + ret_len = lttng_live_recv(viewer_connection, &list, sizeof(list)); if (ret_len == 0) { BT_LOGI("Remote side has closed connection"); + status = BT_QUERY_STATUS_ERROR; goto error; } - if (ret_len < 0) { - BT_LOGE("Error receiving session list: %s", strerror(errno)); + if (ret_len == BT_SOCKET_ERROR) { + BT_LOGE("Error receiving session list: %s", bt_socket_errormsg()); + status = BT_QUERY_STATUS_ERROR; goto error; } - assert(ret_len == sizeof(list)); + BT_ASSERT(ret_len == sizeof(list)); sessions_count = be32toh(list.sessions_count); for (i = 0; i < sessions_count; i++) { struct lttng_viewer_session lsession; - ret_len = lttng_live_recv(viewer_connection->control_sock, - &lsession, sizeof(lsession)); + ret_len = lttng_live_recv(viewer_connection, &lsession, + sizeof(lsession)); if (ret_len == 0) { BT_LOGI("Remote side has closed connection"); + status = BT_QUERY_STATUS_ERROR; goto error; } - if (ret_len < 0) { - BT_LOGE("Error receiving session: %s", strerror(errno)); + if (ret_len == BT_SOCKET_ERROR) { + BT_LOGE("Error receiving session: %s", bt_socket_errormsg()); + status = BT_QUERY_STATUS_ERROR; goto error; } - assert(ret_len == sizeof(lsession)); + BT_ASSERT(ret_len == sizeof(lsession)); lsession.hostname[LTTNG_VIEWER_HOST_NAME_MAX - 1] = '\0'; lsession.session_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0'; - if (list_append_session(results, - viewer_connection->url, &lsession) - != BT_VALUE_STATUS_OK) { + if (list_append_session(result, viewer_connection->url, + &lsession)) { + status = BT_QUERY_STATUS_ERROR; goto error; } } + + *user_result = result; goto end; error: - BT_PUT(results); + BT_VALUE_PUT_REF_AND_RESET(result); end: - return results; + return status; } static -int lttng_live_query_session_ids(struct lttng_live_component *lttng_live) +int lttng_live_query_session_ids(struct lttng_live_msg_iter *lttng_live_msg_iter) { struct lttng_viewer_cmd cmd; struct lttng_viewer_list_sessions list; @@ -659,54 +648,59 @@ int lttng_live_query_session_ids(struct lttng_live_component *lttng_live) uint32_t i, sessions_count; ssize_t ret_len; uint64_t session_id; - struct bt_live_viewer_connection *viewer_connection = - lttng_live->viewer_connection; + struct live_viewer_connection *viewer_connection = + lttng_live_msg_iter->viewer_connection; cmd.cmd = htobe32(LTTNG_VIEWER_LIST_SESSIONS); cmd.data_size = htobe64((uint64_t) 0); cmd.cmd_version = htobe32(0); - ret_len = lttng_live_send(viewer_connection->control_sock, &cmd, sizeof(cmd)); - if (ret_len < 0) { - BT_LOGE("Error sending cmd: %s", strerror(errno)); + ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd)); + if (ret_len == BT_SOCKET_ERROR) { + BT_LOGE("Error sending cmd: %s", bt_socket_errormsg()); goto error; } - assert(ret_len == sizeof(cmd)); + BT_ASSERT(ret_len == sizeof(cmd)); - ret_len = lttng_live_recv(viewer_connection->control_sock, &list, sizeof(list)); + ret_len = lttng_live_recv(viewer_connection, &list, sizeof(list)); if (ret_len == 0) { BT_LOGI("Remote side has closed connection"); goto error; } - if (ret_len < 0) { - BT_LOGE("Error receiving session list: %s", strerror(errno)); + if (ret_len == BT_SOCKET_ERROR) { + BT_LOGE("Error receiving session list: %s", bt_socket_errormsg()); goto error; } - assert(ret_len == sizeof(list)); + BT_ASSERT(ret_len == sizeof(list)); sessions_count = be32toh(list.sessions_count); for (i = 0; i < sessions_count; i++) { - ret_len = lttng_live_recv(viewer_connection->control_sock, + ret_len = lttng_live_recv(viewer_connection, &lsession, sizeof(lsession)); if (ret_len == 0) { BT_LOGI("Remote side has closed connection"); goto error; } - if (ret_len < 0) { - BT_LOGE("Error receiving session: %s", strerror(errno)); + if (ret_len == BT_SOCKET_ERROR) { + BT_LOGE("Error receiving session: %s", bt_socket_errormsg()); goto error; } - assert(ret_len == sizeof(lsession)); + BT_ASSERT(ret_len == sizeof(lsession)); lsession.hostname[LTTNG_VIEWER_HOST_NAME_MAX - 1] = '\0'; lsession.session_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0'; session_id = be64toh(lsession.id); + BT_LOGD("Adding session %" PRIu64 " hostname: %s session_name: %s", + session_id, lsession.hostname, lsession.session_name); + if ((strncmp(lsession.session_name, - viewer_connection->session_name, - MAXNAMLEN) == 0) && (strncmp(lsession.hostname, - viewer_connection->target_hostname, - MAXNAMLEN) == 0)) { - if (lttng_live_add_session(lttng_live, session_id)) { + viewer_connection->session_name->str, + LTTNG_VIEWER_NAME_MAX) == 0) && (strncmp(lsession.hostname, + viewer_connection->target_hostname->str, + LTTNG_VIEWER_HOST_NAME_MAX) == 0)) { + if (lttng_live_add_session(lttng_live_msg_iter, session_id, + lsession.hostname, + lsession.session_name)) { goto error; } } @@ -720,41 +714,42 @@ error: } BT_HIDDEN -int lttng_live_create_viewer_session(struct lttng_live_component *lttng_live) +int lttng_live_create_viewer_session( + struct lttng_live_msg_iter *lttng_live_msg_iter) { struct lttng_viewer_cmd cmd; struct lttng_viewer_create_session_response resp; ssize_t ret_len; - struct bt_live_viewer_connection *viewer_connection = - lttng_live->viewer_connection; + struct live_viewer_connection *viewer_connection = + lttng_live_msg_iter->viewer_connection; cmd.cmd = htobe32(LTTNG_VIEWER_CREATE_SESSION); cmd.data_size = htobe64((uint64_t) 0); cmd.cmd_version = htobe32(0); - ret_len = lttng_live_send(viewer_connection->control_sock, &cmd, sizeof(cmd)); - if (ret_len < 0) { - BT_LOGE("Error sending cmd: %s", strerror(errno)); + ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd)); + if (ret_len == BT_SOCKET_ERROR) { + BT_LOGE("Error sending cmd: %s", bt_socket_errormsg()); goto error; } - assert(ret_len == sizeof(cmd)); + BT_ASSERT(ret_len == sizeof(cmd)); - ret_len = lttng_live_recv(viewer_connection->control_sock, &resp, sizeof(resp)); + ret_len = lttng_live_recv(viewer_connection, &resp, sizeof(resp)); if (ret_len == 0) { BT_LOGI("Remote side has closed connection"); goto error; } - if (ret_len < 0) { - BT_LOGE("Error receiving create session reply: %s", strerror(errno)); + if (ret_len == BT_SOCKET_ERROR) { + BT_LOGE("Error receiving create session reply: %s", bt_socket_errormsg()); goto error; } - assert(ret_len == sizeof(resp)); + BT_ASSERT(ret_len == sizeof(resp)); if (be32toh(resp.status) != LTTNG_VIEWER_CREATE_SESSION_OK) { BT_LOGE("Error creating viewer session"); goto error; } - if (lttng_live_query_session_ids(lttng_live)) { + if (lttng_live_query_session_ids(lttng_live_msg_iter)) { goto error; } @@ -770,9 +765,10 @@ int receive_streams(struct lttng_live_session *session, { ssize_t ret_len; uint32_t i; - struct lttng_live_component *lttng_live = session->lttng_live; - struct bt_live_viewer_connection *viewer_connection = - lttng_live->viewer_connection; + struct lttng_live_msg_iter *lttng_live_msg_iter = + session->lttng_live_msg_iter; + struct live_viewer_connection *viewer_connection = + lttng_live_msg_iter->viewer_connection; BT_LOGD("Getting %" PRIu32 " new streams:", stream_count); for (i = 0; i < stream_count; i++) { @@ -781,16 +777,16 @@ int receive_streams(struct lttng_live_session *session, uint64_t stream_id; uint64_t ctf_trace_id; - ret_len = lttng_live_recv(viewer_connection->control_sock, &stream, sizeof(stream)); + ret_len = lttng_live_recv(viewer_connection, &stream, sizeof(stream)); if (ret_len == 0) { BT_LOGI("Remote side has closed connection"); goto error; } - if (ret_len < 0) { + if (ret_len == BT_SOCKET_ERROR) { BT_LOGE("Error receiving stream"); goto error; } - assert(ret_len == sizeof(stream)); + BT_ASSERT(ret_len == sizeof(stream)); stream.path_name[LTTNG_VIEWER_PATH_MAX - 1] = '\0'; stream.channel_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0'; stream_id = be64toh(stream.id); @@ -801,12 +797,13 @@ int receive_streams(struct lttng_live_session *session, stream_id, stream.path_name, stream.channel_name); if (lttng_live_metadata_create_stream(session, - ctf_trace_id, stream_id)) { + ctf_trace_id, stream_id, + stream.path_name)) { BT_LOGE("Error creating metadata stream"); goto error; } - session->lazy_stream_notif_init = true; + session->lazy_stream_msg_init = true; } else { BT_LOGD(" stream %" PRIu64 " : %s/%s", stream_id, stream.path_name, @@ -832,15 +829,13 @@ int lttng_live_attach_session(struct lttng_live_session *session) struct lttng_viewer_attach_session_request rq; struct lttng_viewer_attach_session_response rp; ssize_t ret_len; - struct lttng_live_component *lttng_live = session->lttng_live; - struct bt_live_viewer_connection *viewer_connection = - lttng_live->viewer_connection; + struct lttng_live_msg_iter *lttng_live_msg_iter = session->lttng_live_msg_iter; + struct live_viewer_connection *viewer_connection = + lttng_live_msg_iter->viewer_connection; uint64_t session_id = session->id; uint32_t streams_count; - - if (session->attached) { - return 0; - } + const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq); + char cmd_buf[cmd_buf_len]; cmd.cmd = htobe32(LTTNG_VIEWER_ATTACH_SESSION); cmd.data_size = htobe64((uint64_t) sizeof(rq)); @@ -852,30 +847,30 @@ int lttng_live_attach_session(struct lttng_live_session *session) // rq.seek = htobe32(LTTNG_VIEWER_SEEK_BEGINNING); rq.seek = htobe32(LTTNG_VIEWER_SEEK_LAST); - ret_len = lttng_live_send(viewer_connection->control_sock, &cmd, sizeof(cmd)); - if (ret_len < 0) { - BT_LOGE("Error sending cmd: %s", strerror(errno)); - goto error; - } - assert(ret_len == sizeof(cmd)); - - ret_len = lttng_live_send(viewer_connection->control_sock, &rq, sizeof(rq)); - if (ret_len < 0) { - BT_LOGE("Error sending attach request: %s", strerror(errno)); + /* + * Merge the cmd and connection request to prevent a write-write + * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the + * second write to be performed quickly in presence of Nagle's algorithm. + */ + memcpy(cmd_buf, &cmd, sizeof(cmd)); + memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq)); + ret_len = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len); + if (ret_len == BT_SOCKET_ERROR) { + BT_LOGE("Error sending attach request: %s", bt_socket_errormsg()); goto error; } - assert(ret_len == sizeof(rq)); - ret_len = lttng_live_recv(viewer_connection->control_sock, &rp, sizeof(rp)); + BT_ASSERT(ret_len == cmd_buf_len); + ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp)); if (ret_len == 0) { BT_LOGI("Remote side has closed connection"); goto error; } - if (ret_len < 0) { - BT_LOGE("Error receiving attach response: %s", strerror(errno)); + if (ret_len == BT_SOCKET_ERROR) { + BT_LOGE("Error receiving attach response: %s", bt_socket_errormsg()); goto error; } - assert(ret_len == sizeof(rp)); + BT_ASSERT(ret_len == sizeof(rp)); streams_count = be32toh(rp.streams_count); switch(be32toh(rp.status)) { @@ -919,10 +914,12 @@ int lttng_live_detach_session(struct lttng_live_session *session) struct lttng_viewer_detach_session_request rq; struct lttng_viewer_detach_session_response rp; ssize_t ret_len; - struct lttng_live_component *lttng_live = session->lttng_live; - struct bt_live_viewer_connection *viewer_connection = - lttng_live->viewer_connection; + struct lttng_live_msg_iter *lttng_live_msg_iter = session->lttng_live_msg_iter; + struct live_viewer_connection *viewer_connection = + lttng_live_msg_iter->viewer_connection; uint64_t session_id = session->id; + const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq); + char cmd_buf[cmd_buf_len]; if (!session->attached) { return 0; @@ -935,30 +932,30 @@ int lttng_live_detach_session(struct lttng_live_session *session) memset(&rq, 0, sizeof(rq)); rq.session_id = htobe64(session_id); - ret_len = lttng_live_send(viewer_connection->control_sock, &cmd, sizeof(cmd)); - if (ret_len < 0) { - BT_LOGE("Error sending cmd: %s", strerror(errno)); - goto error; - } - assert(ret_len == sizeof(cmd)); - - ret_len = lttng_live_send(viewer_connection->control_sock, &rq, sizeof(rq)); - if (ret_len < 0) { - BT_LOGE("Error sending detach request: %s", strerror(errno)); + /* + * Merge the cmd and connection request to prevent a write-write + * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the + * second write to be performed quickly in presence of Nagle's algorithm. + */ + memcpy(cmd_buf, &cmd, sizeof(cmd)); + memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq)); + ret_len = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len); + if (ret_len == BT_SOCKET_ERROR) { + BT_LOGE("Error sending detach request: %s", bt_socket_errormsg()); goto error; } - assert(ret_len == sizeof(rq)); - ret_len = lttng_live_recv(viewer_connection->control_sock, &rp, sizeof(rp)); + BT_ASSERT(ret_len == cmd_buf_len); + ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp)); if (ret_len == 0) { BT_LOGI("Remote side has closed connection"); goto error; } - if (ret_len < 0) { - BT_LOGE("Error receiving detach response: %s", strerror(errno)); + if (ret_len == BT_SOCKET_ERROR) { + BT_LOGE("Error receiving detach response: %s", bt_socket_errormsg()); goto error; } - assert(ret_len == sizeof(rp)); + BT_ASSERT(ret_len == sizeof(rp)); switch(be32toh(rp.status)) { case LTTNG_VIEWER_DETACH_SESSION_OK: @@ -994,40 +991,42 @@ ssize_t lttng_live_get_one_metadata_packet(struct lttng_live_trace *trace, char *data = NULL; ssize_t ret_len; struct lttng_live_session *session = trace->session; - struct lttng_live_component *lttng_live = session->lttng_live; + struct lttng_live_msg_iter *lttng_live_msg_iter = session->lttng_live_msg_iter; struct lttng_live_metadata *metadata = trace->metadata; - struct bt_live_viewer_connection *viewer_connection = - lttng_live->viewer_connection; + struct live_viewer_connection *viewer_connection = + lttng_live_msg_iter->viewer_connection; + const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq); + char cmd_buf[cmd_buf_len]; rq.stream_id = htobe64(metadata->stream_id); cmd.cmd = htobe32(LTTNG_VIEWER_GET_METADATA); cmd.data_size = htobe64((uint64_t) sizeof(rq)); cmd.cmd_version = htobe32(0); - ret_len = lttng_live_send(viewer_connection->control_sock, &cmd, sizeof(cmd)); - if (ret_len < 0) { - BT_LOGE("Error sending cmd: %s", strerror(errno)); - goto error; - } - assert(ret_len == sizeof(cmd)); - - ret_len = lttng_live_send(viewer_connection->control_sock, &rq, sizeof(rq)); - if (ret_len < 0) { - BT_LOGE("Error sending get_metadata request: %s", strerror(errno)); + /* + * Merge the cmd and connection request to prevent a write-write + * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the + * second write to be performed quickly in presence of Nagle's algorithm. + */ + memcpy(cmd_buf, &cmd, sizeof(cmd)); + memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq)); + ret_len = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len); + if (ret_len == BT_SOCKET_ERROR) { + BT_LOGE("Error sending get_metadata request: %s", bt_socket_errormsg()); goto error; } - assert(ret_len == sizeof(rq)); - ret_len = lttng_live_recv(viewer_connection->control_sock, &rp, sizeof(rp)); + BT_ASSERT(ret_len == cmd_buf_len); + ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp)); if (ret_len == 0) { BT_LOGI("Remote side has closed connection"); goto error; } - if (ret_len < 0) { - BT_LOGE("Error receiving get_metadata response: %s", strerror(errno)); + if (ret_len == BT_SOCKET_ERROR) { + BT_LOGE("Error receiving get_metadata response: %s", bt_socket_errormsg()); goto error; } - assert(ret_len == sizeof(rp)); + BT_ASSERT(ret_len == sizeof(rp)); switch (be32toh(rp.status)) { case LTTNG_VIEWER_METADATA_OK: @@ -1056,16 +1055,16 @@ ssize_t lttng_live_get_one_metadata_packet(struct lttng_live_trace *trace, BT_LOGE("relay data zmalloc: %s", strerror(errno)); goto error; } - ret_len = lttng_live_recv(viewer_connection->control_sock, data, len); + ret_len = lttng_live_recv(viewer_connection, data, len); if (ret_len == 0) { BT_LOGI("Remote side has closed connection"); goto error_free_data; } - if (ret_len < 0) { - BT_LOGE("Error receiving trace packet: %s", strerror(errno)); + if (ret_len == BT_SOCKET_ERROR) { + BT_LOGE("Error receiving trace packet: %s", bt_socket_errormsg()); goto error_free_data; } - assert(ret_len == len); + BT_ASSERT(ret_len == len); do { ret_len = fwrite(data, 1, len, fp); @@ -1074,7 +1073,7 @@ ssize_t lttng_live_get_one_metadata_packet(struct lttng_live_trace *trace, BT_LOGE("Writing in the metadata fp"); goto error_free_data; } - assert(ret_len == len); + BT_ASSERT(ret_len == len); free(data); ret = len; end: @@ -1093,8 +1092,8 @@ static void lttng_index_to_packet_index(struct lttng_viewer_index *lindex, struct packet_index *pindex) { - assert(lindex); - assert(pindex); + BT_ASSERT(lindex); + BT_ASSERT(pindex); pindex->offset = be64toh(lindex->offset); pindex->packet_size = be64toh(lindex->packet_size); @@ -1105,7 +1104,8 @@ void lttng_index_to_packet_index(struct lttng_viewer_index *lindex, } BT_HIDDEN -enum bt_ctf_lttng_live_iterator_status lttng_live_get_next_index(struct lttng_live_component *lttng_live, +enum lttng_live_iterator_status lttng_live_get_next_index( + struct lttng_live_msg_iter *lttng_live_msg_iter, struct lttng_live_stream_iterator *stream, struct packet_index *index) { @@ -1114,43 +1114,50 @@ enum bt_ctf_lttng_live_iterator_status lttng_live_get_next_index(struct lttng_li ssize_t ret_len; struct lttng_viewer_index rp; uint32_t flags, status; - enum bt_ctf_lttng_live_iterator_status retstatus = - BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK; - struct bt_live_viewer_connection *viewer_connection = - lttng_live->viewer_connection; + enum lttng_live_iterator_status retstatus = + LTTNG_LIVE_ITERATOR_STATUS_OK; + struct live_viewer_connection *viewer_connection = + lttng_live_msg_iter->viewer_connection; struct lttng_live_trace *trace = stream->trace; + const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq); + char cmd_buf[cmd_buf_len]; + struct lttng_live_component *lttng_live = + lttng_live_msg_iter->lttng_live_comp; cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEXT_INDEX); cmd.data_size = htobe64((uint64_t) sizeof(rq)); cmd.cmd_version = htobe32(0); + memset(&rq, 0, sizeof(rq)); rq.stream_id = htobe64(stream->viewer_stream_id); - ret_len = lttng_live_send(viewer_connection->control_sock, &cmd, sizeof(cmd)); - if (ret_len < 0) { - BT_LOGE("Error sending cmd: %s", strerror(errno)); - goto error; - } - assert(ret_len == sizeof(cmd)); - - ret_len = lttng_live_send(viewer_connection->control_sock, &rq, sizeof(rq)); - if (ret_len < 0) { - BT_LOGE("Error sending get_next_index request: %s", strerror(errno)); + /* + * Merge the cmd and connection request to prevent a write-write + * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the + * second write to be performed quickly in presence of Nagle's algorithm. + */ + memcpy(cmd_buf, &cmd, sizeof(cmd)); + memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq)); + ret_len = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len); + if (ret_len == BT_SOCKET_ERROR) { + BT_LOGE("Error sending get_next_index request: %s", + bt_socket_errormsg()); goto error; } - assert(ret_len == sizeof(rq)); - ret_len = lttng_live_recv(viewer_connection->control_sock, &rp, sizeof(rp)); + BT_ASSERT(ret_len == cmd_buf_len); + ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp)); if (ret_len == 0) { BT_LOGI("Remote side has closed connection"); goto error; } - if (ret_len < 0) { - BT_LOGE("Error receiving get_next_index response: %s", strerror(errno)); + if (ret_len == BT_SOCKET_ERROR) { + BT_LOGE("Error receiving get_next_index response: %s", + bt_socket_errormsg()); goto error; } - assert(ret_len == sizeof(rp)); + BT_ASSERT(ret_len == sizeof(rp)); flags = be32toh(rp.flags); status = be32toh(rp.status); @@ -1163,10 +1170,10 @@ enum bt_ctf_lttng_live_iterator_status lttng_live_get_next_index(struct lttng_li BT_LOGD("get_next_index: inactive"); memset(index, 0, sizeof(struct packet_index)); index->ts_cycles.timestamp_end = be64toh(rp.timestamp_end); - stream->current_inactivity_timestamp = index->ts_cycles.timestamp_end; + stream->current_inactivity_ts = index->ts_cycles.timestamp_end; ctf_stream_class_id = be64toh(rp.stream_id); if (stream->ctf_stream_class_id != -1ULL) { - assert(stream->ctf_stream_class_id == + BT_ASSERT(stream->ctf_stream_class_id == ctf_stream_class_id); } else { stream->ctf_stream_class_id = ctf_stream_class_id; @@ -1182,15 +1189,13 @@ enum bt_ctf_lttng_live_iterator_status lttng_live_get_next_index(struct lttng_li lttng_index_to_packet_index(&rp, index); ctf_stream_class_id = be64toh(rp.stream_id); if (stream->ctf_stream_class_id != -1ULL) { - assert(stream->ctf_stream_class_id == + BT_ASSERT(stream->ctf_stream_class_id == ctf_stream_class_id); } else { stream->ctf_stream_class_id = ctf_stream_class_id; } stream->state = LTTNG_LIVE_STREAM_ACTIVE_DATA; - stream->current_packet_end_timestamp = - index->ts_cycles.timestamp_end; if (flags & LTTNG_VIEWER_FLAG_NEW_METADATA) { BT_LOGD("get_next_index: new metadata needed"); @@ -1198,21 +1203,21 @@ enum bt_ctf_lttng_live_iterator_status lttng_live_get_next_index(struct lttng_li } if (flags & LTTNG_VIEWER_FLAG_NEW_STREAM) { BT_LOGD("get_next_index: new streams needed"); - lttng_live_need_new_streams(lttng_live); + lttng_live_need_new_streams(lttng_live_msg_iter); } break; } case LTTNG_VIEWER_INDEX_RETRY: BT_LOGD("get_next_index: retry"); memset(index, 0, sizeof(struct packet_index)); - retstatus = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN; + retstatus = LTTNG_LIVE_ITERATOR_STATUS_AGAIN; stream->state = LTTNG_LIVE_STREAM_ACTIVE_NO_DATA; goto end; case LTTNG_VIEWER_INDEX_HUP: BT_LOGD("get_next_index: stream hung up"); memset(index, 0, sizeof(struct packet_index)); index->offset = EOF; - retstatus = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END; + retstatus = LTTNG_LIVE_ITERATOR_STATUS_END; stream->state = LTTNG_LIVE_STREAM_EOF; break; case LTTNG_VIEWER_INDEX_ERR: @@ -1230,24 +1235,33 @@ end: return retstatus; error: - retstatus = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR; + if (lttng_live_is_canceled(lttng_live)) { + retstatus = LTTNG_LIVE_ITERATOR_STATUS_AGAIN; + } else { + retstatus = LTTNG_LIVE_ITERATOR_STATUS_ERROR; + } return retstatus; } BT_HIDDEN -enum bt_ctf_notif_iter_medium_status lttng_live_get_stream_bytes(struct lttng_live_component *lttng_live, - struct lttng_live_stream_iterator *stream, uint8_t *buf, uint64_t offset, - uint64_t req_len, uint64_t *recv_len) +enum bt_msg_iter_medium_status lttng_live_get_stream_bytes( + struct lttng_live_msg_iter *lttng_live_msg_iter, + struct lttng_live_stream_iterator *stream, uint8_t *buf, + uint64_t offset, uint64_t req_len, uint64_t *recv_len) { - enum bt_ctf_notif_iter_medium_status retstatus = BT_CTF_NOTIF_ITER_MEDIUM_STATUS_OK; + enum bt_msg_iter_medium_status retstatus = BT_MSG_ITER_MEDIUM_STATUS_OK; struct lttng_viewer_cmd cmd; struct lttng_viewer_get_packet rq; struct lttng_viewer_trace_packet rp; ssize_t ret_len; uint32_t flags, status; - struct bt_live_viewer_connection *viewer_connection = - lttng_live->viewer_connection; + struct live_viewer_connection *viewer_connection = + lttng_live_msg_iter->viewer_connection; struct lttng_live_trace *trace = stream->trace; + const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq); + char cmd_buf[cmd_buf_len]; + struct lttng_live_component *lttng_live = + lttng_live_msg_iter->lttng_live_comp; BT_LOGD("lttng_live_get_stream_bytes: offset=%" PRIu64 ", req_len=%" PRIu64, offset, req_len); @@ -1260,27 +1274,27 @@ enum bt_ctf_notif_iter_medium_status lttng_live_get_stream_bytes(struct lttng_li rq.offset = htobe64(offset); rq.len = htobe32(req_len); - ret_len = lttng_live_send(viewer_connection->control_sock, &cmd, sizeof(cmd)); - if (ret_len < 0) { - BT_LOGE("Error sending cmd: %s", strerror(errno)); - goto error; - } - assert(ret_len == sizeof(cmd)); - - ret_len = lttng_live_send(viewer_connection->control_sock, &rq, sizeof(rq)); - if (ret_len < 0) { - BT_LOGE("Error sending get_data request: %s", strerror(errno)); + /* + * Merge the cmd and connection request to prevent a write-write + * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the + * second write to be performed quickly in presence of Nagle's algorithm. + */ + memcpy(cmd_buf, &cmd, sizeof(cmd)); + memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq)); + ret_len = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len); + if (ret_len == BT_SOCKET_ERROR) { + BT_LOGE("Error sending get_data request: %s", bt_socket_errormsg()); goto error; } - assert(ret_len == sizeof(rq)); - ret_len = lttng_live_recv(viewer_connection->control_sock, &rp, sizeof(rp)); + BT_ASSERT(ret_len == cmd_buf_len); + ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp)); if (ret_len == 0) { BT_LOGI("Remote side has closed connection"); goto error; } - if (ret_len < 0) { - BT_LOGE("Error receiving get_data response: %s", strerror(errno)); + if (ret_len == BT_SOCKET_ERROR) { + BT_LOGE("Error receiving get_data response: %s", bt_socket_errormsg()); goto error; } if (ret_len != sizeof(rp)) { @@ -1301,7 +1315,7 @@ enum bt_ctf_notif_iter_medium_status lttng_live_get_stream_bytes(struct lttng_li case LTTNG_VIEWER_GET_PACKET_RETRY: /* Unimplemented by relay daemon */ BT_LOGD("get_data_packet: retry"); - retstatus = BT_CTF_NOTIF_ITER_MEDIUM_STATUS_AGAIN; + retstatus = BT_MSG_ITER_MEDIUM_STATUS_AGAIN; goto end; case LTTNG_VIEWER_GET_PACKET_ERR: if (flags & LTTNG_VIEWER_FLAG_NEW_METADATA) { @@ -1310,17 +1324,17 @@ enum bt_ctf_notif_iter_medium_status lttng_live_get_stream_bytes(struct lttng_li } if (flags & LTTNG_VIEWER_FLAG_NEW_STREAM) { BT_LOGD("get_data_packet: new streams needed, try again later"); - lttng_live_need_new_streams(lttng_live); + lttng_live_need_new_streams(lttng_live_msg_iter); } if (flags & (LTTNG_VIEWER_FLAG_NEW_METADATA | LTTNG_VIEWER_FLAG_NEW_STREAM)) { - retstatus = BT_CTF_NOTIF_ITER_MEDIUM_STATUS_AGAIN; + retstatus = BT_MSG_ITER_MEDIUM_STATUS_AGAIN; goto end; } BT_LOGE("get_data_packet: error"); goto error; case LTTNG_VIEWER_GET_PACKET_EOF: - retstatus = BT_CTF_NOTIF_ITER_MEDIUM_STATUS_EOF; + retstatus = BT_MSG_ITER_MEDIUM_STATUS_EOF; goto end; default: BT_LOGE("get_data_packet: unknown"); @@ -1331,22 +1345,26 @@ enum bt_ctf_notif_iter_medium_status lttng_live_get_stream_bytes(struct lttng_li goto error; } - ret_len = lttng_live_recv(viewer_connection->control_sock, buf, req_len); + ret_len = lttng_live_recv(viewer_connection, buf, req_len); if (ret_len == 0) { BT_LOGI("Remote side has closed connection"); goto error; } - if (ret_len < 0) { - BT_LOGE("Error receiving trace packet: %s", strerror(errno)); + if (ret_len == BT_SOCKET_ERROR) { + BT_LOGE("Error receiving trace packet: %s", bt_socket_errormsg()); goto error; } - assert(ret_len == req_len); + BT_ASSERT(ret_len == req_len); *recv_len = ret_len; end: return retstatus; error: - retstatus = BT_CTF_NOTIF_ITER_MEDIUM_STATUS_ERROR; + if (lttng_live_is_canceled(lttng_live)) { + retstatus = BT_MSG_ITER_MEDIUM_STATUS_AGAIN; + } else { + retstatus = BT_MSG_ITER_MEDIUM_STATUS_ERROR; + } return retstatus; } @@ -1354,22 +1372,27 @@ error: * Request new streams for a session. */ BT_HIDDEN -enum bt_ctf_lttng_live_iterator_status lttng_live_get_new_streams( +enum lttng_live_iterator_status lttng_live_get_new_streams( struct lttng_live_session *session) { - enum bt_ctf_lttng_live_iterator_status status = - BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK; + enum lttng_live_iterator_status status = + LTTNG_LIVE_ITERATOR_STATUS_OK; struct lttng_viewer_cmd cmd; struct lttng_viewer_new_streams_request rq; struct lttng_viewer_new_streams_response rp; ssize_t ret_len; - struct lttng_live_component *lttng_live = session->lttng_live; - struct bt_live_viewer_connection *viewer_connection = - lttng_live->viewer_connection; + struct lttng_live_msg_iter *lttng_live_msg_iter = + session->lttng_live_msg_iter; + struct live_viewer_connection *viewer_connection = + lttng_live_msg_iter->viewer_connection; + struct lttng_live_component *lttng_live = + lttng_live_msg_iter->lttng_live_comp; uint32_t streams_count; + const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq); + char cmd_buf[cmd_buf_len]; if (!session->new_streams_needed) { - return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK; + return LTTNG_LIVE_ITERATOR_STATUS_OK; } cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEW_STREAMS); @@ -1379,30 +1402,31 @@ enum bt_ctf_lttng_live_iterator_status lttng_live_get_new_streams( memset(&rq, 0, sizeof(rq)); rq.session_id = htobe64(session->id); - ret_len = lttng_live_send(viewer_connection->control_sock, &cmd, sizeof(cmd)); - if (ret_len < 0) { - BT_LOGE("Error sending cmd: %s", strerror(errno)); - goto error; - } - assert(ret_len == sizeof(cmd)); - - ret_len = lttng_live_send(viewer_connection->control_sock, &rq, sizeof(rq)); - if (ret_len < 0) { - BT_LOGE("Error sending get_new_streams request: %s", strerror(errno)); + /* + * Merge the cmd and connection request to prevent a write-write + * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the + * second write to be performed quickly in presence of Nagle's algorithm. + */ + memcpy(cmd_buf, &cmd, sizeof(cmd)); + memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq)); + ret_len = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len); + if (ret_len == BT_SOCKET_ERROR) { + BT_LOGE("Error sending get_new_streams request: %s", + bt_socket_errormsg()); goto error; } - assert(ret_len == sizeof(rq)); - ret_len = lttng_live_recv(viewer_connection->control_sock, &rp, sizeof(rp)); + BT_ASSERT(ret_len == cmd_buf_len); + ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp)); if (ret_len == 0) { BT_LOGI("Remote side has closed connection"); goto error; } - if (ret_len < 0) { + if (ret_len == BT_SOCKET_ERROR) { BT_LOGE("Error receiving get_new_streams response"); goto error; } - assert(ret_len == sizeof(rp)); + BT_ASSERT(ret_len == sizeof(rp)); streams_count = be32toh(rp.streams_count); @@ -1416,7 +1440,7 @@ enum bt_ctf_lttng_live_iterator_status lttng_live_get_new_streams( case LTTNG_VIEWER_NEW_STREAMS_HUP: session->new_streams_needed = false; session->closed = true; - status = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END; + status = LTTNG_LIVE_ITERATOR_STATUS_END; goto end; case LTTNG_VIEWER_NEW_STREAMS_ERR: BT_LOGE("get_new_streams error"); @@ -1433,22 +1457,32 @@ end: return status; error: - status = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR; + if (lttng_live_is_canceled(lttng_live)) { + status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN; + } else { + status = LTTNG_LIVE_ITERATOR_STATUS_ERROR; + } return status; } BT_HIDDEN -struct bt_live_viewer_connection * - bt_live_viewer_connection_create(const char *url, FILE *error_fp) +struct live_viewer_connection *live_viewer_connection_create( + const char *url, bool in_query, + struct lttng_live_msg_iter *lttng_live_msg_iter) { - struct bt_live_viewer_connection *viewer_connection; + struct live_viewer_connection *viewer_connection; + + viewer_connection = g_new0(struct live_viewer_connection, 1); - viewer_connection = g_new0(struct bt_live_viewer_connection, 1); + if (bt_socket_init() != 0) { + goto error; + } - bt_object_init(&viewer_connection->obj, connection_release); - viewer_connection->control_sock = -1; + bt_object_init_shared(&viewer_connection->obj, connection_release); + viewer_connection->control_sock = BT_INVALID_SOCKET; viewer_connection->port = -1; - viewer_connection->error_fp = error_fp; + viewer_connection->in_query = in_query; + viewer_connection->lttng_live_msg_iter = lttng_live_msg_iter; viewer_connection->url = g_string_new(url); if (!viewer_connection->url) { goto error; @@ -1469,10 +1503,22 @@ error: } BT_HIDDEN -void bt_live_viewer_connection_destroy(struct bt_live_viewer_connection *viewer_connection) +void live_viewer_connection_destroy( + struct live_viewer_connection *viewer_connection) { BT_LOGD("Closing connection to url \"%s\"", viewer_connection->url->str); lttng_live_disconnect_viewer(viewer_connection); - g_string_free(viewer_connection->url, TRUE); + g_string_free(viewer_connection->url, true); + if (viewer_connection->relay_hostname) { + g_string_free(viewer_connection->relay_hostname, true); + } + if (viewer_connection->target_hostname) { + g_string_free(viewer_connection->target_hostname, true); + } + if (viewer_connection->session_name) { + g_string_free(viewer_connection->session_name, true); + } g_free(viewer_connection); + + bt_socket_fini(); }