#include <errno.h>
#include <inttypes.h>
#include <fcntl.h>
-#include <sys/mman.h>
#include <poll.h>
#include <babeltrace/ctf/ctf-index.h>
#include <babeltrace/compat/memstream.h>
#include <babeltrace/compat/send.h>
#include <babeltrace/compat/string.h>
+#include <babeltrace/compat/mman.h>
#include "lttng-live.h"
#include "lttng-viewer-abi.h"
{
struct lttng_viewer_cmd cmd;
struct lttng_viewer_connect connect;
+ const size_t cmd_buf_len = sizeof(cmd) + sizeof(connect);
+ char cmd_buf[cmd_buf_len];
int ret;
ssize_t ret_len;
}
cmd.cmd = htobe32(LTTNG_VIEWER_CONNECT);
- cmd.data_size = sizeof(connect);
- cmd.cmd_version = 0;
+ cmd.data_size = htobe64((uint64_t) sizeof(connect));
+ cmd.cmd_version = htobe32(0);
connect.viewer_session_id = -1ULL; /* will be set on recv */
connect.major = htobe32(LTTNG_LIVE_MAJOR);
connect.minor = htobe32(LTTNG_LIVE_MINOR);
connect.type = htobe32(LTTNG_VIEWER_CLIENT_COMMAND);
- ret_len = lttng_live_send(ctx->control_sock, &cmd, sizeof(cmd));
- if (ret_len < 0) {
- perror("[error] Error sending cmd");
- goto error;
- }
- assert(ret_len == sizeof(cmd));
+ /*
+ * Merge the cmd and connection request to prevent a write-write
+ * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
+ * second write to be performed quickly in presence of Nagle's algorithm.
+ */
+ memcpy(cmd_buf, &cmd, sizeof(cmd));
+ memcpy(cmd_buf + sizeof(cmd), &connect, sizeof(connect));
- ret_len = lttng_live_send(ctx->control_sock, &connect, sizeof(connect));
+ ret_len = lttng_live_send(ctx->control_sock, cmd_buf, cmd_buf_len);
if (ret_len < 0) {
- perror("[error] Error sending version");
+ perror("[error] Error sending cmd for establishing session");
goto error;
}
- assert(ret_len == sizeof(connect));
+ assert(ret_len == cmd_buf_len);
ret_len = lttng_live_recv(ctx->control_sock, &connect, sizeof(connect));
if (ret_len == 0) {
for (i = 0; i < session_list->len; i++) {
relay_session = g_ptr_array_index(session_list, i);
- fprintf(stdout, "%s/host/%s/%s (timer = %u, "
+ fprintf(LTTNG_LIVE_OUTPUT_FP, "%s/host/%s/%s (timer = %u, "
"%u stream(s), %u client(s) connected)\n",
path, relay_session->hostname,
relay_session->name, relay_session->timer,
}
cmd.cmd = htobe32(LTTNG_VIEWER_LIST_SESSIONS);
- cmd.data_size = 0;
- cmd.cmd_version = 0;
+ cmd.data_size = htobe64((uint64_t) 0);
+ cmd.cmd_version = htobe32(0);
ret_len = lttng_live_send(ctx->control_sock, &cmd, sizeof(cmd));
if (ret_len < 0) {
struct lttng_viewer_attach_session_request rq;
struct lttng_viewer_attach_session_response rp;
struct lttng_viewer_stream stream;
+ const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
+ char cmd_buf[cmd_buf_len];
int ret, i;
ssize_t ret_len;
}
cmd.cmd = htobe32(LTTNG_VIEWER_ATTACH_SESSION);
- cmd.data_size = sizeof(rq);
- cmd.cmd_version = 0;
+ cmd.data_size = htobe64((uint64_t) sizeof(rq));
+ cmd.cmd_version = htobe32(0);
memset(&rq, 0, sizeof(rq));
rq.session_id = htobe64(id);
// rq.seek = htobe32(LTTNG_VIEWER_SEEK_BEGINNING);
rq.seek = htobe32(LTTNG_VIEWER_SEEK_LAST);
- ret_len = lttng_live_send(ctx->control_sock, &cmd, sizeof(cmd));
- if (ret_len < 0) {
- perror("[error] Error sending cmd");
- goto error;
- }
- assert(ret_len == sizeof(cmd));
+ /*
+ * Merge the cmd and connection request to prevent a write-write
+ * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
+ * second write to be performed quickly in presence of Nagle's algorithm.
+ */
+ memcpy(cmd_buf, &cmd, sizeof(cmd));
+ memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
- ret_len = lttng_live_send(ctx->control_sock, &rq, sizeof(rq));
+ ret_len = lttng_live_send(ctx->control_sock, cmd_buf, cmd_buf_len);
if (ret_len < 0) {
- perror("[error] Error sending attach request");
+ perror("[error] Error sending attach command and request");
goto error;
}
- assert(ret_len == sizeof(rq));
+ assert(ret_len == cmd_buf_len);
ret_len = lttng_live_recv(ctx->control_sock, &rp, sizeof(rp));
if (ret_len == 0) {
ret = 0;
goto end;
}
- printf_verbose("Waiting for %" PRIu64 " streams:\n",
- ctx->session->stream_count);
- ctx->session->streams = g_new0(struct lttng_live_viewer_stream,
- ctx->session->stream_count);
+ printf_verbose("Waiting for %d streams:\n",
+ be32toh(rp.streams_count));
for (i = 0; i < be32toh(rp.streams_count); i++) {
- ret_len = lttng_live_recv(ctx->control_sock, &stream, sizeof(stream));
+ struct lttng_live_viewer_stream *lvstream;
+
+ lvstream = g_new0(struct lttng_live_viewer_stream, 1);
+ ret_len = lttng_live_recv(ctx->control_sock, &stream,
+ sizeof(stream));
if (ret_len == 0) {
fprintf(stderr, "[error] Remote side has closed connection\n");
+ g_free(lvstream);
goto error;
}
if (ret_len < 0) {
perror("[error] Error receiving stream");
+ g_free(lvstream);
goto error;
}
assert(ret_len == sizeof(stream));
printf_verbose(" stream %" PRIu64 " : %s/%s\n",
be64toh(stream.id), stream.path_name,
stream.channel_name);
- ctx->session->streams[i].id = be64toh(stream.id);
- ctx->session->streams[i].session = ctx->session;
+ lvstream->id = be64toh(stream.id);
+ lvstream->session = ctx->session;
- ctx->session->streams[i].mmap_size = 0;
- ctx->session->streams[i].ctf_stream_id = -1ULL;
+ lvstream->mmap_size = 0;
+ lvstream->ctf_stream_id = -1ULL;
if (be32toh(stream.metadata_flag)) {
- ctx->session->streams[i].metadata_flag = 1;
+ lvstream->metadata_flag = 1;
}
- ret = lttng_live_ctf_trace_assign(&ctx->session->streams[i],
+ ret = lttng_live_ctf_trace_assign(lvstream,
be64toh(stream.ctf_trace_id));
if (ret < 0) {
+ g_free(lvstream);
goto error;
}
-
+ bt_list_add(&lvstream->stream_node,
+ &ctx->session->stream_list);
}
ret = 0;
end:
id = g_array_index(ctx->session_ids, uint64_t, i);
ret = lttng_live_get_new_streams(ctx, id);
printf_verbose("Asking for new streams returns %d\n", ret);
+ if (lttng_live_should_quit()) {
+ ret = -1;
+ goto end;
+ }
if (ret < 0) {
- if (lttng_live_should_quit()) {
- goto end;
- }
if (ret == -LTTNG_VIEWER_NEW_STREAMS_HUP) {
printf_verbose("Session %" PRIu64 " closed\n",
id);
nb_streams += ret;
}
}
- ret = nb_streams;
+ if (ctx->session_ids->len == 0) {
+ /* All sessions are closed. */
+ ret = -1;
+ } else {
+ ret = nb_streams;
+ }
end:
return ret;
struct lttng_live_viewer_stream *metadata;
char *metadata_buf = NULL;
+ if (!viewer_stream->ctf_trace->handle) {
+ printf_verbose("append_metadata: trace handle not ready yet.\n");
+ return 0;
+ }
+
printf_verbose("get_next_index: new metadata needed\n");
ret = get_new_metadata(ctx, viewer_stream, &metadata_buf);
if (ret < 0) {
struct lttng_viewer_cmd cmd;
struct lttng_viewer_get_packet rq;
struct lttng_viewer_trace_packet rp;
+ const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
+ char cmd_buf[cmd_buf_len];
ssize_t ret_len;
int ret;
ret = -1;
goto end;
}
+
cmd.cmd = htobe32(LTTNG_VIEWER_GET_PACKET);
- cmd.data_size = sizeof(rq);
- cmd.cmd_version = 0;
+ cmd.data_size = htobe64((uint64_t) sizeof(rq));
+ cmd.cmd_version = htobe32(0);
memset(&rq, 0, sizeof(rq));
rq.stream_id = htobe64(stream->id);
rq.offset = htobe64(offset);
rq.len = htobe32(len);
- ret_len = lttng_live_send(ctx->control_sock, &cmd, sizeof(cmd));
- if (ret_len < 0) {
- perror("[error] Error sending cmd");
- goto error;
- }
- assert(ret_len == sizeof(cmd));
+ /*
+ * Merge the cmd and connection request to prevent a write-write
+ * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
+ * second write to be performed quickly in presence of Nagle's algorithm.
+ */
+ memcpy(cmd_buf, &cmd, sizeof(cmd));
+ memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
- ret_len = lttng_live_send(ctx->control_sock, &rq, sizeof(rq));
+ ret_len = lttng_live_send(ctx->control_sock, cmd_buf, cmd_buf_len);
if (ret_len < 0) {
- perror("[error] Error sending get_data_packet request");
+ perror("[error] Error sending get_data_packet cmd and request");
goto error;
}
- assert(ret_len == sizeof(rq));
+ assert(ret_len == cmd_buf_len);
ret_len = lttng_live_recv(ctx->control_sock, &rp, sizeof(rp));
if (ret_len == 0) {
struct lttng_viewer_metadata_packet rp;
char *data = NULL;
ssize_t ret_len;
+ const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
+ char cmd_buf[cmd_buf_len];
if (lttng_live_should_quit()) {
ret = -1;
rq.stream_id = htobe64(metadata_stream->id);
cmd.cmd = htobe32(LTTNG_VIEWER_GET_METADATA);
- cmd.data_size = sizeof(rq);
- cmd.cmd_version = 0;
+ cmd.data_size = htobe64((uint64_t) sizeof(rq));
+ cmd.cmd_version = htobe32(0);
- ret_len = lttng_live_send(ctx->control_sock, &cmd, sizeof(cmd));
- if (ret_len < 0) {
- perror("[error] Error sending cmd");
- goto error;
- }
- assert(ret_len == sizeof(cmd));
+ /*
+ * Merge the cmd and connection request to prevent a write-write
+ * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
+ * second write to be performed quickly in presence of Nagle's algorithm.
+ */
+ memcpy(cmd_buf, &cmd, sizeof(cmd));
+ memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
- ret_len = lttng_live_send(ctx->control_sock, &rq, sizeof(rq));
+ ret_len = lttng_live_send(ctx->control_sock, cmd_buf, cmd_buf_len);
if (ret_len < 0) {
- perror("[error] Error sending get_metadata request");
+ perror("[error] Error sending get_metadata cmd and request");
goto error;
}
- assert(ret_len == sizeof(rq));
+ assert(ret_len == cmd_buf_len);
ret_len = lttng_live_recv(ctx->control_sock, &rp, sizeof(rp));
if (ret_len == 0) {
if (!len_read) {
(void) poll(NULL, 0, ACTIVE_POLL_DELAY);
}
+ if (ret < 0) {
+ break; /* Stop on error. */
+ }
} while (ret > 0 || !len_read);
if (babeltrace_close_memstream(metadata_buf, &size,
int ret;
ssize_t ret_len;
struct lttng_viewer_index *rp = &viewer_stream->current_index;
+ const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
+ char cmd_buf[cmd_buf_len];
cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEXT_INDEX);
- cmd.data_size = sizeof(rq);
- cmd.cmd_version = 0;
+ cmd.data_size = htobe64((uint64_t) sizeof(rq));
+ cmd.cmd_version = htobe32(0);
memset(&rq, 0, sizeof(rq));
rq.stream_id = htobe64(viewer_stream->id);
+ /*
+ * Merge the cmd and connection request to prevent a write-write
+ * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
+ * second write to be performed quickly in presence of Nagle's algorithm.
+ */
+ memcpy(cmd_buf, &cmd, sizeof(cmd));
+ memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
retry:
if (lttng_live_should_quit()) {
ret = -1;
goto end;
}
- ret_len = lttng_live_send(ctx->control_sock, &cmd, sizeof(cmd));
- if (ret_len < 0) {
- perror("[error] Error sending cmd");
- goto error;
- }
- assert(ret_len == sizeof(cmd));
-
- ret_len = lttng_live_send(ctx->control_sock, &rq, sizeof(rq));
+ ret_len = lttng_live_send(ctx->control_sock, &cmd_buf, cmd_buf_len);
if (ret_len < 0) {
- perror("[error] Error sending get_next_index request");
+ perror("[error] Error sending get_next_index cmd and request");
goto error;
}
- assert(ret_len == sizeof(rq));
+ assert(ret_len == cmd_buf_len);
ret_len = lttng_live_recv(ctx->control_sock, rp, sizeof(*rp));
if (ret_len == 0) {
switch (be32toh(rp->status)) {
case LTTNG_VIEWER_INDEX_INACTIVE:
printf_verbose("get_next_index: inactive\n");
+
+ if (index->ts_cycles.timestamp_end ==
+ be64toh(rp->timestamp_end)) {
+ /* Already seen this timestamp. */
+ (void) poll(NULL, 0, ACTIVE_POLL_DELAY);
+ }
+
memset(index, 0, sizeof(struct packet_index));
index->ts_cycles.timestamp_end = be64toh(rp->timestamp_end);
*stream_id = be64toh(rp->stream_id);
goto retry;
case LTTNG_VIEWER_INDEX_HUP:
printf_verbose("get_next_index: stream hung up\n");
+ /* TODO: remove stream from session list and trace ptr array */
viewer_stream->id = -1ULL;
index->offset = EOF;
ctx->session->stream_count--;
}
if (cur_index->content_size == 0) {
+ /* Beacon packet index */
if (file_stream->parent.stream_class) {
file_stream->parent.cycles_timestamp =
cur_index->ts_cycles.timestamp_end;
file_stream->parent.real_timestamp = ctf_get_real_timestamp(
&file_stream->parent,
cur_index->ts_cycles.timestamp_end);
+
+ /*
+ * Duplicate the data from the previous index, because
+ * the one we just received is only a beacon with no
+ * relevant information except the timestamp_end. We
+ * don't need to keep this timestamp_end because we already
+ * updated the file_stream timestamps, so we only need
+ * to keep the last real index data as prev_index. That
+ * way, we keep the original prev timestamps and
+ * discarded events counter. This is the same behaviour
+ * as if we were reading a local trace, we would not
+ * have fake indexes between real indexes.
+ */
+ memcpy(cur_index, prev_index, sizeof(struct packet_index));
}
} else {
+ /* Real packet index */
if (file_stream->parent.stream_class) {
/* Convert the timestamps and append to the real_index. */
cur_index->ts_real.timestamp_begin = ctf_get_real_timestamp(
ctf_update_current_packet_index(&file_stream->parent,
prev_index, cur_index);
+ /*
+ * We need to check if we are in trace read or called
+ * from packet indexing. In this last case, the
+ * collection is not there, so we cannot print the
+ * timestamps.
+ */
+ if ((&file_stream->parent)->stream_class->trace->parent.collection) {
+ ctf_print_discarded_lost(stderr, &file_stream->parent);
+ }
+
file_stream->parent.cycles_timestamp =
cur_index->ts_cycles.timestamp_begin;
file_stream->parent.real_timestamp =
cur_index->ts_real.timestamp_begin;
}
+ /*
+ * Flush the output between attempts to grab a packet, thus
+ * ensuring we flush at least at the periodical timer period.
+ * This ensures the output remains reactive for interactive users and
+ * that the output is flushed when redirected to a file by the shell.
+ */
+ if (fflush(LTTNG_LIVE_OUTPUT_FP) < 0) {
+ perror("fflush");
+ goto end;
+ }
+
if (pos->packet_size == 0 || pos->offset == EOF) {
goto end;
}
}
cmd.cmd = htobe32(LTTNG_VIEWER_CREATE_SESSION);
- cmd.data_size = 0;
- cmd.cmd_version = 0;
+ cmd.data_size = htobe64((uint64_t) 0);
+ cmd.cmd_version = htobe32(0);
ret_len = lttng_live_send(ctx->control_sock, &cmd, sizeof(cmd));
if (ret_len < 0) {
int ret, i, nb_streams = 0;
ssize_t ret_len;
uint32_t stream_count;
+ const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
+ char cmd_buf[cmd_buf_len];
if (lttng_live_should_quit()) {
ret = -1;
}
cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEW_STREAMS);
- cmd.data_size = sizeof(rq);
- cmd.cmd_version = 0;
+ cmd.data_size = htobe64((uint64_t) sizeof(rq));
+ cmd.cmd_version = htobe32(0);
memset(&rq, 0, sizeof(rq));
rq.session_id = htobe64(id);
- ret_len = lttng_live_send(ctx->control_sock, &cmd, sizeof(cmd));
- if (ret_len < 0) {
- perror("[error] Error sending cmd");
- goto error;
- }
- assert(ret_len == sizeof(cmd));
+ /*
+ * Merge the cmd and connection request to prevent a write-write
+ * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
+ * second write to be performed quickly in presence of Nagle's algorithm.
+ */
+ memcpy(cmd_buf, &cmd, sizeof(cmd));
+ memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
- ret_len = lttng_live_send(ctx->control_sock, &rq, sizeof(rq));
+ ret_len = lttng_live_send(ctx->control_sock, cmd_buf, cmd_buf_len);
if (ret_len < 0) {
- perror("[error] Error sending get_new_streams request");
+ perror("[error] Error sending get_new_streams cmd and request");
goto error;
}
- assert(ret_len == sizeof(rq));
+ assert(ret_len == cmd_buf_len);
ret_len = lttng_live_recv(ctx->control_sock, &rp, sizeof(rp));
if (ret_len == 0) {
ret = 0;
goto end;
}
- printf_verbose("Waiting for %" PRIu64 " streams:\n",
- ctx->session->stream_count);
- ctx->session->streams = g_new0(struct lttng_live_viewer_stream,
- ctx->session->stream_count);
+ printf_verbose("Waiting for %d streams:\n", stream_count);
+
for (i = 0; i < stream_count; i++) {
- ret_len = lttng_live_recv(ctx->control_sock, &stream, sizeof(stream));
+ struct lttng_live_viewer_stream *lvstream;
+
+ lvstream = g_new0(struct lttng_live_viewer_stream, 1);
+ ret_len = lttng_live_recv(ctx->control_sock, &stream,
+ sizeof(stream));
if (ret_len == 0) {
fprintf(stderr, "[error] Remote side has closed connection\n");
+ g_free(lvstream);
goto error;
}
if (ret_len < 0) {
perror("[error] Error receiving stream");
+ g_free(lvstream);
goto error;
}
assert(ret_len == sizeof(stream));
printf_verbose(" stream %" PRIu64 " : %s/%s\n",
be64toh(stream.id), stream.path_name,
stream.channel_name);
- ctx->session->streams[i].id = be64toh(stream.id);
- ctx->session->streams[i].session = ctx->session;
+ lvstream->id = be64toh(stream.id);
+ lvstream->session = ctx->session;
- ctx->session->streams[i].mmap_size = 0;
- ctx->session->streams[i].ctf_stream_id = -1ULL;
+ lvstream->mmap_size = 0;
+ lvstream->ctf_stream_id = -1ULL;
if (be32toh(stream.metadata_flag)) {
- ctx->session->streams[i].metadata_flag = 1;
+ lvstream->metadata_flag = 1;
}
- ret = lttng_live_ctf_trace_assign(&ctx->session->streams[i],
+ ret = lttng_live_ctf_trace_assign(lvstream,
be64toh(stream.ctf_trace_id));
if (ret < 0) {
+ g_free(lvstream);
goto error;
}
nb_streams++;
-
+ bt_list_add(&lvstream->stream_node,
+ &ctx->session->stream_list);
}
ret = nb_streams;
end:
}
ret = ask_new_streams(ctx);
if (ret < 0) {
+ ret = 0;
goto end_free;
}
if (!ctx->session->stream_count) {