#include <formats/ctf/events-private.h>
#include "lttng-live-functions.h"
-#include "lttng-viewer.h"
+#include "lttng-viewer-abi.h"
/*
* Memory allocation zeroed
cmd.data_size = sizeof(connect);
cmd.cmd_version = 0;
+ connect.viewer_session_id = -1ULL; /* will be set on recv */
connect.major = htobe32(LTTNG_LIVE_MAJOR);
connect.minor = htobe32(LTTNG_LIVE_MINOR);
connect.type = htobe32(LTTNG_VIEWER_CLIENT_COMMAND);
goto error;
}
assert(ret_len == sizeof(lsession));
+ lsession.hostname[LTTNG_VIEWER_HOST_NAME_MAX - 1] = '\0';
+ lsession.session_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0';
fprintf(stdout, "%s/%" PRIu64 " : %s on host %s (timer = %u, "
"%u stream(s), %u client(s) connected)\n",
(gpointer) ctf_trace_id);
if (!trace) {
trace = g_new0(struct lttng_live_ctf_trace, 1);
- if (!trace) {
- ret = -1;
- fprintf(stderr, "[error] ctf_trace allocation\n");
- goto error;
- }
trace->ctf_trace_id = ctf_trace_id;
trace->streams = g_ptr_array_new();
g_hash_table_insert(stream->session->ctf_traces,
stream->ctf_trace = trace;
g_ptr_array_add(trace->streams, stream);
-error:
return ret;
}
ctx->session->stream_count);
ctx->session->streams = g_new0(struct lttng_live_viewer_stream,
ctx->session->stream_count);
- if (!ctx->session->streams) {
- ret = -1;
- goto error;
- }
-
for (i = 0; i < be32toh(rp.streams_count); i++) {
do {
ret_len = recv(ctx->control_sock, &stream, sizeof(stream), 0);
goto error;
}
assert(ret_len == sizeof(stream));
+ stream.path_name[LTTNG_VIEWER_PATH_MAX - 1] = '\0';
+ stream.channel_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0';
printf_verbose(" stream %" PRIu64 " : %s/%s\n",
be64toh(stream.id), stream.path_name,
char *path;
path = strdup(LTTNG_METADATA_PATH_TEMPLATE);
- path = mkdtemp(path);
+ if (!path) {
+ perror("strdup");
+ ret = -1;
+ goto error;
+ }
+ if (!mkdtemp(path)) {
+ perror("mkdtemp");
+ free(path);
+ ret = -1;
+ goto error;
+ }
ctx->session->streams[i].metadata_flag = 1;
snprintf(ctx->session->streams[i].path,
sizeof(ctx->session->streams[i].path),
S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP);
if (ret < 0) {
perror("open");
+ free(path);
goto error;
}
ctx->session->streams[i].fd = ret;
+ free(path);
}
ret = lttng_live_ctf_trace_assign(&ctx->session->streams[i],
be64toh(stream.ctf_trace_id));
ret = ret_len;
goto error;
}
- assert(ret_len == sizeof(rp));
+ if (ret_len != sizeof(rp)) {
+ fprintf(stderr, "[error] get_data_packet: expected %" PRId64
+ ", received %" PRId64 "\n", ret_len,
+ sizeof(rp));
+ ret = -1;
+ goto error;
+ }
rp.flags = be32toh(rp.flags);
case LTTNG_VIEWER_INDEX_INACTIVE:
printf_verbose("get_next_index: inactive\n");
memset(index, 0, sizeof(struct packet_index));
- index->timestamp_end = be64toh(rp.timestamp_end);
+ index->ts_cycles.timestamp_end = be64toh(rp.timestamp_end);
break;
case LTTNG_VIEWER_INDEX_OK:
printf_verbose("get_next_index: Ok, need metadata update : %u\n",
index->offset = be64toh(rp.offset);
index->packet_size = be64toh(rp.packet_size);
index->content_size = be64toh(rp.content_size);
- index->timestamp_begin = be64toh(rp.timestamp_begin);
- index->timestamp_end = be64toh(rp.timestamp_end);
+ index->ts_cycles.timestamp_begin = be64toh(rp.timestamp_begin);
+ index->ts_cycles.timestamp_end = be64toh(rp.timestamp_end);
index->events_discarded = be64toh(rp.events_discarded);
if (rp.flags & LTTNG_VIEWER_FLAG_NEW_METADATA) {
{
struct ctf_stream_pos *pos;
struct ctf_file_stream *file_stream;
- struct packet_index packet_index;
+ struct packet_index *prev_index = NULL, *cur_index;
struct lttng_live_viewer_stream *viewer_stream;
struct lttng_live_session *session;
int ret;
viewer_stream = (struct lttng_live_viewer_stream *) pos->priv;
session = viewer_stream->session;
+ switch (pos->packet_index->len) {
+ case 0:
+ g_array_set_size(pos->packet_index, 1);
+ cur_index = &g_array_index(pos->packet_index,
+ struct packet_index, 0);
+ break;
+ case 1:
+ g_array_set_size(pos->packet_index, 2);
+ prev_index = &g_array_index(pos->packet_index,
+ struct packet_index, 0);
+ cur_index = &g_array_index(pos->packet_index,
+ struct packet_index, 1);
+ break;
+ case 2:
+ g_array_index(pos->packet_index,
+ struct packet_index, 0) =
+ g_array_index(pos->packet_index,
+ struct packet_index, 1);
+ prev_index = &g_array_index(pos->packet_index,
+ struct packet_index, 0);
+ cur_index = &g_array_index(pos->packet_index,
+ struct packet_index, 1);
+ break;
+ default:
+ abort();
+ break;
+ }
printf_verbose("get_next_index for stream %" PRIu64 "\n", viewer_stream->id);
- ret = get_next_index(session->ctx, viewer_stream, &packet_index);
+ ret = get_next_index(session->ctx, viewer_stream, cur_index);
if (ret < 0) {
pos->offset = EOF;
fprintf(stderr, "[error] get_next_index failed\n");
return;
}
- pos->packet_size = packet_index.packet_size;
- pos->content_size = packet_index.content_size;
+ pos->packet_size = cur_index->packet_size;
+ pos->content_size = cur_index->content_size;
pos->mmap_base_offset = 0;
- if (packet_index.offset == EOF) {
+ if (cur_index->offset == EOF) {
pos->offset = EOF;
} else {
pos->offset = 0;
}
- if (packet_index.content_size == 0) {
- file_stream->parent.cycles_timestamp = packet_index.timestamp_end;
+ if (cur_index->content_size == 0) {
+ file_stream->parent.cycles_timestamp =
+ cur_index->ts_cycles.timestamp_end;
file_stream->parent.real_timestamp = ctf_get_real_timestamp(
- &file_stream->parent, packet_index.timestamp_end);
+ &file_stream->parent,
+ cur_index->ts_cycles.timestamp_end);
} else {
- file_stream->parent.cycles_timestamp = packet_index.timestamp_begin;
- file_stream->parent.real_timestamp = ctf_get_real_timestamp(
- &file_stream->parent, packet_index.timestamp_begin);
+ /* Convert the timestamps and append to the real_index. */
+ cur_index->ts_real.timestamp_begin = ctf_get_real_timestamp(
+ &file_stream->parent,
+ cur_index->ts_cycles.timestamp_begin);
+ cur_index->ts_real.timestamp_end = ctf_get_real_timestamp(
+ &file_stream->parent,
+ cur_index->ts_cycles.timestamp_end);
+
+ ctf_update_current_packet_index(&file_stream->parent,
+ prev_index, cur_index);
+
+ file_stream->parent.cycles_timestamp =
+ cur_index->ts_cycles.timestamp_begin;
+ file_stream->parent.real_timestamp =
+ cur_index->ts_real.timestamp_begin;
}
if (pos->packet_size == 0 || pos->offset == EOF) {
printf_verbose("get_data_packet for stream %" PRIu64 "\n",
viewer_stream->id);
ret = get_data_packet(session->ctx, pos, viewer_stream,
- be64toh(packet_index.offset),
- packet_index.packet_size / CHAR_BIT);
+ be64toh(cur_index->offset),
+ cur_index->packet_size / CHAR_BIT);
if (ret == -2) {
goto retry;
} else if (ret < 0) {
printf_verbose("Index received : packet_size : %" PRIu64
", offset %" PRIu64 ", content_size %" PRIu64
", timestamp_end : %" PRIu64 "\n",
- packet_index.packet_size, packet_index.offset,
- packet_index.content_size, packet_index.timestamp_end);
+ cur_index->packet_size, cur_index->offset,
+ cur_index->content_size,
+ cur_index->ts_cycles.timestamp_end);
/* update trace_packet_header and stream_packet_context */
if (pos->prot != PROT_WRITE && file_stream->parent.trace_packet_header) {