#include <babeltrace/ctf/events-internal.h>
#include <formats/ctf/events-private.h>
+#include <babeltrace/endian.h>
#include <babeltrace/compat/memstream.h>
#include "lttng-live.h"
static void ctf_live_packet_seek(struct bt_stream_pos *stream_pos,
size_t index, int whence);
-static void add_traces(gpointer key, gpointer value, gpointer user_data);
+static int add_traces(struct lttng_live_ctx *ctx);
static int del_traces(gpointer key, gpointer value, gpointer user_data);
static int get_new_metadata(struct lttng_live_ctx *ctx,
struct lttng_live_viewer_stream *viewer_stream,
be64toh(connect.viewer_session_id));
printf_verbose("Relayd version : %u.%u\n", be32toh(connect.major),
be32toh(connect.minor));
+
+ if (LTTNG_LIVE_MAJOR != be32toh(connect.major)) {
+ fprintf(stderr, "[error] Incompatible lttng-relayd protocol\n");
+ goto error;
+ }
+ /* Use the smallest protocol version implemented. */
+ if (LTTNG_LIVE_MINOR > be32toh(connect.minor)) {
+ ctx->minor = be32toh(connect.minor);
+ } else {
+ ctx->minor = LTTNG_LIVE_MINOR;
+ }
+ ctx->major = LTTNG_LIVE_MAJOR;
ret = 0;
end:
return ret;
ctx->session->streams[i].id = be64toh(stream.id);
ctx->session->streams[i].session = ctx->session;
- ctx->session->streams[i].first_read = 1;
ctx->session->streams[i].mmap_size = 0;
+ ctx->session->streams[i].ctf_stream_id = -1ULL;
if (be32toh(stream.metadata_flag)) {
ctx->session->streams[i].metadata_flag = 1;
return -1;
}
+/*
+ * Ask the relay for new streams.
+ *
+ * Returns the number of new streams received or a negative value on error.
+ */
static
int ask_new_streams(struct lttng_live_ctx *ctx)
{
- int i, ret = 0;
+ int i, ret = 0, nb_streams = 0;
uint64_t id;
restart:
for (i = 0; i < ctx->session_ids->len; i++) {
id = g_array_index(ctx->session_ids, uint64_t, i);
ret = lttng_live_get_new_streams(ctx, id);
- printf_verbose("Asking for new streams returns %d\n",
- ret);
+ printf_verbose("Asking for new streams returns %d\n", ret);
if (ret < 0) {
+ if (lttng_live_should_quit()) {
+ goto end;
+ }
if (ret == -LTTNG_VIEWER_NEW_STREAMS_HUP) {
printf_verbose("Session %" PRIu64 " closed\n",
id);
ret = -1;
goto end;
}
+ } else {
+ nb_streams += ret;
}
}
+ ret = nb_streams;
end:
return ret;
memset(&rq, 0, sizeof(rq));
rq.stream_id = htobe64(stream->id);
- /* Already in big endian. */
- rq.offset = offset;
+ rq.offset = htobe64(offset);
rq.len = htobe32(len);
ret_len = lttng_live_send(ctx->control_sock, &cmd, sizeof(cmd));
if (rp.flags & LTTNG_VIEWER_FLAG_NEW_STREAM) {
printf_verbose("get_data_packet: new streams needed\n");
ret = ask_new_streams(ctx);
- if (ret < 0)
+ if (ret < 0) {
goto error;
- g_hash_table_foreach(ctx->session->ctf_traces,
- add_traces, ctx->bt_ctx);
+ } else if (ret > 0) {
+ ret = add_traces(ctx);
+ if (ret < 0) {
+ goto error;
+ }
+ }
}
if (rp.flags & (LTTNG_VIEWER_FLAG_NEW_METADATA
| LTTNG_VIEWER_FLAG_NEW_STREAM)) {
goto error;
}
- if (len <= 0) {
+ if (len == 0) {
goto error;
}
{
int ret = 0;
struct lttng_live_viewer_stream *metadata_stream;
- size_t size, len_read = 0;;
+ size_t size, len_read = 0;
metadata_stream = viewer_stream->ctf_trace->metadata_stream;
if (!metadata_stream) {
}
do {
+ if (lttng_live_should_quit()) {
+ ret = -1;
+ goto error;
+ }
/*
* get_one_metadata_packet returns the number of bytes
* received, 0 when we have received everything, a
return ret;
}
+/*
+ * Assign the fields from a lttng_viewer_index to a packet_index.
+ */
+static
+void lttng_index_to_packet_index(struct lttng_viewer_index *lindex,
+ struct packet_index *pindex)
+{
+ assert(lindex);
+ assert(pindex);
+
+ pindex->offset = be64toh(lindex->offset);
+ pindex->packet_size = be64toh(lindex->packet_size);
+ pindex->content_size = be64toh(lindex->content_size);
+ pindex->ts_cycles.timestamp_begin = be64toh(lindex->timestamp_begin);
+ pindex->ts_cycles.timestamp_end = be64toh(lindex->timestamp_end);
+ pindex->events_discarded = be64toh(lindex->events_discarded);
+}
+
/*
* Get one index for a stream.
*
static
int get_next_index(struct lttng_live_ctx *ctx,
struct lttng_live_viewer_stream *viewer_stream,
- struct packet_index *index)
+ struct packet_index *index, uint64_t *stream_id)
{
struct lttng_viewer_cmd cmd;
struct lttng_viewer_get_next_index rq;
- struct lttng_viewer_index rp;
int ret;
ssize_t ret_len;
+ struct lttng_viewer_index *rp = &viewer_stream->current_index;
cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEXT_INDEX);
cmd.data_size = sizeof(rq);
}
assert(ret_len == sizeof(rq));
- ret_len = lttng_live_recv(ctx->control_sock, &rp, sizeof(rp));
+ ret_len = lttng_live_recv(ctx->control_sock, rp, sizeof(*rp));
if (ret_len == 0) {
fprintf(stderr, "[error] Remote side has closed connection\n");
goto error;
perror("[error] Error receiving index response");
goto error;
}
- assert(ret_len == sizeof(rp));
+ assert(ret_len == sizeof(*rp));
- rp.flags = be32toh(rp.flags);
+ rp->flags = be32toh(rp->flags);
- switch (be32toh(rp.status)) {
+ switch (be32toh(rp->status)) {
case LTTNG_VIEWER_INDEX_INACTIVE:
printf_verbose("get_next_index: inactive\n");
memset(index, 0, sizeof(struct packet_index));
- index->ts_cycles.timestamp_end = be64toh(rp.timestamp_end);
+ index->ts_cycles.timestamp_end = be64toh(rp->timestamp_end);
+ *stream_id = be64toh(rp->stream_id);
break;
case LTTNG_VIEWER_INDEX_OK:
printf_verbose("get_next_index: Ok, need metadata update : %u\n",
- rp.flags & LTTNG_VIEWER_FLAG_NEW_METADATA);
- index->offset = be64toh(rp.offset);
- index->packet_size = be64toh(rp.packet_size);
- index->content_size = be64toh(rp.content_size);
- index->ts_cycles.timestamp_begin = be64toh(rp.timestamp_begin);
- index->ts_cycles.timestamp_end = be64toh(rp.timestamp_end);
- index->events_discarded = be64toh(rp.events_discarded);
+ rp->flags & LTTNG_VIEWER_FLAG_NEW_METADATA);
+ lttng_index_to_packet_index(rp, index);
+ *stream_id = be64toh(rp->stream_id);
+ viewer_stream->data_pending = 1;
- if (rp.flags & LTTNG_VIEWER_FLAG_NEW_METADATA) {
+ if (rp->flags & LTTNG_VIEWER_FLAG_NEW_METADATA) {
ret = append_metadata(ctx, viewer_stream);
if (ret)
goto error;
}
- if (rp.flags & LTTNG_VIEWER_FLAG_NEW_STREAM) {
+ if (rp->flags & LTTNG_VIEWER_FLAG_NEW_STREAM) {
+ printf_verbose("get_next_index: need new streams\n");
ret = ask_new_streams(ctx);
- if (ret < 0)
+ if (ret < 0) {
goto error;
- g_hash_table_foreach(ctx->session->ctf_traces,
- add_traces, ctx->bt_ctx);
+ } else if (ret > 0) {
+ ret = add_traces(ctx);
+ if (ret < 0) {
+ goto error;
+ }
+ }
}
break;
case LTTNG_VIEWER_INDEX_RETRY:
return -1;
}
+static
+void read_packet_header(struct ctf_stream_pos *pos,
+ struct ctf_file_stream *file_stream)
+{
+ int ret;
+
+ /* update trace_packet_header and stream_packet_context */
+ if (!(pos->prot & PROT_WRITE) &&
+ file_stream->parent.trace_packet_header) {
+ /* Read packet header */
+ ret = generic_rw(&pos->parent,
+ &file_stream->parent.trace_packet_header->p);
+ if (ret) {
+ pos->offset = EOF;
+ fprintf(stderr, "[error] trace packet "
+ "header read failed\n");
+ goto end;
+ }
+ }
+ if (!(pos->prot & PROT_WRITE) &&
+ file_stream->parent.stream_packet_context) {
+ /* Read packet context */
+ ret = generic_rw(&pos->parent,
+ &file_stream->parent.stream_packet_context->p);
+ if (ret) {
+ pos->offset = EOF;
+ fprintf(stderr, "[error] stream packet "
+ "context read failed\n");
+ goto end;
+ }
+ }
+ pos->data_offset = pos->offset;
+
+end:
+ return;
+}
+
+/*
+ * Handle the seek parameters.
+ * Returns 0 if the packet_seek can continue, a positive value to
+ * cleanly exit the packet_seek, a negative value on error.
+ */
+static
+int handle_seek_position(size_t index, int whence,
+ struct lttng_live_viewer_stream *viewer_stream,
+ struct ctf_stream_pos *pos,
+ struct ctf_file_stream *file_stream)
+{
+ int ret;
+
+ switch (whence) {
+ case SEEK_CUR:
+ ret = 0;
+ goto end;
+ case SEEK_SET:
+ /*
+ * We only allow to seek to 0.
+ */
+ if (index != 0) {
+ fprintf(stderr, "[error] Arbitrary seek in lttng-live "
+ "trace not supported\n");
+ pos->offset = EOF;
+ ret = -1;
+ goto end;
+ }
+
+ ret = 0;
+ goto end;
+
+ default:
+ fprintf(stderr, "[error] Invalid seek parameter\n");
+ assert(0);
+ }
+
+end:
+ return ret;
+}
+
static
void ctf_live_packet_seek(struct bt_stream_pos *stream_pos, size_t index,
int whence)
struct packet_index *prev_index = NULL, *cur_index;
struct lttng_live_viewer_stream *viewer_stream;
struct lttng_live_session *session;
+ uint64_t stream_id = -1ULL;
int ret;
-retry:
pos = ctf_pos(stream_pos);
file_stream = container_of(pos, struct ctf_file_stream, pos);
viewer_stream = (struct lttng_live_viewer_stream *) pos->priv;
session = viewer_stream->session;
+ ret = handle_seek_position(index, whence, viewer_stream, pos,
+ file_stream);
+ if (ret != 0) {
+ return;
+ }
+
+retry:
switch (pos->packet_index->len) {
case 0:
g_array_set_size(pos->packet_index, 1);
abort();
break;
}
- printf_verbose("get_next_index for stream %" PRIu64 "\n", viewer_stream->id);
- ret = get_next_index(session->ctx, viewer_stream, cur_index);
- if (ret < 0) {
- pos->offset = EOF;
- fprintf(stderr, "[error] get_next_index failed\n");
+
+ if (viewer_stream->data_pending) {
+ lttng_index_to_packet_index(&viewer_stream->current_index, cur_index);
+ } else {
+ printf_verbose("get_next_index for stream %" PRIu64 "\n", viewer_stream->id);
+ ret = get_next_index(session->ctx, viewer_stream, cur_index, &stream_id);
+ if (ret < 0) {
+ pos->offset = EOF;
+ if (!lttng_live_should_quit()) {
+ fprintf(stderr, "[error] get_next_index failed\n");
+ }
+ return;
+ }
+ printf_verbose("Index received : packet_size : %" PRIu64
+ ", offset %" PRIu64 ", content_size %" PRIu64
+ ", timestamp_end : %" PRIu64 "\n",
+ cur_index->packet_size, cur_index->offset,
+ cur_index->content_size,
+ cur_index->ts_cycles.timestamp_end);
+
+ }
+
+ /*
+ * On the first time we receive an index, the stream_id needs to
+ * be set for the stream in order to use it, we don't want any
+ * data at this stage.
+ */
+ if (file_stream->parent.stream_id == -1ULL) {
+ /*
+ * Warning: with lttng-tools < 2.4.2, the beacon does not
+ * contain the real stream ID, it is memset to 0, so this
+ * might create a problem when a session has multiple
+ * channels. We can't detect it at this stage, lttng-tools
+ * has to be upgraded to fix this problem.
+ */
+ printf_verbose("Assigning stream_id %" PRIu64 "\n",
+ stream_id);
+ file_stream->parent.stream_id = stream_id;
+ viewer_stream->ctf_stream_id = stream_id;
+
return;
}
}
if (cur_index->content_size == 0) {
- file_stream->parent.cycles_timestamp =
+ if (file_stream->parent.stream_class) {
+ file_stream->parent.cycles_timestamp =
cur_index->ts_cycles.timestamp_end;
- file_stream->parent.real_timestamp = ctf_get_real_timestamp(
- &file_stream->parent,
- cur_index->ts_cycles.timestamp_end);
+ file_stream->parent.real_timestamp = ctf_get_real_timestamp(
+ &file_stream->parent,
+ cur_index->ts_cycles.timestamp_end);
+ }
} else {
- /* Convert the timestamps and append to the real_index. */
- cur_index->ts_real.timestamp_begin = ctf_get_real_timestamp(
- &file_stream->parent,
- cur_index->ts_cycles.timestamp_begin);
- cur_index->ts_real.timestamp_end = ctf_get_real_timestamp(
- &file_stream->parent,
- cur_index->ts_cycles.timestamp_end);
+ if (file_stream->parent.stream_class) {
+ /* Convert the timestamps and append to the real_index. */
+ cur_index->ts_real.timestamp_begin = ctf_get_real_timestamp(
+ &file_stream->parent,
+ cur_index->ts_cycles.timestamp_begin);
+ cur_index->ts_real.timestamp_end = ctf_get_real_timestamp(
+ &file_stream->parent,
+ cur_index->ts_cycles.timestamp_end);
+ }
ctf_update_current_packet_index(&file_stream->parent,
prev_index, cur_index);
printf_verbose("get_data_packet for stream %" PRIu64 "\n",
viewer_stream->id);
ret = get_data_packet(session->ctx, pos, viewer_stream,
- be64toh(cur_index->offset),
+ cur_index->offset,
cur_index->packet_size / CHAR_BIT);
if (ret == -2) {
goto retry;
} else if (ret < 0) {
pos->offset = EOF;
- fprintf(stderr, "[error] get_data_packet failed\n");
+ if (!lttng_live_should_quit()) {
+ fprintf(stderr, "[error] get_data_packet failed\n");
+ }
return;
}
+ viewer_stream->data_pending = 0;
- printf_verbose("Index received : packet_size : %" PRIu64
- ", offset %" PRIu64 ", content_size %" PRIu64
- ", timestamp_end : %" PRIu64 "\n",
- cur_index->packet_size, cur_index->offset,
- cur_index->content_size,
- cur_index->ts_cycles.timestamp_end);
-
- /* update trace_packet_header and stream_packet_context */
- if (pos->prot != PROT_WRITE && file_stream->parent.trace_packet_header) {
- /* Read packet header */
- ret = generic_rw(&pos->parent, &file_stream->parent.trace_packet_header->p);
- if (ret) {
- pos->offset = EOF;
- fprintf(stderr, "[error] trace packet header read failed\n");
- goto end;
- }
- }
- if (pos->prot != PROT_WRITE && file_stream->parent.stream_packet_context) {
- /* Read packet context */
- ret = generic_rw(&pos->parent, &file_stream->parent.stream_packet_context->p);
- if (ret) {
- pos->offset = EOF;
- fprintf(stderr, "[error] stream packet context read failed\n");
- goto end;
- }
- }
- pos->data_offset = pos->offset;
+ read_packet_header(pos, file_stream);
end:
return;
}
static
-void add_traces(gpointer key, gpointer value, gpointer user_data)
+int add_one_trace(struct lttng_live_ctx *ctx,
+ struct lttng_live_ctf_trace *trace)
{
int i, ret;
- struct bt_context *bt_ctx = user_data;
- struct lttng_live_ctf_trace *trace = value;
+ struct bt_context *bt_ctx = ctx->bt_ctx;
struct lttng_live_viewer_stream *stream;
struct bt_mmap_stream *new_mmap_stream;
struct bt_mmap_stream_list mmap_list;
- struct lttng_live_ctx *ctx = NULL;
struct bt_trace_descriptor *td;
struct bt_trace_handle *handle;
* times the same traces.
* If a trace is already in the context, we just skip this function.
*/
- if (trace->in_use)
- return;
+ if (trace->in_use) {
+ ret = 0;
+ goto end;
+ }
BT_INIT_LIST_HEAD(&mmap_list.head);
for (i = 0; i < trace->streams->len; i++) {
stream = g_ptr_array_index(trace->streams, i);
- ctx = stream->session->ctx;
if (!stream->metadata_flag) {
new_mmap_stream = zmalloc(sizeof(struct bt_mmap_stream));
if (!trace->metadata_fp) {
fprintf(stderr, "[error] No metadata stream opened\n");
+ ret = -1;
goto end_free;
}
ctf_live_packet_seek, &mmap_list, trace->metadata_fp);
if (ret < 0) {
fprintf(stderr, "[error] Error adding trace\n");
+ ret = -1;
goto end_free;
}
trace->metadata_stream->metadata_len = 0;
end_free:
bt_context_put(bt_ctx);
end:
- return;
+ return ret;
+}
+
+static
+int add_traces(struct lttng_live_ctx *ctx)
+{
+ int ret;
+ struct lttng_live_ctf_trace *trace;
+ GHashTableIter it;
+ gpointer key;
+ gpointer value;
+
+ g_hash_table_iter_init(&it, ctx->session->ctf_traces);
+ while (g_hash_table_iter_next(&it, &key, &value)) {
+ trace = (struct lttng_live_ctf_trace *) value;
+ ret = add_one_trace(ctx, trace);
+ if (ret < 0) {
+ goto end;
+ }
+ }
+
+ ret = 0;
+
+end:
+ return ret;
}
+/*
+ * Request new streams for a session.
+ * Returns the number of streams received or a negative value on error.
+ */
int lttng_live_get_new_streams(struct lttng_live_ctx *ctx, uint64_t id)
{
struct lttng_viewer_cmd cmd;
struct lttng_viewer_new_streams_request rq;
struct lttng_viewer_new_streams_response rp;
struct lttng_viewer_stream stream;
- int ret, i;
+ int ret, i, nb_streams = 0;
ssize_t ret_len;
uint32_t stream_count;
ctx->session->streams[i].id = be64toh(stream.id);
ctx->session->streams[i].session = ctx->session;
- ctx->session->streams[i].first_read = 1;
ctx->session->streams[i].mmap_size = 0;
+ ctx->session->streams[i].ctf_stream_id = -1ULL;
if (be32toh(stream.metadata_flag)) {
ctx->session->streams[i].metadata_flag = 1;
if (ret < 0) {
goto error;
}
+ nb_streams++;
}
- ret = 0;
+ ret = nb_streams;
end:
return ret;
return -1;
}
-void lttng_live_read(struct lttng_live_ctx *ctx)
+int lttng_live_read(struct lttng_live_ctx *ctx)
{
- int ret, i;
+ int ret = -1;
+ int i;
struct bt_ctf_iter *iter;
const struct bt_ctf_event *event;
struct bt_iter_pos begin_pos;
int flags;
if (lttng_live_should_quit()) {
+ ret = 0;
goto end_free;
}
while (!ctx->session->stream_count) {
if (lttng_live_should_quit()
|| ctx->session_ids->len == 0) {
+ ret = 0;
goto end_free;
}
ret = ask_new_streams(ctx);
}
}
- g_hash_table_foreach(ctx->session->ctf_traces, add_traces,
- ctx->bt_ctx);
+ ret = add_traces(ctx);
+ if (ret < 0) {
+ goto end_free;
+ }
begin_pos.type = BT_SEEK_BEGIN;
iter = bt_ctf_iter_create(ctx->bt_ctx, &begin_pos, NULL);
if (!iter) {
+ if (lttng_live_should_quit()) {
+ ret = 0;
+ goto end;
+ }
fprintf(stderr, "[error] Iterator creation error\n");
goto end;
}
for (;;) {
if (lttng_live_should_quit()) {
+ ret = 0;
goto end_free;
}
event = bt_ctf_iter_read_event_flags(iter, &flags);
end_free:
bt_context_put(ctx->bt_ctx);
end:
- return;
+ if (lttng_live_should_quit()) {
+ ret = 0;
+ }
+ return ret;
}