#include <netinet/in.h>
#include <netdb.h>
#include <stdio.h>
-#include <string.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <inttypes.h>
#include <fcntl.h>
-#include <sys/mman.h>
#include <poll.h>
#include <babeltrace/ctf/ctf-index.h>
#include <babeltrace/babeltrace.h>
+#include <babeltrace/endian.h>
#include <babeltrace/ctf/events.h>
#include <babeltrace/ctf/callbacks.h>
#include <babeltrace/ctf/iterator.h>
#include <formats/ctf/events-private.h>
#include <babeltrace/compat/memstream.h>
+#include <babeltrace/compat/send.h>
+#include <babeltrace/compat/string.h>
+#include <babeltrace/compat/mman.h>
#include "lttng-live.h"
#include "lttng-viewer-abi.h"
ssize_t ret;
do {
- ret = send(fd, buf, len, MSG_NOSIGNAL);
+ ret = bt_send_nosigpipe(fd, buf, len);
} while (ret < 0 && errno == EINTR);
return ret;
}
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(ctx->port);
server_addr.sin_addr = *((struct in_addr *) host->h_addr);
- bzero(&(server_addr.sin_zero), 8);
+ memset(&(server_addr.sin_zero), 0, 8);
if (connect(ctx->control_sock, (struct sockaddr *) &server_addr,
sizeof(struct sockaddr)) == -1) {
{
struct lttng_viewer_cmd cmd;
struct lttng_viewer_connect connect;
+ const size_t cmd_buf_len = sizeof(cmd) + sizeof(connect);
+ char cmd_buf[cmd_buf_len];
int ret;
ssize_t ret_len;
}
cmd.cmd = htobe32(LTTNG_VIEWER_CONNECT);
- cmd.data_size = sizeof(connect);
- cmd.cmd_version = 0;
+ cmd.data_size = htobe64((uint64_t) sizeof(connect));
+ cmd.cmd_version = htobe32(0);
connect.viewer_session_id = -1ULL; /* will be set on recv */
connect.major = htobe32(LTTNG_LIVE_MAJOR);
connect.minor = htobe32(LTTNG_LIVE_MINOR);
connect.type = htobe32(LTTNG_VIEWER_CLIENT_COMMAND);
- ret_len = lttng_live_send(ctx->control_sock, &cmd, sizeof(cmd));
- if (ret_len < 0) {
- perror("[error] Error sending cmd");
- goto error;
- }
- assert(ret_len == sizeof(cmd));
+ /*
+ * Merge the cmd and connection request to prevent a write-write
+ * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
+ * second write to be performed quickly in presence of Nagle's algorithm.
+ */
+ memcpy(cmd_buf, &cmd, sizeof(cmd));
+ memcpy(cmd_buf + sizeof(cmd), &connect, sizeof(connect));
- ret_len = lttng_live_send(ctx->control_sock, &connect, sizeof(connect));
+ ret_len = lttng_live_send(ctx->control_sock, cmd_buf, cmd_buf_len);
if (ret_len < 0) {
- perror("[error] Error sending version");
+ perror("[error] Error sending cmd for establishing session");
goto error;
}
- assert(ret_len == sizeof(connect));
+ assert(ret_len == cmd_buf_len);
ret_len = lttng_live_recv(ctx->control_sock, &connect, sizeof(connect));
if (ret_len == 0) {
for (i = 0; i < session_list->len; i++) {
relay_session = g_ptr_array_index(session_list, i);
- fprintf(stdout, "%s/host/%s/%s (timer = %u, "
+ fprintf(LTTNG_LIVE_OUTPUT_FP, "%s/host/%s/%s (timer = %u, "
"%u stream(s), %u client(s) connected)\n",
path, relay_session->hostname,
relay_session->name, relay_session->timer,
for (i = 0; i < session_list->len; i++) {
relay_session = g_ptr_array_index(session_list, i);
- if ((strncmp(relay_session->hostname, hostname, NAME_MAX) == 0) &&
+ if ((strncmp(relay_session->hostname, hostname, MAXNAMLEN) == 0) &&
strncmp(relay_session->name,
- session_name, NAME_MAX) == 0) {
+ session_name, MAXNAMLEN) == 0) {
relay_session->streams += streams;
if (relay_session->clients < clients)
relay_session->clients = clients;
return;
relay_session = g_new0(struct lttng_live_relay_session, 1);
- relay_session->hostname = strndup(hostname, NAME_MAX);
- relay_session->name = strndup(session_name, NAME_MAX);
+ relay_session->hostname = bt_strndup(hostname, MAXNAMLEN);
+ relay_session->name = bt_strndup(session_name, MAXNAMLEN);
relay_session->clients = clients;
relay_session->streams = streams;
relay_session->timer = timer;
}
cmd.cmd = htobe32(LTTNG_VIEWER_LIST_SESSIONS);
- cmd.data_size = 0;
- cmd.cmd_version = 0;
+ cmd.data_size = htobe64((uint64_t) 0);
+ cmd.cmd_version = htobe32(0);
ret_len = lttng_live_send(ctx->control_sock, &cmd, sizeof(cmd));
if (ret_len < 0) {
be32toh(lsession.live_timer));
} else {
if ((strncmp(lsession.session_name, ctx->session_name,
- NAME_MAX) == 0) && (strncmp(lsession.hostname,
- ctx->traced_hostname, NAME_MAX) == 0)) {
+ MAXNAMLEN) == 0) && (strncmp(lsession.hostname,
+ ctx->traced_hostname, MAXNAMLEN) == 0)) {
printf_verbose("Reading from session %" PRIu64 "\n",
session_id);
g_array_append_val(ctx->session_ids,
struct lttng_viewer_attach_session_request rq;
struct lttng_viewer_attach_session_response rp;
struct lttng_viewer_stream stream;
+ const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
+ char cmd_buf[cmd_buf_len];
int ret, i;
ssize_t ret_len;
}
cmd.cmd = htobe32(LTTNG_VIEWER_ATTACH_SESSION);
- cmd.data_size = sizeof(rq);
- cmd.cmd_version = 0;
+ cmd.data_size = htobe64((uint64_t) sizeof(rq));
+ cmd.cmd_version = htobe32(0);
memset(&rq, 0, sizeof(rq));
rq.session_id = htobe64(id);
// rq.seek = htobe32(LTTNG_VIEWER_SEEK_BEGINNING);
rq.seek = htobe32(LTTNG_VIEWER_SEEK_LAST);
- ret_len = lttng_live_send(ctx->control_sock, &cmd, sizeof(cmd));
- if (ret_len < 0) {
- perror("[error] Error sending cmd");
- goto error;
- }
- assert(ret_len == sizeof(cmd));
+ /*
+ * Merge the cmd and connection request to prevent a write-write
+ * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
+ * second write to be performed quickly in presence of Nagle's algorithm.
+ */
+ memcpy(cmd_buf, &cmd, sizeof(cmd));
+ memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
- ret_len = lttng_live_send(ctx->control_sock, &rq, sizeof(rq));
+ ret_len = lttng_live_send(ctx->control_sock, cmd_buf, cmd_buf_len);
if (ret_len < 0) {
- perror("[error] Error sending attach request");
+ perror("[error] Error sending attach command and request");
goto error;
}
- assert(ret_len == sizeof(rq));
+ assert(ret_len == cmd_buf_len);
ret_len = lttng_live_recv(ctx->control_sock, &rp, sizeof(rp));
if (ret_len == 0) {
struct lttng_viewer_cmd cmd;
struct lttng_viewer_get_packet rq;
struct lttng_viewer_trace_packet rp;
+ const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
+ char cmd_buf[cmd_buf_len];
ssize_t ret_len;
int ret;
ret = -1;
goto end;
}
+
cmd.cmd = htobe32(LTTNG_VIEWER_GET_PACKET);
- cmd.data_size = sizeof(rq);
- cmd.cmd_version = 0;
+ cmd.data_size = htobe64((uint64_t) sizeof(rq));
+ cmd.cmd_version = htobe32(0);
memset(&rq, 0, sizeof(rq));
rq.stream_id = htobe64(stream->id);
rq.offset = htobe64(offset);
rq.len = htobe32(len);
- ret_len = lttng_live_send(ctx->control_sock, &cmd, sizeof(cmd));
- if (ret_len < 0) {
- perror("[error] Error sending cmd");
- goto error;
- }
- assert(ret_len == sizeof(cmd));
+ /*
+ * Merge the cmd and connection request to prevent a write-write
+ * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
+ * second write to be performed quickly in presence of Nagle's algorithm.
+ */
+ memcpy(cmd_buf, &cmd, sizeof(cmd));
+ memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
- ret_len = lttng_live_send(ctx->control_sock, &rq, sizeof(rq));
+ ret_len = lttng_live_send(ctx->control_sock, cmd_buf, cmd_buf_len);
if (ret_len < 0) {
- perror("[error] Error sending get_data_packet request");
+ perror("[error] Error sending get_data_packet cmd and request");
goto error;
}
- assert(ret_len == sizeof(rq));
+ assert(ret_len == cmd_buf_len);
ret_len = lttng_live_recv(ctx->control_sock, &rp, sizeof(rp));
if (ret_len == 0) {
struct lttng_viewer_metadata_packet rp;
char *data = NULL;
ssize_t ret_len;
+ const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
+ char cmd_buf[cmd_buf_len];
if (lttng_live_should_quit()) {
ret = -1;
rq.stream_id = htobe64(metadata_stream->id);
cmd.cmd = htobe32(LTTNG_VIEWER_GET_METADATA);
- cmd.data_size = sizeof(rq);
- cmd.cmd_version = 0;
+ cmd.data_size = htobe64((uint64_t) sizeof(rq));
+ cmd.cmd_version = htobe32(0);
- ret_len = lttng_live_send(ctx->control_sock, &cmd, sizeof(cmd));
- if (ret_len < 0) {
- perror("[error] Error sending cmd");
- goto error;
- }
- assert(ret_len == sizeof(cmd));
+ /*
+ * Merge the cmd and connection request to prevent a write-write
+ * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
+ * second write to be performed quickly in presence of Nagle's algorithm.
+ */
+ memcpy(cmd_buf, &cmd, sizeof(cmd));
+ memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
- ret_len = lttng_live_send(ctx->control_sock, &rq, sizeof(rq));
+ ret_len = lttng_live_send(ctx->control_sock, cmd_buf, cmd_buf_len);
if (ret_len < 0) {
- perror("[error] Error sending get_metadata request");
+ perror("[error] Error sending get_metadata cmd and request");
goto error;
}
- assert(ret_len == sizeof(rq));
+ assert(ret_len == cmd_buf_len);
ret_len = lttng_live_recv(ctx->control_sock, &rp, sizeof(rp));
if (ret_len == 0) {
}
} while (ret > 0 || !len_read);
- if (fclose(metadata_stream->metadata_fp_write))
- perror("fclose");
+ if (babeltrace_close_memstream(metadata_buf, &size,
+ metadata_stream->metadata_fp_write)) {
+ perror("babeltrace_close_memstream");
+ }
metadata_stream->metadata_fp_write = NULL;
error:
int ret;
ssize_t ret_len;
struct lttng_viewer_index *rp = &viewer_stream->current_index;
+ const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
+ char cmd_buf[cmd_buf_len];
cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEXT_INDEX);
- cmd.data_size = sizeof(rq);
- cmd.cmd_version = 0;
+ cmd.data_size = htobe64((uint64_t) sizeof(rq));
+ cmd.cmd_version = htobe32(0);
memset(&rq, 0, sizeof(rq));
rq.stream_id = htobe64(viewer_stream->id);
+ /*
+ * Merge the cmd and connection request to prevent a write-write
+ * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
+ * second write to be performed quickly in presence of Nagle's algorithm.
+ */
+ memcpy(cmd_buf, &cmd, sizeof(cmd));
+ memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
retry:
if (lttng_live_should_quit()) {
ret = -1;
goto end;
}
- ret_len = lttng_live_send(ctx->control_sock, &cmd, sizeof(cmd));
- if (ret_len < 0) {
- perror("[error] Error sending cmd");
- goto error;
- }
- assert(ret_len == sizeof(cmd));
-
- ret_len = lttng_live_send(ctx->control_sock, &rq, sizeof(rq));
+ ret_len = lttng_live_send(ctx->control_sock, &cmd_buf, cmd_buf_len);
if (ret_len < 0) {
- perror("[error] Error sending get_next_index request");
+ perror("[error] Error sending get_next_index cmd and request");
goto error;
}
- assert(ret_len == sizeof(rq));
+ assert(ret_len == cmd_buf_len);
ret_len = lttng_live_recv(ctx->control_sock, rp, sizeof(*rp));
if (ret_len == 0) {
switch (be32toh(rp->status)) {
case LTTNG_VIEWER_INDEX_INACTIVE:
printf_verbose("get_next_index: inactive\n");
+
+ if (index->ts_cycles.timestamp_end ==
+ be64toh(rp->timestamp_end)) {
+ /* Already seen this timestamp. */
+ (void) poll(NULL, 0, ACTIVE_POLL_DELAY);
+ }
+
memset(index, 0, sizeof(struct packet_index));
index->ts_cycles.timestamp_end = be64toh(rp->timestamp_end);
*stream_id = be64toh(rp->stream_id);
struct ctf_stream_pos *pos,
struct ctf_file_stream *file_stream)
{
- int ret;
+ int ret = 0;
switch (whence) {
case SEEK_CUR:
cur_index->ts_real.timestamp_begin;
}
+ /*
+ * Flush the output between attempts to grab a packet, thus
+ * ensuring we flush at least at the periodical timer period.
+ * This ensures the output remains reactive for interactive users and
+ * that the output is flushed when redirected to a file by the shell.
+ */
+ if (fflush(LTTNG_LIVE_OUTPUT_FP) < 0) {
+ perror("fflush");
+ goto end;
+ }
+
if (pos->packet_size == 0 || pos->offset == EOF) {
goto end;
}
}
cmd.cmd = htobe32(LTTNG_VIEWER_CREATE_SESSION);
- cmd.data_size = 0;
- cmd.cmd_version = 0;
+ cmd.data_size = htobe64((uint64_t) 0);
+ cmd.cmd_version = htobe32(0);
ret_len = lttng_live_send(ctx->control_sock, &cmd, sizeof(cmd));
if (ret_len < 0) {
int ret, i, nb_streams = 0;
ssize_t ret_len;
uint32_t stream_count;
+ const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
+ char cmd_buf[cmd_buf_len];
if (lttng_live_should_quit()) {
ret = -1;
}
cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEW_STREAMS);
- cmd.data_size = sizeof(rq);
- cmd.cmd_version = 0;
+ cmd.data_size = htobe64((uint64_t) sizeof(rq));
+ cmd.cmd_version = htobe32(0);
memset(&rq, 0, sizeof(rq));
rq.session_id = htobe64(id);
- ret_len = lttng_live_send(ctx->control_sock, &cmd, sizeof(cmd));
- if (ret_len < 0) {
- perror("[error] Error sending cmd");
- goto error;
- }
- assert(ret_len == sizeof(cmd));
+ /*
+ * Merge the cmd and connection request to prevent a write-write
+ * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
+ * second write to be performed quickly in presence of Nagle's algorithm.
+ */
+ memcpy(cmd_buf, &cmd, sizeof(cmd));
+ memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
- ret_len = lttng_live_send(ctx->control_sock, &rq, sizeof(rq));
+ ret_len = lttng_live_send(ctx->control_sock, cmd_buf, cmd_buf_len);
if (ret_len < 0) {
- perror("[error] Error sending get_new_streams request");
+ perror("[error] Error sending get_new_streams cmd and request");
goto error;
}
- assert(ret_len == sizeof(rq));
+ assert(ret_len == cmd_buf_len);
ret_len = lttng_live_recv(ctx->control_sock, &rp, sizeof(rp));
if (ret_len == 0) {