ret = 0;
goto end;
}
- printf_verbose("Waiting for %" PRIu64 " streams:\n",
- ctx->session->stream_count);
- ctx->session->streams = g_new0(struct lttng_live_viewer_stream,
- ctx->session->stream_count);
+ printf_verbose("Waiting for %d streams:\n",
+ be32toh(rp.streams_count));
for (i = 0; i < be32toh(rp.streams_count); i++) {
- ret_len = lttng_live_recv(ctx->control_sock, &stream, sizeof(stream));
+ struct lttng_live_viewer_stream *lvstream;
+
+ lvstream = g_new0(struct lttng_live_viewer_stream, 1);
+ ret_len = lttng_live_recv(ctx->control_sock, &stream,
+ sizeof(stream));
if (ret_len == 0) {
fprintf(stderr, "[error] Remote side has closed connection\n");
+ g_free(lvstream);
goto error;
}
if (ret_len < 0) {
perror("[error] Error receiving stream");
+ g_free(lvstream);
goto error;
}
assert(ret_len == sizeof(stream));
printf_verbose(" stream %" PRIu64 " : %s/%s\n",
be64toh(stream.id), stream.path_name,
stream.channel_name);
- ctx->session->streams[i].id = be64toh(stream.id);
- ctx->session->streams[i].session = ctx->session;
+ lvstream->id = be64toh(stream.id);
+ lvstream->session = ctx->session;
- ctx->session->streams[i].mmap_size = 0;
- ctx->session->streams[i].ctf_stream_id = -1ULL;
+ lvstream->mmap_size = 0;
+ lvstream->ctf_stream_id = -1ULL;
if (be32toh(stream.metadata_flag)) {
- ctx->session->streams[i].metadata_flag = 1;
+ lvstream->metadata_flag = 1;
}
- ret = lttng_live_ctf_trace_assign(&ctx->session->streams[i],
+ ret = lttng_live_ctf_trace_assign(lvstream,
be64toh(stream.ctf_trace_id));
if (ret < 0) {
+ g_free(lvstream);
goto error;
}
-
+ bt_list_add(&lvstream->stream_node,
+ &ctx->session->stream_list);
}
ret = 0;
end:
id = g_array_index(ctx->session_ids, uint64_t, i);
ret = lttng_live_get_new_streams(ctx, id);
printf_verbose("Asking for new streams returns %d\n", ret);
+ if (lttng_live_should_quit()) {
+ ret = -1;
+ goto end;
+ }
if (ret < 0) {
- if (lttng_live_should_quit()) {
- goto end;
- }
if (ret == -LTTNG_VIEWER_NEW_STREAMS_HUP) {
printf_verbose("Session %" PRIu64 " closed\n",
id);
nb_streams += ret;
}
}
- ret = nb_streams;
+ if (ctx->session_ids->len == 0) {
+ /* All sessions are closed. */
+ ret = -1;
+ } else {
+ ret = nb_streams;
+ }
end:
return ret;
if (!len_read) {
(void) poll(NULL, 0, ACTIVE_POLL_DELAY);
}
+ if (ret < 0) {
+ break; /* Stop on error. */
+ }
} while (ret > 0 || !len_read);
if (babeltrace_close_memstream(metadata_buf, &size,
goto retry;
case LTTNG_VIEWER_INDEX_HUP:
printf_verbose("get_next_index: stream hung up\n");
+ /* TODO: remove stream from session list and trace ptr array */
viewer_stream->id = -1ULL;
index->offset = EOF;
ctx->session->stream_count--;
ret = 0;
goto end;
}
- printf_verbose("Waiting for %" PRIu64 " streams:\n",
- ctx->session->stream_count);
- ctx->session->streams = g_new0(struct lttng_live_viewer_stream,
- ctx->session->stream_count);
+ printf_verbose("Waiting for %d streams:\n", stream_count);
+
for (i = 0; i < stream_count; i++) {
- ret_len = lttng_live_recv(ctx->control_sock, &stream, sizeof(stream));
+ struct lttng_live_viewer_stream *lvstream;
+
+ lvstream = g_new0(struct lttng_live_viewer_stream, 1);
+ ret_len = lttng_live_recv(ctx->control_sock, &stream,
+ sizeof(stream));
if (ret_len == 0) {
fprintf(stderr, "[error] Remote side has closed connection\n");
+ g_free(lvstream);
goto error;
}
if (ret_len < 0) {
perror("[error] Error receiving stream");
+ g_free(lvstream);
goto error;
}
assert(ret_len == sizeof(stream));
printf_verbose(" stream %" PRIu64 " : %s/%s\n",
be64toh(stream.id), stream.path_name,
stream.channel_name);
- ctx->session->streams[i].id = be64toh(stream.id);
- ctx->session->streams[i].session = ctx->session;
+ lvstream->id = be64toh(stream.id);
+ lvstream->session = ctx->session;
- ctx->session->streams[i].mmap_size = 0;
- ctx->session->streams[i].ctf_stream_id = -1ULL;
+ lvstream->mmap_size = 0;
+ lvstream->ctf_stream_id = -1ULL;
if (be32toh(stream.metadata_flag)) {
- ctx->session->streams[i].metadata_flag = 1;
+ lvstream->metadata_flag = 1;
}
- ret = lttng_live_ctf_trace_assign(&ctx->session->streams[i],
+ ret = lttng_live_ctf_trace_assign(lvstream,
be64toh(stream.ctf_trace_id));
if (ret < 0) {
+ g_free(lvstream);
goto error;
}
nb_streams++;
-
+ bt_list_add(&lvstream->stream_node,
+ &ctx->session->stream_list);
}
ret = nb_streams;
end:
}
ret = ask_new_streams(ctx);
if (ret < 0) {
+ ret = 0;
goto end_free;
}
if (!ctx->session->stream_count) {