if (!trace) {
trace = g_new0(struct lttng_live_ctf_trace, 1);
trace->ctf_trace_id = ctf_trace_id;
- trace->streams = g_ptr_array_new();
+ printf_verbose("Create trace ctf_trace_id %" PRIu64 "\n", ctf_trace_id);
+ BT_INIT_LIST_HEAD(&trace->stream_list);
g_hash_table_insert(stream->session->ctf_traces,
&trace->ctf_trace_id,
trace);
if (stream->metadata_flag)
trace->metadata_stream = stream;
+ assert(!stream->in_trace);
+ stream->in_trace = 1;
stream->ctf_trace = trace;
- g_ptr_array_add(trace->streams, stream);
+ bt_list_add(&stream->trace_stream_node, &trace->stream_list);
return ret;
}
ret = 0;
goto end;
}
- printf_verbose("Waiting for %" PRIu64 " streams:\n",
- ctx->session->stream_count);
- ctx->session->streams = g_new0(struct lttng_live_viewer_stream,
- ctx->session->stream_count);
+ printf_verbose("Waiting for %d streams:\n",
+ be32toh(rp.streams_count));
for (i = 0; i < be32toh(rp.streams_count); i++) {
- ret_len = lttng_live_recv(ctx->control_sock, &stream, sizeof(stream));
+ struct lttng_live_viewer_stream *lvstream;
+
+ lvstream = g_new0(struct lttng_live_viewer_stream, 1);
+ ret_len = lttng_live_recv(ctx->control_sock, &stream,
+ sizeof(stream));
if (ret_len == 0) {
fprintf(stderr, "[error] Remote side has closed connection\n");
+ g_free(lvstream);
goto error;
}
if (ret_len < 0) {
perror("[error] Error receiving stream");
+ g_free(lvstream);
goto error;
}
assert(ret_len == sizeof(stream));
printf_verbose(" stream %" PRIu64 " : %s/%s\n",
be64toh(stream.id), stream.path_name,
stream.channel_name);
- ctx->session->streams[i].id = be64toh(stream.id);
- ctx->session->streams[i].session = ctx->session;
+ lvstream->id = be64toh(stream.id);
+ lvstream->session = ctx->session;
- ctx->session->streams[i].mmap_size = 0;
- ctx->session->streams[i].ctf_stream_id = -1ULL;
+ lvstream->mmap_size = 0;
+ lvstream->ctf_stream_id = -1ULL;
if (be32toh(stream.metadata_flag)) {
- ctx->session->streams[i].metadata_flag = 1;
+ lvstream->metadata_flag = 1;
}
- ret = lttng_live_ctf_trace_assign(&ctx->session->streams[i],
+ ret = lttng_live_ctf_trace_assign(lvstream,
be64toh(stream.ctf_trace_id));
if (ret < 0) {
+ g_free(lvstream);
goto error;
}
-
+ bt_list_add(&lvstream->session_stream_node,
+ &ctx->session->stream_list);
}
ret = 0;
end:
nb_streams += ret;
}
}
- ret = nb_streams;
+ if (ctx->session_ids->len == 0) {
+ /* All sessions are closed. */
+ ret = -1;
+ } else {
+ ret = nb_streams;
+ }
end:
return ret;
memcpy(cmd_buf, &cmd, sizeof(cmd));
memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
+ printf_verbose("get_metadata for trace_id: %d, ctf_trace_id: %" PRIu64 "\n",
+ metadata_stream->ctf_trace->trace_id,
+ metadata_stream->ctf_trace->ctf_trace_id);
ret_len = lttng_live_send(ctx->control_sock, cmd_buf, cmd_buf_len);
if (ret_len < 0) {
perror("[error] Error sending get_metadata cmd and request");
if (!len_read) {
(void) poll(NULL, 0, ACTIVE_POLL_DELAY);
}
+ if (ret < 0) {
+ break; /* Stop on error. */
+ }
} while (ret > 0 || !len_read);
if (babeltrace_close_memstream(metadata_buf, &size,
viewer_stream->id = -1ULL;
index->offset = EOF;
ctx->session->stream_count--;
+ viewer_stream->in_trace = 0;
+ bt_list_del(&viewer_stream->trace_stream_node);
+ bt_list_del(&viewer_stream->session_stream_node);
+ g_free(viewer_stream);
+ *stream_id = be64toh(rp->stream_id);
break;
case LTTNG_VIEWER_INDEX_ERR:
fprintf(stderr, "[error] get_next_index: error\n");
struct bt_context *bt_ctx = user_data;
struct lttng_live_ctf_trace *trace = value;
int ret;
+ struct lttng_live_viewer_stream *lvstream, *tmp;
- ret = bt_context_remove_trace(bt_ctx, trace->trace_id);
- if (ret < 0)
- fprintf(stderr, "[error] removing trace from context\n");
+ /*
+ * We don't have ownership of the live viewer stream, just
+ * remove them from our list.
+ */
+ bt_list_for_each_entry_safe(lvstream, tmp, &trace->stream_list,
+ trace_stream_node) {
+ lvstream->in_trace = 0;
+ bt_list_del(&lvstream->trace_stream_node);
+ }
+ if (trace->in_use) {
+ ret = bt_context_remove_trace(bt_ctx, trace->trace_id);
+ if (ret < 0)
+ fprintf(stderr, "[error] removing trace from context\n");
+ }
/* remove the key/value pair from the HT. */
return 1;
int add_one_trace(struct lttng_live_ctx *ctx,
struct lttng_live_ctf_trace *trace)
{
- int i, ret;
+ int ret;
struct bt_context *bt_ctx = ctx->bt_ctx;
struct lttng_live_viewer_stream *stream;
struct bt_mmap_stream *new_mmap_stream;
struct bt_trace_descriptor *td;
struct bt_trace_handle *handle;
+ printf_verbose("Add one trace ctf_trace_id: %" PRIu64
+ " (metadata_stream: %p)\n",
+ trace->ctf_trace_id, trace->metadata_stream);
/*
* We don't know how many streams we will receive for a trace, so
* once we are done receiving the traces, we add all the traces
* If a trace is already in the context, we just skip this function.
*/
if (trace->in_use) {
+ printf_verbose("Trace already in use\n");
ret = 0;
goto end;
}
BT_INIT_LIST_HEAD(&mmap_list.head);
- for (i = 0; i < trace->streams->len; i++) {
- stream = g_ptr_array_index(trace->streams, i);
-
+ bt_list_for_each_entry(stream, &trace->stream_list, trace_stream_node) {
if (!stream->metadata_flag) {
new_mmap_stream = zmalloc(sizeof(struct bt_mmap_stream));
new_mmap_stream->priv = (void *) stream;
goto end_free;
}
+ printf_verbose("Metadata stream found\n");
trace->metadata_fp = babeltrace_fmemopen(metadata_buf,
stream->metadata_len, "rb");
if (!trace->metadata_fp) {
trace->trace_id = ret;
trace->in_use = 1;
+ printf_verbose("Trace now in use, id = %d\n", trace->trace_id);
goto end;
gpointer key;
gpointer value;
+ printf_verbose("Begin add traces\n");
g_hash_table_iter_init(&it, ctx->session->ctf_traces);
while (g_hash_table_iter_next(&it, &key, &value)) {
trace = (struct lttng_live_ctf_trace *) value;
ret = 0;
end:
+ printf_verbose("End add traces\n");
return ret;
}
ret = 0;
goto end;
}
- printf_verbose("Waiting for %" PRIu64 " streams:\n",
- ctx->session->stream_count);
- ctx->session->streams = g_new0(struct lttng_live_viewer_stream,
- ctx->session->stream_count);
+ printf_verbose("Waiting for %d streams:\n", stream_count);
+
for (i = 0; i < stream_count; i++) {
- ret_len = lttng_live_recv(ctx->control_sock, &stream, sizeof(stream));
+ struct lttng_live_viewer_stream *lvstream;
+
+ lvstream = g_new0(struct lttng_live_viewer_stream, 1);
+ ret_len = lttng_live_recv(ctx->control_sock, &stream,
+ sizeof(stream));
if (ret_len == 0) {
fprintf(stderr, "[error] Remote side has closed connection\n");
+ g_free(lvstream);
goto error;
}
if (ret_len < 0) {
perror("[error] Error receiving stream");
+ g_free(lvstream);
goto error;
}
assert(ret_len == sizeof(stream));
printf_verbose(" stream %" PRIu64 " : %s/%s\n",
be64toh(stream.id), stream.path_name,
stream.channel_name);
- ctx->session->streams[i].id = be64toh(stream.id);
- ctx->session->streams[i].session = ctx->session;
+ lvstream->id = be64toh(stream.id);
+ lvstream->session = ctx->session;
- ctx->session->streams[i].mmap_size = 0;
- ctx->session->streams[i].ctf_stream_id = -1ULL;
+ lvstream->mmap_size = 0;
+ lvstream->ctf_stream_id = -1ULL;
if (be32toh(stream.metadata_flag)) {
- ctx->session->streams[i].metadata_flag = 1;
+ lvstream->metadata_flag = 1;
}
- ret = lttng_live_ctf_trace_assign(&ctx->session->streams[i],
+ ret = lttng_live_ctf_trace_assign(lvstream,
be64toh(stream.ctf_trace_id));
if (ret < 0) {
+ g_free(lvstream);
goto error;
}
nb_streams++;
-
+ bt_list_add(&lvstream->session_stream_node,
+ &ctx->session->stream_list);
}
ret = nb_streams;
end:
fmt_write = bt_lookup_format(g_quark_from_static_string("text"));
if (!fmt_write) {
fprintf(stderr, "[error] ctf-text error\n");
- goto end;
+ goto end_free;
}
td_write = fmt_write->open_trace(NULL, O_RDWR, NULL, NULL);
if (!iter) {
if (lttng_live_should_quit()) {
ret = 0;
- goto end;
+ goto end_free;
}
fprintf(stderr, "[error] Iterator creation error\n");
- goto end;
+ goto end_free;
}
for (;;) {
if (lttng_live_should_quit()) {
}
end_free:
+ g_hash_table_foreach_remove(ctx->session->ctf_traces,
+ del_traces, ctx->bt_ctx);
bt_context_put(ctx->bt_ctx);
end:
if (lttng_live_should_quit()) {