return -1;
}
+/*
+ * Ask the relay for new streams.
+ *
+ * Returns the number of new streams received or a negative value on error.
+ */
static
int ask_new_streams(struct lttng_live_ctx *ctx)
{
- int i, ret = 0;
+ int i, ret = 0, nb_streams = 0;
uint64_t id;
restart:
for (i = 0; i < ctx->session_ids->len; i++) {
id = g_array_index(ctx->session_ids, uint64_t, i);
ret = lttng_live_get_new_streams(ctx, id);
- printf_verbose("Asking for new streams returns %d\n",
- ret);
+ printf_verbose("Asking for new streams returns %d\n", ret);
if (ret < 0) {
+ if (lttng_live_should_quit()) {
+ goto end;
+ }
if (ret == -LTTNG_VIEWER_NEW_STREAMS_HUP) {
printf_verbose("Session %" PRIu64 " closed\n",
id);
ret = -1;
goto end;
}
+ } else {
+ nb_streams += ret;
}
}
+ ret = nb_streams;
end:
return ret;
memset(&rq, 0, sizeof(rq));
rq.stream_id = htobe64(stream->id);
- /* Already in big endian. */
- rq.offset = offset;
+ rq.offset = htobe64(offset);
rq.len = htobe32(len);
ret_len = lttng_live_send(ctx->control_sock, &cmd, sizeof(cmd));
ret = ask_new_streams(ctx);
if (ret < 0)
goto error;
- g_hash_table_foreach(ctx->session->ctf_traces,
- add_traces, ctx->bt_ctx);
+ else if (ret > 0)
+ g_hash_table_foreach(ctx->session->ctf_traces,
+ add_traces, ctx->bt_ctx);
}
if (rp.flags & (LTTNG_VIEWER_FLAG_NEW_METADATA
| LTTNG_VIEWER_FLAG_NEW_STREAM)) {
goto error;
}
- if (len <= 0) {
+ if (len == 0) {
goto error;
}
{
int ret = 0;
struct lttng_live_viewer_stream *metadata_stream;
- size_t size, len_read = 0;;
+ size_t size, len_read = 0;
metadata_stream = viewer_stream->ctf_trace->metadata_stream;
if (!metadata_stream) {
goto error;
}
if (rp.flags & LTTNG_VIEWER_FLAG_NEW_STREAM) {
+ printf_verbose("get_next_index: need new streams\n");
ret = ask_new_streams(ctx);
if (ret < 0)
goto error;
- g_hash_table_foreach(ctx->session->ctf_traces,
- add_traces, ctx->bt_ctx);
+ else if (ret > 0)
+ g_hash_table_foreach(ctx->session->ctf_traces,
+ add_traces, ctx->bt_ctx);
}
break;
case LTTNG_VIEWER_INDEX_RETRY:
ret = get_next_index(session->ctx, viewer_stream, cur_index);
if (ret < 0) {
pos->offset = EOF;
- fprintf(stderr, "[error] get_next_index failed\n");
+ if (!lttng_live_should_quit()) {
+ fprintf(stderr, "[error] get_next_index failed\n");
+ }
return;
}
printf_verbose("get_data_packet for stream %" PRIu64 "\n",
viewer_stream->id);
ret = get_data_packet(session->ctx, pos, viewer_stream,
- be64toh(cur_index->offset),
+ cur_index->offset,
cur_index->packet_size / CHAR_BIT);
if (ret == -2) {
goto retry;
} else if (ret < 0) {
pos->offset = EOF;
- fprintf(stderr, "[error] get_data_packet failed\n");
+ if (!lttng_live_should_quit()) {
+ fprintf(stderr, "[error] get_data_packet failed\n");
+ }
return;
}
return;
}
+/*
+ * Request new streams for a session.
+ * Returns the number of streams received or a negative value on error.
+ */
int lttng_live_get_new_streams(struct lttng_live_ctx *ctx, uint64_t id)
{
struct lttng_viewer_cmd cmd;
struct lttng_viewer_new_streams_request rq;
struct lttng_viewer_new_streams_response rp;
struct lttng_viewer_stream stream;
- int ret, i;
+ int ret, i, nb_streams = 0;
ssize_t ret_len;
uint32_t stream_count;
if (ret < 0) {
goto error;
}
+ nb_streams++;
}
- ret = 0;
+ ret = nb_streams;
end:
return ret;
return -1;
}
-void lttng_live_read(struct lttng_live_ctx *ctx)
+int lttng_live_read(struct lttng_live_ctx *ctx)
{
- int ret, i;
+ int ret = -1;
+ int i;
struct bt_ctf_iter *iter;
const struct bt_ctf_event *event;
struct bt_iter_pos begin_pos;
int flags;
if (lttng_live_should_quit()) {
+ ret = 0;
goto end_free;
}
while (!ctx->session->stream_count) {
if (lttng_live_should_quit()
|| ctx->session_ids->len == 0) {
+ ret = 0;
goto end_free;
}
ret = ask_new_streams(ctx);
begin_pos.type = BT_SEEK_BEGIN;
iter = bt_ctf_iter_create(ctx->bt_ctx, &begin_pos, NULL);
if (!iter) {
+ if (lttng_live_should_quit()) {
+ ret = 0;
+ goto end;
+ }
fprintf(stderr, "[error] Iterator creation error\n");
goto end;
}
for (;;) {
if (lttng_live_should_quit()) {
+ ret = 0;
goto end_free;
}
event = bt_ctf_iter_read_event_flags(iter, &flags);
end_free:
bt_context_put(ctx->bt_ctx);
end:
- return;
+ if (lttng_live_should_quit()) {
+ ret = 0;
+ }
+ return ret;
}