X-Git-Url: http://git.efficios.com/?a=blobdiff_plain;f=formats%2Flttng-live%2Flttng-live-comm.c;h=96817f5e170d65e02d310d0e9abc0bc224283c9f;hb=a4e7565e5f77f914eab073310895a73096276907;hp=0e51c0c2ab5b1f2ab8519bb94e67e7305199e5b8;hpb=917bba7f2b1af963cccffcbde225ebd07f27068f;p=babeltrace.git diff --git a/formats/lttng-live/lttng-live-comm.c b/formats/lttng-live/lttng-live-comm.c index 0e51c0c2..96817f5e 100644 --- a/formats/lttng-live/lttng-live-comm.c +++ b/formats/lttng-live/lttng-live-comm.c @@ -390,6 +390,7 @@ int lttng_live_ctf_trace_assign(struct lttng_live_viewer_stream *stream, if (!trace) { trace = g_new0(struct lttng_live_ctf_trace, 1); trace->ctf_trace_id = ctf_trace_id; + trace->trace_id = -1; printf_verbose("Create trace ctf_trace_id %" PRIu64 "\n", ctf_trace_id); BT_INIT_LIST_HEAD(&trace->stream_list); g_hash_table_insert(stream->session->ctf_traces, @@ -1107,8 +1108,8 @@ retry: viewer_stream->in_trace = 0; bt_list_del(&viewer_stream->trace_stream_node); bt_list_del(&viewer_stream->session_stream_node); - g_free(viewer_stream); *stream_id = be64toh(rp->stream_id); + g_free(viewer_stream); break; case LTTNG_VIEWER_INDEX_ERR: fprintf(stderr, "[error] get_next_index: error\n"); @@ -1223,7 +1224,8 @@ void ctf_live_packet_seek(struct bt_stream_pos *stream_pos, size_t index, ret = handle_seek_position(index, whence, viewer_stream, pos, file_stream); if (ret != 0) { - return; + ret = -BT_PACKET_SEEK_ERROR; + goto end; } retry: @@ -1265,7 +1267,8 @@ retry: if (!lttng_live_should_quit()) { fprintf(stderr, "[error] get_next_index failed\n"); } - return; + ret = -BT_PACKET_SEEK_ERROR; + goto end; } printf_verbose("Index received : packet_size : %" PRIu64 ", offset %" PRIu64 ", content_size %" PRIu64 @@ -1294,7 +1297,8 @@ retry: file_stream->parent.stream_id = stream_id; viewer_stream->ctf_stream_id = stream_id; - return; + ret = 0; + goto end; } pos->packet_size = cur_index->packet_size; @@ -1386,15 +1390,18 @@ retry: pos->offset = EOF; if (!lttng_live_should_quit()) { fprintf(stderr, "[error] get_data_packet failed\n"); + ret = -BT_PACKET_SEEK_ERROR; + } else { + ret = 0; } - return; + goto end; } viewer_stream->data_pending = 0; read_packet_header(pos, file_stream); - + ret = 0; end: - return; + bt_packet_seek_set_error(ret); } int lttng_live_create_viewer_session(struct lttng_live_ctx *ctx) @@ -1460,7 +1467,7 @@ int del_traces(gpointer key, gpointer value, gpointer user_data) lvstream->in_trace = 0; bt_list_del(&lvstream->trace_stream_node); } - if (trace->in_use) { + if (trace->in_use && trace->trace_id >= 0) { ret = bt_context_remove_trace(bt_ctx, trace->trace_id); if (ret < 0) fprintf(stderr, "[error] removing trace from context\n"); @@ -1499,6 +1506,12 @@ int add_one_trace(struct lttng_live_ctx *ctx, ret = 0; goto end; } + /* + * add_one_trace can be called recursively if during the + * bt_context_add_trace call we need to fetch new streams, so we need to + * prevent a recursive call to process our current trace. + */ + trace->in_use = 1; BT_INIT_LIST_HEAD(&mmap_list.head); @@ -1561,7 +1574,6 @@ int add_one_trace(struct lttng_live_ctx *ctx, } trace->trace_id = ret; - trace->in_use = 1; printf_verbose("Trace now in use, id = %d\n", trace->trace_id); goto end; @@ -1572,6 +1584,56 @@ end: return ret; } +/* + * Make sure all the traces we know have a metadata stream or loop on + * ask_new_streams until it is done. This must be called before we call + * add_one_trace. + * + * Return 0 when all known traces have a metadata stream, a negative value + * on error. + */ +static +int check_traces_metadata(struct lttng_live_ctx *ctx) +{ + int ret; + struct lttng_live_ctf_trace *trace; + GHashTableIter it; + gpointer key; + gpointer value; + +retry: + g_hash_table_iter_init(&it, ctx->session->ctf_traces); + while (g_hash_table_iter_next(&it, &key, &value)) { + trace = (struct lttng_live_ctf_trace *) value; + printf_verbose("Check trace %" PRIu64 " metadata\n", trace->ctf_trace_id); + while (!trace->metadata_stream) { + printf_verbose("Waiting for metadata stream\n"); + if (lttng_live_should_quit()) { + ret = 0; + goto end; + } + ret = ask_new_streams(ctx); + if (ret < 0) { + goto end; + } else if (ret == 0) { + (void) poll(NULL, 0, ACTIVE_POLL_DELAY); + } else { + /* + * If ask_new_stream got streams from a trace we did not know + * about until now, we have to reinitialize the iterator. + */ + goto retry; + } + } + } + + ret = 0; + +end: + printf_verbose("End check traces metadata\n"); + return ret; +} + static int add_traces(struct lttng_live_ctx *ctx) { @@ -1580,8 +1642,18 @@ int add_traces(struct lttng_live_ctx *ctx) GHashTableIter it; gpointer key; gpointer value; + unsigned int nr_traces; printf_verbose("Begin add traces\n"); + +retry: + nr_traces = g_hash_table_size(ctx->session->ctf_traces); + + ret = check_traces_metadata(ctx); + if (ret < 0) { + goto end; + } + g_hash_table_iter_init(&it, ctx->session->ctf_traces); while (g_hash_table_iter_next(&it, &key, &value)) { trace = (struct lttng_live_ctf_trace *) value; @@ -1589,6 +1661,14 @@ int add_traces(struct lttng_live_ctx *ctx) if (ret < 0) { goto end; } + /* + * If a new trace got added while we were adding the trace, the + * iterator is invalid and we have to restart. + */ + if (g_hash_table_size(ctx->session->ctf_traces) != nr_traces) { + printf_verbose("New trace(s) added during add_one_trace()\n"); + goto retry; + } } ret = 0;