#include <unistd.h>
#include <glib.h>
#include <inttypes.h>
-#include <sys/socket.h>
#include <sys/types.h>
-#include <netinet/in.h>
-#include <netdb.h>
#include <fcntl.h>
-#include <poll.h>
-#include <babeltrace/compat/send-internal.h>
+#include <babeltrace/compat/socket-internal.h>
+#include <babeltrace/endian-internal.h>
#include <babeltrace/compiler-internal.h>
#include <babeltrace/common-internal.h>
-#include <babeltrace/graph/graph.h>
+#include <babeltrace/babeltrace.h>
#include "lttng-live-internal.h"
#include "viewer-connection.h"
size_t copied = 0, to_copy = len;
struct lttng_live_component *lttng_live =
viewer_connection->lttng_live;
- int fd = viewer_connection->control_sock;
+ BT_SOCKET sock = viewer_connection->control_sock;
do {
- ret = recv(fd, buf + copied, to_copy, 0);
+ ret = bt_socket_recv(sock, buf + copied, to_copy, 0);
if (ret > 0) {
- assert(ret <= to_copy);
+ BT_ASSERT(ret <= to_copy);
copied += ret;
to_copy -= ret;
}
- if (ret < 0 && errno == EINTR) {
+ if (ret == BT_SOCKET_ERROR && bt_socket_interrupted()) {
if (lttng_live_is_canceled(lttng_live)) {
break;
} else {
} while (ret > 0 && to_copy > 0);
if (ret > 0)
ret = copied;
- /* ret = 0 means orderly shutdown, ret < 0 is error. */
+ /* ret = 0 means orderly shutdown, ret == BT_SOCKET_ERROR is error. */
return ret;
}
{
struct lttng_live_component *lttng_live =
viewer_connection->lttng_live;
- int fd = viewer_connection->control_sock;
+ BT_SOCKET sock = viewer_connection->control_sock;
ssize_t ret;
for (;;) {
- ret = bt_send_nosigpipe(fd, buf, len);
- if (ret < 0 && errno == EINTR) {
+ ret = bt_socket_send_nosigpipe(sock, buf, len);
+ if (ret == BT_SOCKET_ERROR && bt_socket_interrupted()) {
if (lttng_live_is_canceled(lttng_live)) {
break;
} else {
{
struct lttng_viewer_cmd cmd;
struct lttng_viewer_connect connect;
+ const size_t cmd_buf_len = sizeof(cmd) + sizeof(connect);
+ char cmd_buf[cmd_buf_len];
int ret;
ssize_t ret_len;
connect.minor = htobe32(LTTNG_LIVE_MINOR);
connect.type = htobe32(LTTNG_VIEWER_CLIENT_COMMAND);
- ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
- if (ret_len < 0) {
- BT_LOGE("Error sending cmd: %s", strerror(errno));
+ /*
+ * Merge the cmd and connection request to prevent a write-write
+ * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
+ * second write to be performed quickly in presence of Nagle's algorithm
+ */
+ memcpy(cmd_buf, &cmd, sizeof(cmd));
+ memcpy(cmd_buf + sizeof(cmd), &connect, sizeof(connect));
+ ret_len = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
+ if (ret_len == BT_SOCKET_ERROR) {
+ BT_LOGE("Error sending version: %s", bt_socket_errormsg());
goto error;
}
- assert(ret_len == sizeof(cmd));
- ret_len = lttng_live_send(viewer_connection, &connect, sizeof(connect));
- if (ret_len < 0) {
- BT_LOGE("Error sending version: %s", strerror(errno));
- goto error;
- }
- assert(ret_len == sizeof(connect));
+ BT_ASSERT(ret_len == cmd_buf_len);
ret_len = lttng_live_recv(viewer_connection, &connect, sizeof(connect));
if (ret_len == 0) {
BT_LOGI("Remote side has closed connection");
goto error;
}
- if (ret_len < 0) {
- BT_LOGE("Error receiving version: %s", strerror(errno));
+ if (ret_len == BT_SOCKET_ERROR) {
+ BT_LOGE("Error receiving version: %s", bt_socket_errormsg());
goto error;
}
- assert(ret_len == sizeof(connect));
+ BT_ASSERT(ret_len == sizeof(connect));
BT_LOGD("Received viewer session ID : %" PRIu64,
- be64toh(connect.viewer_session_id));
+ (uint64_t) be64toh(connect.viewer_session_id));
BT_LOGD("Relayd version : %u.%u", be32toh(connect.major),
be32toh(connect.minor));
goto error;
}
- if ((viewer_connection->control_sock = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
- BT_LOGE("Socket creation failed: %s", strerror(errno));
+ if ((viewer_connection->control_sock = socket(AF_INET, SOCK_STREAM, 0)) == BT_INVALID_SOCKET) {
+ BT_LOGE("Socket creation failed: %s", bt_socket_errormsg());
goto error;
}
memset(&(server_addr.sin_zero), 0, 8);
if (connect(viewer_connection->control_sock, (struct sockaddr *) &server_addr,
- sizeof(struct sockaddr)) == -1) {
- BT_LOGE("Connection failed: %s", strerror(errno));
+ sizeof(struct sockaddr)) == BT_SOCKET_ERROR) {
+ BT_LOGE("Connection failed: %s", bt_socket_errormsg());
goto error;
}
if (lttng_live_handshake(viewer_connection)) {
return ret;
error:
- if (viewer_connection->control_sock >= 0) {
- if (close(viewer_connection->control_sock)) {
- BT_LOGE("Close: %s", strerror(errno));
+ if (viewer_connection->control_sock != BT_INVALID_SOCKET) {
+ if (bt_socket_close(viewer_connection->control_sock) == BT_SOCKET_ERROR) {
+ BT_LOGE("Close: %s", bt_socket_errormsg());
}
}
- viewer_connection->control_sock = -1;
+ viewer_connection->control_sock = BT_INVALID_SOCKET;
return -1;
}
static void lttng_live_disconnect_viewer(struct bt_live_viewer_connection *viewer_connection)
{
- if (viewer_connection->control_sock < 0) {
+ if (viewer_connection->control_sock == BT_INVALID_SOCKET) {
return;
}
- if (close(viewer_connection->control_sock)) {
- BT_LOGE("Close: %s", strerror(errno));
- viewer_connection->control_sock = -1;
+ if (bt_socket_close(viewer_connection->control_sock) == BT_SOCKET_ERROR) {
+ BT_LOGE("Close: %s", bt_socket_errormsg());
+ viewer_connection->control_sock = BT_INVALID_SOCKET;
}
}
int i, len;
bool found = false;
- len = bt_value_array_size(results);
+ len = bt_value_array_get_size(results);
if (len < 0) {
ret = BT_VALUE_STATUS_ERROR;
goto end;
if (ret != BT_VALUE_STATUS_OK) {
goto end;
}
- BT_PUT(btval);
+ BT_OBJECT_PUT_REF_AND_RESET(btval);
btval = bt_value_map_get(map, "client-count");
if (!btval) {
if (ret != BT_VALUE_STATUS_OK) {
goto end;
}
- BT_PUT(btval);
+ BT_OBJECT_PUT_REF_AND_RESET(btval);
}
- BT_PUT(hostname);
- BT_PUT(session_name);
- BT_PUT(map);
+ BT_OBJECT_PUT_REF_AND_RESET(hostname);
+ BT_OBJECT_PUT_REF_AND_RESET(session_name);
+ BT_OBJECT_PUT_REF_AND_RESET(map);
if (found) {
break;
}
}
end:
- BT_PUT(btval);
- BT_PUT(hostname);
- BT_PUT(session_name);
- BT_PUT(map);
+ BT_OBJECT_PUT_REF_AND_RESET(btval);
+ BT_OBJECT_PUT_REF_AND_RESET(hostname);
+ BT_OBJECT_PUT_REF_AND_RESET(session_name);
+ BT_OBJECT_PUT_REF_AND_RESET(map);
*_found = found;
return ret;
}
g_string_append_c(url, '/');
g_string_append(url, session->session_name);
- ret = bt_value_map_insert_string(map, "url", url->str);
+ ret = bt_value_map_insert_string_entry(map, "url", url->str);
if (ret != BT_VALUE_STATUS_OK) {
goto end;
}
* key = "target-hostname",
* value = <string>,
*/
- ret = bt_value_map_insert_string(map, "target-hostname",
+ ret = bt_value_map_insert_string_entry(map, "target-hostname",
session->hostname);
if (ret != BT_VALUE_STATUS_OK) {
goto end;
* key = "session-name",
* value = <string>,
*/
- ret = bt_value_map_insert_string(map, "session-name",
+ ret = bt_value_map_insert_string_entry(map, "session-name",
session->session_name);
if (ret != BT_VALUE_STATUS_OK) {
goto end;
{
uint32_t live_timer = be32toh(session->live_timer);
- ret = bt_value_map_insert_integer(map, "timer-us",
+ ret = bt_value_map_insert_integer_entry(map, "timer-us",
live_timer);
if (ret != BT_VALUE_STATUS_OK) {
goto end;
{
uint32_t streams = be32toh(session->streams);
- ret = bt_value_map_insert_integer(map, "stream-count",
+ ret = bt_value_map_insert_integer_entry(map, "stream-count",
streams);
if (ret != BT_VALUE_STATUS_OK) {
goto end;
{
uint32_t clients = be32toh(session->clients);
- ret = bt_value_map_insert_integer(map, "client-count",
+ ret = bt_value_map_insert_integer_entry(map, "client-count",
clients);
if (ret != BT_VALUE_STATUS_OK) {
goto end;
}
}
- ret = bt_value_array_append(results, map);
+ ret = bt_value_array_append_element(results, map);
end:
if (url) {
g_string_free(url, TRUE);
}
- BT_PUT(map);
+ BT_OBJECT_PUT_REF_AND_RESET(map);
return ret;
}
cmd.cmd_version = htobe32(0);
ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
- if (ret_len < 0) {
- BT_LOGE("Error sending cmd: %s", strerror(errno));
+ if (ret_len == BT_SOCKET_ERROR) {
+ BT_LOGE("Error sending cmd: %s", bt_socket_errormsg());
goto error;
}
- assert(ret_len == sizeof(cmd));
+ BT_ASSERT(ret_len == sizeof(cmd));
ret_len = lttng_live_recv(viewer_connection, &list, sizeof(list));
if (ret_len == 0) {
BT_LOGI("Remote side has closed connection");
goto error;
}
- if (ret_len < 0) {
- BT_LOGE("Error receiving session list: %s", strerror(errno));
+ if (ret_len == BT_SOCKET_ERROR) {
+ BT_LOGE("Error receiving session list: %s", bt_socket_errormsg());
goto error;
}
- assert(ret_len == sizeof(list));
+ BT_ASSERT(ret_len == sizeof(list));
sessions_count = be32toh(list.sessions_count);
for (i = 0; i < sessions_count; i++) {
BT_LOGI("Remote side has closed connection");
goto error;
}
- if (ret_len < 0) {
- BT_LOGE("Error receiving session: %s", strerror(errno));
+ if (ret_len == BT_SOCKET_ERROR) {
+ BT_LOGE("Error receiving session: %s", bt_socket_errormsg());
goto error;
}
- assert(ret_len == sizeof(lsession));
+ BT_ASSERT(ret_len == sizeof(lsession));
lsession.hostname[LTTNG_VIEWER_HOST_NAME_MAX - 1] = '\0';
lsession.session_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0';
if (list_append_session(results,
}
goto end;
error:
- BT_PUT(results);
+ BT_OBJECT_PUT_REF_AND_RESET(results);
end:
return results;
}
cmd.cmd_version = htobe32(0);
ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
- if (ret_len < 0) {
- BT_LOGE("Error sending cmd: %s", strerror(errno));
+ if (ret_len == BT_SOCKET_ERROR) {
+ BT_LOGE("Error sending cmd: %s", bt_socket_errormsg());
goto error;
}
- assert(ret_len == sizeof(cmd));
+ BT_ASSERT(ret_len == sizeof(cmd));
ret_len = lttng_live_recv(viewer_connection, &list, sizeof(list));
if (ret_len == 0) {
BT_LOGI("Remote side has closed connection");
goto error;
}
- if (ret_len < 0) {
- BT_LOGE("Error receiving session list: %s", strerror(errno));
+ if (ret_len == BT_SOCKET_ERROR) {
+ BT_LOGE("Error receiving session list: %s", bt_socket_errormsg());
goto error;
}
- assert(ret_len == sizeof(list));
+ BT_ASSERT(ret_len == sizeof(list));
sessions_count = be32toh(list.sessions_count);
for (i = 0; i < sessions_count; i++) {
BT_LOGI("Remote side has closed connection");
goto error;
}
- if (ret_len < 0) {
- BT_LOGE("Error receiving session: %s", strerror(errno));
+ if (ret_len == BT_SOCKET_ERROR) {
+ BT_LOGE("Error receiving session: %s", bt_socket_errormsg());
goto error;
}
- assert(ret_len == sizeof(lsession));
+ BT_ASSERT(ret_len == sizeof(lsession));
lsession.hostname[LTTNG_VIEWER_HOST_NAME_MAX - 1] = '\0';
lsession.session_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0';
session_id = be64toh(lsession.id);
if ((strncmp(lsession.session_name,
viewer_connection->session_name->str,
- MAXNAMLEN) == 0) && (strncmp(lsession.hostname,
+ LTTNG_VIEWER_NAME_MAX) == 0) && (strncmp(lsession.hostname,
viewer_connection->target_hostname->str,
- MAXNAMLEN) == 0)) {
+ LTTNG_VIEWER_HOST_NAME_MAX) == 0)) {
if (lttng_live_add_session(lttng_live, session_id,
lsession.hostname,
lsession.session_name)) {
cmd.cmd_version = htobe32(0);
ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
- if (ret_len < 0) {
- BT_LOGE("Error sending cmd: %s", strerror(errno));
+ if (ret_len == BT_SOCKET_ERROR) {
+ BT_LOGE("Error sending cmd: %s", bt_socket_errormsg());
goto error;
}
- assert(ret_len == sizeof(cmd));
+ BT_ASSERT(ret_len == sizeof(cmd));
ret_len = lttng_live_recv(viewer_connection, &resp, sizeof(resp));
if (ret_len == 0) {
BT_LOGI("Remote side has closed connection");
goto error;
}
- if (ret_len < 0) {
- BT_LOGE("Error receiving create session reply: %s", strerror(errno));
+ if (ret_len == BT_SOCKET_ERROR) {
+ BT_LOGE("Error receiving create session reply: %s", bt_socket_errormsg());
goto error;
}
- assert(ret_len == sizeof(resp));
+ BT_ASSERT(ret_len == sizeof(resp));
if (be32toh(resp.status) != LTTNG_VIEWER_CREATE_SESSION_OK) {
BT_LOGE("Error creating viewer session");
BT_LOGI("Remote side has closed connection");
goto error;
}
- if (ret_len < 0) {
+ if (ret_len == BT_SOCKET_ERROR) {
BT_LOGE("Error receiving stream");
goto error;
}
- assert(ret_len == sizeof(stream));
+ BT_ASSERT(ret_len == sizeof(stream));
stream.path_name[LTTNG_VIEWER_PATH_MAX - 1] = '\0';
stream.channel_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0';
stream_id = be64toh(stream.id);
lttng_live->viewer_connection;
uint64_t session_id = session->id;
uint32_t streams_count;
+ const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
+ char cmd_buf[cmd_buf_len];
if (session->attached) {
return 0;
// rq.seek = htobe32(LTTNG_VIEWER_SEEK_BEGINNING);
rq.seek = htobe32(LTTNG_VIEWER_SEEK_LAST);
- ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
- if (ret_len < 0) {
- BT_LOGE("Error sending cmd: %s", strerror(errno));
- goto error;
- }
- assert(ret_len == sizeof(cmd));
-
- ret_len = lttng_live_send(viewer_connection, &rq, sizeof(rq));
- if (ret_len < 0) {
- BT_LOGE("Error sending attach request: %s", strerror(errno));
+ /*
+ * Merge the cmd and connection request to prevent a write-write
+ * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
+ * second write to be performed quickly in presence of Nagle's algorithm.
+ */
+ memcpy(cmd_buf, &cmd, sizeof(cmd));
+ memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
+ ret_len = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
+ if (ret_len == BT_SOCKET_ERROR) {
+ BT_LOGE("Error sending attach request: %s", bt_socket_errormsg());
goto error;
}
- assert(ret_len == sizeof(rq));
+ BT_ASSERT(ret_len == cmd_buf_len);
ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
if (ret_len == 0) {
BT_LOGI("Remote side has closed connection");
goto error;
}
- if (ret_len < 0) {
- BT_LOGE("Error receiving attach response: %s", strerror(errno));
+ if (ret_len == BT_SOCKET_ERROR) {
+ BT_LOGE("Error receiving attach response: %s", bt_socket_errormsg());
goto error;
}
- assert(ret_len == sizeof(rp));
+ BT_ASSERT(ret_len == sizeof(rp));
streams_count = be32toh(rp.streams_count);
switch(be32toh(rp.status)) {
struct bt_live_viewer_connection *viewer_connection =
lttng_live->viewer_connection;
uint64_t session_id = session->id;
+ const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
+ char cmd_buf[cmd_buf_len];
if (!session->attached) {
return 0;
memset(&rq, 0, sizeof(rq));
rq.session_id = htobe64(session_id);
- ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
- if (ret_len < 0) {
- BT_LOGE("Error sending cmd: %s", strerror(errno));
- goto error;
- }
- assert(ret_len == sizeof(cmd));
-
- ret_len = lttng_live_send(viewer_connection, &rq, sizeof(rq));
- if (ret_len < 0) {
- BT_LOGE("Error sending detach request: %s", strerror(errno));
+ /*
+ * Merge the cmd and connection request to prevent a write-write
+ * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
+ * second write to be performed quickly in presence of Nagle's algorithm.
+ */
+ memcpy(cmd_buf, &cmd, sizeof(cmd));
+ memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
+ ret_len = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
+ if (ret_len == BT_SOCKET_ERROR) {
+ BT_LOGE("Error sending detach request: %s", bt_socket_errormsg());
goto error;
}
- assert(ret_len == sizeof(rq));
+ BT_ASSERT(ret_len == cmd_buf_len);
ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
if (ret_len == 0) {
BT_LOGI("Remote side has closed connection");
goto error;
}
- if (ret_len < 0) {
- BT_LOGE("Error receiving detach response: %s", strerror(errno));
+ if (ret_len == BT_SOCKET_ERROR) {
+ BT_LOGE("Error receiving detach response: %s", bt_socket_errormsg());
goto error;
}
- assert(ret_len == sizeof(rp));
+ BT_ASSERT(ret_len == sizeof(rp));
switch(be32toh(rp.status)) {
case LTTNG_VIEWER_DETACH_SESSION_OK:
struct lttng_live_metadata *metadata = trace->metadata;
struct bt_live_viewer_connection *viewer_connection =
lttng_live->viewer_connection;
+ const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
+ char cmd_buf[cmd_buf_len];
rq.stream_id = htobe64(metadata->stream_id);
cmd.cmd = htobe32(LTTNG_VIEWER_GET_METADATA);
cmd.data_size = htobe64((uint64_t) sizeof(rq));
cmd.cmd_version = htobe32(0);
- ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
- if (ret_len < 0) {
- BT_LOGE("Error sending cmd: %s", strerror(errno));
- goto error;
- }
- assert(ret_len == sizeof(cmd));
-
- ret_len = lttng_live_send(viewer_connection, &rq, sizeof(rq));
- if (ret_len < 0) {
- BT_LOGE("Error sending get_metadata request: %s", strerror(errno));
+ /*
+ * Merge the cmd and connection request to prevent a write-write
+ * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
+ * second write to be performed quickly in presence of Nagle's algorithm.
+ */
+ memcpy(cmd_buf, &cmd, sizeof(cmd));
+ memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
+ ret_len = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
+ if (ret_len == BT_SOCKET_ERROR) {
+ BT_LOGE("Error sending get_metadata request: %s", bt_socket_errormsg());
goto error;
}
- assert(ret_len == sizeof(rq));
+ BT_ASSERT(ret_len == cmd_buf_len);
ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
if (ret_len == 0) {
BT_LOGI("Remote side has closed connection");
goto error;
}
- if (ret_len < 0) {
- BT_LOGE("Error receiving get_metadata response: %s", strerror(errno));
+ if (ret_len == BT_SOCKET_ERROR) {
+ BT_LOGE("Error receiving get_metadata response: %s", bt_socket_errormsg());
goto error;
}
- assert(ret_len == sizeof(rp));
+ BT_ASSERT(ret_len == sizeof(rp));
switch (be32toh(rp.status)) {
case LTTNG_VIEWER_METADATA_OK:
BT_LOGI("Remote side has closed connection");
goto error_free_data;
}
- if (ret_len < 0) {
- BT_LOGE("Error receiving trace packet: %s", strerror(errno));
+ if (ret_len == BT_SOCKET_ERROR) {
+ BT_LOGE("Error receiving trace packet: %s", bt_socket_errormsg());
goto error_free_data;
}
- assert(ret_len == len);
+ BT_ASSERT(ret_len == len);
do {
ret_len = fwrite(data, 1, len, fp);
BT_LOGE("Writing in the metadata fp");
goto error_free_data;
}
- assert(ret_len == len);
+ BT_ASSERT(ret_len == len);
free(data);
ret = len;
end:
void lttng_index_to_packet_index(struct lttng_viewer_index *lindex,
struct packet_index *pindex)
{
- assert(lindex);
- assert(pindex);
+ BT_ASSERT(lindex);
+ BT_ASSERT(pindex);
pindex->offset = be64toh(lindex->offset);
pindex->packet_size = be64toh(lindex->packet_size);
}
BT_HIDDEN
-enum bt_ctf_lttng_live_iterator_status lttng_live_get_next_index(struct lttng_live_component *lttng_live,
+enum bt_lttng_live_iterator_status lttng_live_get_next_index(struct lttng_live_component *lttng_live,
struct lttng_live_stream_iterator *stream,
struct packet_index *index)
{
ssize_t ret_len;
struct lttng_viewer_index rp;
uint32_t flags, status;
- enum bt_ctf_lttng_live_iterator_status retstatus =
- BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
+ enum bt_lttng_live_iterator_status retstatus =
+ BT_LTTNG_LIVE_ITERATOR_STATUS_OK;
struct bt_live_viewer_connection *viewer_connection =
lttng_live->viewer_connection;
struct lttng_live_trace *trace = stream->trace;
+ const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
+ char cmd_buf[cmd_buf_len];
cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEXT_INDEX);
cmd.data_size = htobe64((uint64_t) sizeof(rq));
cmd.cmd_version = htobe32(0);
+
memset(&rq, 0, sizeof(rq));
rq.stream_id = htobe64(stream->viewer_stream_id);
- ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
- if (ret_len < 0) {
- BT_LOGE("Error sending cmd: %s", strerror(errno));
- goto error;
- }
- assert(ret_len == sizeof(cmd));
-
- ret_len = lttng_live_send(viewer_connection, &rq, sizeof(rq));
- if (ret_len < 0) {
- BT_LOGE("Error sending get_next_index request: %s", strerror(errno));
+ /*
+ * Merge the cmd and connection request to prevent a write-write
+ * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
+ * second write to be performed quickly in presence of Nagle's algorithm.
+ */
+ memcpy(cmd_buf, &cmd, sizeof(cmd));
+ memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
+ ret_len = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
+ if (ret_len == BT_SOCKET_ERROR) {
+ BT_LOGE("Error sending get_next_index request: %s", bt_socket_errormsg());
goto error;
}
- assert(ret_len == sizeof(rq));
+ BT_ASSERT(ret_len == cmd_buf_len);
ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
if (ret_len == 0) {
BT_LOGI("Remote side has closed connection");
goto error;
}
- if (ret_len < 0) {
- BT_LOGE("Error receiving get_next_index response: %s", strerror(errno));
+ if (ret_len == BT_SOCKET_ERROR) {
+ BT_LOGE("Error receiving get_next_index response: %s", bt_socket_errormsg());
goto error;
}
- assert(ret_len == sizeof(rp));
+ BT_ASSERT(ret_len == sizeof(rp));
flags = be32toh(rp.flags);
status = be32toh(rp.status);
stream->current_inactivity_timestamp = index->ts_cycles.timestamp_end;
ctf_stream_class_id = be64toh(rp.stream_id);
if (stream->ctf_stream_class_id != -1ULL) {
- assert(stream->ctf_stream_class_id ==
+ BT_ASSERT(stream->ctf_stream_class_id ==
ctf_stream_class_id);
} else {
stream->ctf_stream_class_id = ctf_stream_class_id;
lttng_index_to_packet_index(&rp, index);
ctf_stream_class_id = be64toh(rp.stream_id);
if (stream->ctf_stream_class_id != -1ULL) {
- assert(stream->ctf_stream_class_id ==
+ BT_ASSERT(stream->ctf_stream_class_id ==
ctf_stream_class_id);
} else {
stream->ctf_stream_class_id = ctf_stream_class_id;
case LTTNG_VIEWER_INDEX_RETRY:
BT_LOGD("get_next_index: retry");
memset(index, 0, sizeof(struct packet_index));
- retstatus = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
+ retstatus = BT_LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
stream->state = LTTNG_LIVE_STREAM_ACTIVE_NO_DATA;
goto end;
case LTTNG_VIEWER_INDEX_HUP:
BT_LOGD("get_next_index: stream hung up");
memset(index, 0, sizeof(struct packet_index));
index->offset = EOF;
- retstatus = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END;
+ retstatus = BT_LTTNG_LIVE_ITERATOR_STATUS_END;
stream->state = LTTNG_LIVE_STREAM_EOF;
break;
case LTTNG_VIEWER_INDEX_ERR:
error:
if (lttng_live_is_canceled(lttng_live)) {
- retstatus = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
+ retstatus = BT_LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
} else {
- retstatus = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
+ retstatus = BT_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
}
return retstatus;
}
BT_HIDDEN
-enum bt_ctf_notif_iter_medium_status lttng_live_get_stream_bytes(struct lttng_live_component *lttng_live,
+enum bt_notif_iter_medium_status lttng_live_get_stream_bytes(struct lttng_live_component *lttng_live,
struct lttng_live_stream_iterator *stream, uint8_t *buf, uint64_t offset,
uint64_t req_len, uint64_t *recv_len)
{
- enum bt_ctf_notif_iter_medium_status retstatus = BT_CTF_NOTIF_ITER_MEDIUM_STATUS_OK;
+ enum bt_notif_iter_medium_status retstatus = BT_NOTIF_ITER_MEDIUM_STATUS_OK;
struct lttng_viewer_cmd cmd;
struct lttng_viewer_get_packet rq;
struct lttng_viewer_trace_packet rp;
struct bt_live_viewer_connection *viewer_connection =
lttng_live->viewer_connection;
struct lttng_live_trace *trace = stream->trace;
+ const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
+ char cmd_buf[cmd_buf_len];
BT_LOGD("lttng_live_get_stream_bytes: offset=%" PRIu64 ", req_len=%" PRIu64,
offset, req_len);
rq.offset = htobe64(offset);
rq.len = htobe32(req_len);
- ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
- if (ret_len < 0) {
- BT_LOGE("Error sending cmd: %s", strerror(errno));
- goto error;
- }
- assert(ret_len == sizeof(cmd));
-
- ret_len = lttng_live_send(viewer_connection, &rq, sizeof(rq));
- if (ret_len < 0) {
- BT_LOGE("Error sending get_data request: %s", strerror(errno));
+ /*
+ * Merge the cmd and connection request to prevent a write-write
+ * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
+ * second write to be performed quickly in presence of Nagle's algorithm.
+ */
+ memcpy(cmd_buf, &cmd, sizeof(cmd));
+ memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
+ ret_len = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
+ if (ret_len == BT_SOCKET_ERROR) {
+ BT_LOGE("Error sending get_data request: %s", bt_socket_errormsg());
goto error;
}
- assert(ret_len == sizeof(rq));
+ BT_ASSERT(ret_len == cmd_buf_len);
ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
if (ret_len == 0) {
BT_LOGI("Remote side has closed connection");
goto error;
}
- if (ret_len < 0) {
- BT_LOGE("Error receiving get_data response: %s", strerror(errno));
+ if (ret_len == BT_SOCKET_ERROR) {
+ BT_LOGE("Error receiving get_data response: %s", bt_socket_errormsg());
goto error;
}
if (ret_len != sizeof(rp)) {
case LTTNG_VIEWER_GET_PACKET_RETRY:
/* Unimplemented by relay daemon */
BT_LOGD("get_data_packet: retry");
- retstatus = BT_CTF_NOTIF_ITER_MEDIUM_STATUS_AGAIN;
+ retstatus = BT_NOTIF_ITER_MEDIUM_STATUS_AGAIN;
goto end;
case LTTNG_VIEWER_GET_PACKET_ERR:
if (flags & LTTNG_VIEWER_FLAG_NEW_METADATA) {
}
if (flags & (LTTNG_VIEWER_FLAG_NEW_METADATA
| LTTNG_VIEWER_FLAG_NEW_STREAM)) {
- retstatus = BT_CTF_NOTIF_ITER_MEDIUM_STATUS_AGAIN;
+ retstatus = BT_NOTIF_ITER_MEDIUM_STATUS_AGAIN;
goto end;
}
BT_LOGE("get_data_packet: error");
goto error;
case LTTNG_VIEWER_GET_PACKET_EOF:
- retstatus = BT_CTF_NOTIF_ITER_MEDIUM_STATUS_EOF;
+ retstatus = BT_NOTIF_ITER_MEDIUM_STATUS_EOF;
goto end;
default:
BT_LOGE("get_data_packet: unknown");
BT_LOGI("Remote side has closed connection");
goto error;
}
- if (ret_len < 0) {
- BT_LOGE("Error receiving trace packet: %s", strerror(errno));
+ if (ret_len == BT_SOCKET_ERROR) {
+ BT_LOGE("Error receiving trace packet: %s", bt_socket_errormsg());
goto error;
}
- assert(ret_len == req_len);
+ BT_ASSERT(ret_len == req_len);
*recv_len = ret_len;
end:
return retstatus;
error:
if (lttng_live_is_canceled(lttng_live)) {
- retstatus = BT_CTF_NOTIF_ITER_MEDIUM_STATUS_AGAIN;
+ retstatus = BT_NOTIF_ITER_MEDIUM_STATUS_AGAIN;
} else {
- retstatus = BT_CTF_NOTIF_ITER_MEDIUM_STATUS_ERROR;
+ retstatus = BT_NOTIF_ITER_MEDIUM_STATUS_ERROR;
}
return retstatus;
}
* Request new streams for a session.
*/
BT_HIDDEN
-enum bt_ctf_lttng_live_iterator_status lttng_live_get_new_streams(
+enum bt_lttng_live_iterator_status lttng_live_get_new_streams(
struct lttng_live_session *session)
{
- enum bt_ctf_lttng_live_iterator_status status =
- BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
+ enum bt_lttng_live_iterator_status status =
+ BT_LTTNG_LIVE_ITERATOR_STATUS_OK;
struct lttng_viewer_cmd cmd;
struct lttng_viewer_new_streams_request rq;
struct lttng_viewer_new_streams_response rp;
struct bt_live_viewer_connection *viewer_connection =
lttng_live->viewer_connection;
uint32_t streams_count;
+ const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
+ char cmd_buf[cmd_buf_len];
if (!session->new_streams_needed) {
- return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
+ return BT_LTTNG_LIVE_ITERATOR_STATUS_OK;
}
cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEW_STREAMS);
memset(&rq, 0, sizeof(rq));
rq.session_id = htobe64(session->id);
- ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
- if (ret_len < 0) {
- BT_LOGE("Error sending cmd: %s", strerror(errno));
- goto error;
- }
- assert(ret_len == sizeof(cmd));
-
- ret_len = lttng_live_send(viewer_connection, &rq, sizeof(rq));
- if (ret_len < 0) {
- BT_LOGE("Error sending get_new_streams request: %s", strerror(errno));
+ /*
+ * Merge the cmd and connection request to prevent a write-write
+ * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
+ * second write to be performed quickly in presence of Nagle's algorithm.
+ */
+ memcpy(cmd_buf, &cmd, sizeof(cmd));
+ memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
+ ret_len = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
+ if (ret_len == BT_SOCKET_ERROR) {
+ BT_LOGE("Error sending get_new_streams request: %s", bt_socket_errormsg());
goto error;
}
- assert(ret_len == sizeof(rq));
+ BT_ASSERT(ret_len == cmd_buf_len);
ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
if (ret_len == 0) {
BT_LOGI("Remote side has closed connection");
goto error;
}
- if (ret_len < 0) {
+ if (ret_len == BT_SOCKET_ERROR) {
BT_LOGE("Error receiving get_new_streams response");
goto error;
}
- assert(ret_len == sizeof(rp));
+ BT_ASSERT(ret_len == sizeof(rp));
streams_count = be32toh(rp.streams_count);
case LTTNG_VIEWER_NEW_STREAMS_HUP:
session->new_streams_needed = false;
session->closed = true;
- status = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END;
+ status = BT_LTTNG_LIVE_ITERATOR_STATUS_END;
goto end;
case LTTNG_VIEWER_NEW_STREAMS_ERR:
BT_LOGE("get_new_streams error");
error:
if (lttng_live_is_canceled(lttng_live)) {
- status = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
+ status = BT_LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
} else {
- status = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
+ status = BT_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
}
return status;
}
viewer_connection = g_new0(struct bt_live_viewer_connection, 1);
+ if (bt_socket_init() != 0) {
+ goto error;
+ }
+
bt_object_init(&viewer_connection->obj, connection_release);
- viewer_connection->control_sock = -1;
+ viewer_connection->control_sock = BT_INVALID_SOCKET;
viewer_connection->port = -1;
viewer_connection->lttng_live = lttng_live;
viewer_connection->url = g_string_new(url);
g_string_free(viewer_connection->session_name, TRUE);
}
g_free(viewer_connection);
+
+ bt_socket_fini();
}