X-Git-Url: http://git.efficios.com/?a=blobdiff_plain;f=formats%2Flttng-live%2Flttng-live-comm.c;h=96817f5e170d65e02d310d0e9abc0bc224283c9f;hb=a4e7565e5f77f914eab073310895a73096276907;hp=8059b338b095dd4672021946ae497b3f8a45f7dd;hpb=4d1743735c61ade4bacc56db52c6b6f34b14abe8;p=babeltrace.git diff --git a/formats/lttng-live/lttng-live-comm.c b/formats/lttng-live/lttng-live-comm.c index 8059b338..96817f5e 100644 --- a/formats/lttng-live/lttng-live-comm.c +++ b/formats/lttng-live/lttng-live-comm.c @@ -390,7 +390,9 @@ int lttng_live_ctf_trace_assign(struct lttng_live_viewer_stream *stream, if (!trace) { trace = g_new0(struct lttng_live_ctf_trace, 1); trace->ctf_trace_id = ctf_trace_id; - trace->streams = g_ptr_array_new(); + trace->trace_id = -1; + printf_verbose("Create trace ctf_trace_id %" PRIu64 "\n", ctf_trace_id); + BT_INIT_LIST_HEAD(&trace->stream_list); g_hash_table_insert(stream->session->ctf_traces, &trace->ctf_trace_id, trace); @@ -398,8 +400,10 @@ int lttng_live_ctf_trace_assign(struct lttng_live_viewer_stream *stream, if (stream->metadata_flag) trace->metadata_stream = stream; + assert(!stream->in_trace); + stream->in_trace = 1; stream->ctf_trace = trace; - g_ptr_array_add(trace->streams, stream); + bt_list_add(&stream->trace_stream_node, &trace->stream_list); return ret; } @@ -506,18 +510,22 @@ int lttng_live_attach_session(struct lttng_live_ctx *ctx, uint64_t id) ret = 0; goto end; } - printf_verbose("Waiting for %" PRIu64 " streams:\n", - ctx->session->stream_count); - ctx->session->streams = g_new0(struct lttng_live_viewer_stream, - ctx->session->stream_count); + printf_verbose("Waiting for %d streams:\n", + be32toh(rp.streams_count)); for (i = 0; i < be32toh(rp.streams_count); i++) { - ret_len = lttng_live_recv(ctx->control_sock, &stream, sizeof(stream)); + struct lttng_live_viewer_stream *lvstream; + + lvstream = g_new0(struct lttng_live_viewer_stream, 1); + ret_len = lttng_live_recv(ctx->control_sock, &stream, + sizeof(stream)); if (ret_len == 0) { fprintf(stderr, "[error] Remote side has closed connection\n"); + g_free(lvstream); goto error; } if (ret_len < 0) { perror("[error] Error receiving stream"); + g_free(lvstream); goto error; } assert(ret_len == sizeof(stream)); @@ -527,21 +535,23 @@ int lttng_live_attach_session(struct lttng_live_ctx *ctx, uint64_t id) printf_verbose(" stream %" PRIu64 " : %s/%s\n", be64toh(stream.id), stream.path_name, stream.channel_name); - ctx->session->streams[i].id = be64toh(stream.id); - ctx->session->streams[i].session = ctx->session; + lvstream->id = be64toh(stream.id); + lvstream->session = ctx->session; - ctx->session->streams[i].mmap_size = 0; - ctx->session->streams[i].ctf_stream_id = -1ULL; + lvstream->mmap_size = 0; + lvstream->ctf_stream_id = -1ULL; if (be32toh(stream.metadata_flag)) { - ctx->session->streams[i].metadata_flag = 1; + lvstream->metadata_flag = 1; } - ret = lttng_live_ctf_trace_assign(&ctx->session->streams[i], + ret = lttng_live_ctf_trace_assign(lvstream, be64toh(stream.ctf_trace_id)); if (ret < 0) { + g_free(lvstream); goto error; } - + bt_list_add(&lvstream->session_stream_node, + &ctx->session->stream_list); } ret = 0; end: @@ -567,10 +577,11 @@ restart: id = g_array_index(ctx->session_ids, uint64_t, i); ret = lttng_live_get_new_streams(ctx, id); printf_verbose("Asking for new streams returns %d\n", ret); + if (lttng_live_should_quit()) { + ret = -1; + goto end; + } if (ret < 0) { - if (lttng_live_should_quit()) { - goto end; - } if (ret == -LTTNG_VIEWER_NEW_STREAMS_HUP) { printf_verbose("Session %" PRIu64 " closed\n", id); @@ -593,7 +604,12 @@ restart: nb_streams += ret; } } - ret = nb_streams; + if (ctx->session_ids->len == 0) { + /* All sessions are closed. */ + ret = -1; + } else { + ret = nb_streams; + } end: return ret; @@ -830,6 +846,9 @@ int get_one_metadata_packet(struct lttng_live_ctx *ctx, memcpy(cmd_buf, &cmd, sizeof(cmd)); memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq)); + printf_verbose("get_metadata for trace_id: %d, ctf_trace_id: %" PRIu64 "\n", + metadata_stream->ctf_trace->trace_id, + metadata_stream->ctf_trace->ctf_trace_id); ret_len = lttng_live_send(ctx->control_sock, cmd_buf, cmd_buf_len); if (ret_len < 0) { perror("[error] Error sending get_metadata cmd and request"); @@ -948,6 +967,9 @@ int get_new_metadata(struct lttng_live_ctx *ctx, if (!len_read) { (void) poll(NULL, 0, ACTIVE_POLL_DELAY); } + if (ret < 0) { + break; /* Stop on error. */ + } } while (ret > 0 || !len_read); if (babeltrace_close_memstream(metadata_buf, &size, @@ -1083,6 +1105,11 @@ retry: viewer_stream->id = -1ULL; index->offset = EOF; ctx->session->stream_count--; + viewer_stream->in_trace = 0; + bt_list_del(&viewer_stream->trace_stream_node); + bt_list_del(&viewer_stream->session_stream_node); + *stream_id = be64toh(rp->stream_id); + g_free(viewer_stream); break; case LTTNG_VIEWER_INDEX_ERR: fprintf(stderr, "[error] get_next_index: error\n"); @@ -1197,7 +1224,8 @@ void ctf_live_packet_seek(struct bt_stream_pos *stream_pos, size_t index, ret = handle_seek_position(index, whence, viewer_stream, pos, file_stream); if (ret != 0) { - return; + ret = -BT_PACKET_SEEK_ERROR; + goto end; } retry: @@ -1239,7 +1267,8 @@ retry: if (!lttng_live_should_quit()) { fprintf(stderr, "[error] get_next_index failed\n"); } - return; + ret = -BT_PACKET_SEEK_ERROR; + goto end; } printf_verbose("Index received : packet_size : %" PRIu64 ", offset %" PRIu64 ", content_size %" PRIu64 @@ -1268,7 +1297,8 @@ retry: file_stream->parent.stream_id = stream_id; viewer_stream->ctf_stream_id = stream_id; - return; + ret = 0; + goto end; } pos->packet_size = cur_index->packet_size; @@ -1360,15 +1390,18 @@ retry: pos->offset = EOF; if (!lttng_live_should_quit()) { fprintf(stderr, "[error] get_data_packet failed\n"); + ret = -BT_PACKET_SEEK_ERROR; + } else { + ret = 0; } - return; + goto end; } viewer_stream->data_pending = 0; read_packet_header(pos, file_stream); - + ret = 0; end: - return; + bt_packet_seek_set_error(ret); } int lttng_live_create_viewer_session(struct lttng_live_ctx *ctx) @@ -1423,10 +1456,22 @@ int del_traces(gpointer key, gpointer value, gpointer user_data) struct bt_context *bt_ctx = user_data; struct lttng_live_ctf_trace *trace = value; int ret; + struct lttng_live_viewer_stream *lvstream, *tmp; - ret = bt_context_remove_trace(bt_ctx, trace->trace_id); - if (ret < 0) - fprintf(stderr, "[error] removing trace from context\n"); + /* + * We don't have ownership of the live viewer stream, just + * remove them from our list. + */ + bt_list_for_each_entry_safe(lvstream, tmp, &trace->stream_list, + trace_stream_node) { + lvstream->in_trace = 0; + bt_list_del(&lvstream->trace_stream_node); + } + if (trace->in_use && trace->trace_id >= 0) { + ret = bt_context_remove_trace(bt_ctx, trace->trace_id); + if (ret < 0) + fprintf(stderr, "[error] removing trace from context\n"); + } /* remove the key/value pair from the HT. */ return 1; @@ -1436,7 +1481,7 @@ static int add_one_trace(struct lttng_live_ctx *ctx, struct lttng_live_ctf_trace *trace) { - int i, ret; + int ret; struct bt_context *bt_ctx = ctx->bt_ctx; struct lttng_live_viewer_stream *stream; struct bt_mmap_stream *new_mmap_stream; @@ -1444,6 +1489,9 @@ int add_one_trace(struct lttng_live_ctx *ctx, struct bt_trace_descriptor *td; struct bt_trace_handle *handle; + printf_verbose("Add one trace ctf_trace_id: %" PRIu64 + " (metadata_stream: %p)\n", + trace->ctf_trace_id, trace->metadata_stream); /* * We don't know how many streams we will receive for a trace, so * once we are done receiving the traces, we add all the traces @@ -1454,15 +1502,20 @@ int add_one_trace(struct lttng_live_ctx *ctx, * If a trace is already in the context, we just skip this function. */ if (trace->in_use) { + printf_verbose("Trace already in use\n"); ret = 0; goto end; } + /* + * add_one_trace can be called recursively if during the + * bt_context_add_trace call we need to fetch new streams, so we need to + * prevent a recursive call to process our current trace. + */ + trace->in_use = 1; BT_INIT_LIST_HEAD(&mmap_list.head); - for (i = 0; i < trace->streams->len; i++) { - stream = g_ptr_array_index(trace->streams, i); - + bt_list_for_each_entry(stream, &trace->stream_list, trace_stream_node) { if (!stream->metadata_flag) { new_mmap_stream = zmalloc(sizeof(struct bt_mmap_stream)); new_mmap_stream->priv = (void *) stream; @@ -1484,6 +1537,7 @@ int add_one_trace(struct lttng_live_ctx *ctx, goto end_free; } + printf_verbose("Metadata stream found\n"); trace->metadata_fp = babeltrace_fmemopen(metadata_buf, stream->metadata_len, "rb"); if (!trace->metadata_fp) { @@ -1520,7 +1574,7 @@ int add_one_trace(struct lttng_live_ctx *ctx, } trace->trace_id = ret; - trace->in_use = 1; + printf_verbose("Trace now in use, id = %d\n", trace->trace_id); goto end; @@ -1530,6 +1584,56 @@ end: return ret; } +/* + * Make sure all the traces we know have a metadata stream or loop on + * ask_new_streams until it is done. This must be called before we call + * add_one_trace. + * + * Return 0 when all known traces have a metadata stream, a negative value + * on error. + */ +static +int check_traces_metadata(struct lttng_live_ctx *ctx) +{ + int ret; + struct lttng_live_ctf_trace *trace; + GHashTableIter it; + gpointer key; + gpointer value; + +retry: + g_hash_table_iter_init(&it, ctx->session->ctf_traces); + while (g_hash_table_iter_next(&it, &key, &value)) { + trace = (struct lttng_live_ctf_trace *) value; + printf_verbose("Check trace %" PRIu64 " metadata\n", trace->ctf_trace_id); + while (!trace->metadata_stream) { + printf_verbose("Waiting for metadata stream\n"); + if (lttng_live_should_quit()) { + ret = 0; + goto end; + } + ret = ask_new_streams(ctx); + if (ret < 0) { + goto end; + } else if (ret == 0) { + (void) poll(NULL, 0, ACTIVE_POLL_DELAY); + } else { + /* + * If ask_new_stream got streams from a trace we did not know + * about until now, we have to reinitialize the iterator. + */ + goto retry; + } + } + } + + ret = 0; + +end: + printf_verbose("End check traces metadata\n"); + return ret; +} + static int add_traces(struct lttng_live_ctx *ctx) { @@ -1538,6 +1642,17 @@ int add_traces(struct lttng_live_ctx *ctx) GHashTableIter it; gpointer key; gpointer value; + unsigned int nr_traces; + + printf_verbose("Begin add traces\n"); + +retry: + nr_traces = g_hash_table_size(ctx->session->ctf_traces); + + ret = check_traces_metadata(ctx); + if (ret < 0) { + goto end; + } g_hash_table_iter_init(&it, ctx->session->ctf_traces); while (g_hash_table_iter_next(&it, &key, &value)) { @@ -1546,11 +1661,20 @@ int add_traces(struct lttng_live_ctx *ctx) if (ret < 0) { goto end; } + /* + * If a new trace got added while we were adding the trace, the + * iterator is invalid and we have to restart. + */ + if (g_hash_table_size(ctx->session->ctf_traces) != nr_traces) { + printf_verbose("New trace(s) added during add_one_trace()\n"); + goto retry; + } } ret = 0; end: + printf_verbose("End add traces\n"); return ret; } @@ -1637,18 +1761,22 @@ int lttng_live_get_new_streams(struct lttng_live_ctx *ctx, uint64_t id) ret = 0; goto end; } - printf_verbose("Waiting for %" PRIu64 " streams:\n", - ctx->session->stream_count); - ctx->session->streams = g_new0(struct lttng_live_viewer_stream, - ctx->session->stream_count); + printf_verbose("Waiting for %d streams:\n", stream_count); + for (i = 0; i < stream_count; i++) { - ret_len = lttng_live_recv(ctx->control_sock, &stream, sizeof(stream)); + struct lttng_live_viewer_stream *lvstream; + + lvstream = g_new0(struct lttng_live_viewer_stream, 1); + ret_len = lttng_live_recv(ctx->control_sock, &stream, + sizeof(stream)); if (ret_len == 0) { fprintf(stderr, "[error] Remote side has closed connection\n"); + g_free(lvstream); goto error; } if (ret_len < 0) { perror("[error] Error receiving stream"); + g_free(lvstream); goto error; } assert(ret_len == sizeof(stream)); @@ -1658,22 +1786,24 @@ int lttng_live_get_new_streams(struct lttng_live_ctx *ctx, uint64_t id) printf_verbose(" stream %" PRIu64 " : %s/%s\n", be64toh(stream.id), stream.path_name, stream.channel_name); - ctx->session->streams[i].id = be64toh(stream.id); - ctx->session->streams[i].session = ctx->session; + lvstream->id = be64toh(stream.id); + lvstream->session = ctx->session; - ctx->session->streams[i].mmap_size = 0; - ctx->session->streams[i].ctf_stream_id = -1ULL; + lvstream->mmap_size = 0; + lvstream->ctf_stream_id = -1ULL; if (be32toh(stream.metadata_flag)) { - ctx->session->streams[i].metadata_flag = 1; + lvstream->metadata_flag = 1; } - ret = lttng_live_ctf_trace_assign(&ctx->session->streams[i], + ret = lttng_live_ctf_trace_assign(lvstream, be64toh(stream.ctf_trace_id)); if (ret < 0) { + g_free(lvstream); goto error; } nb_streams++; - + bt_list_add(&lvstream->session_stream_node, + &ctx->session->stream_list); } ret = nb_streams; end: @@ -1704,7 +1834,7 @@ int lttng_live_read(struct lttng_live_ctx *ctx) fmt_write = bt_lookup_format(g_quark_from_static_string("text")); if (!fmt_write) { fprintf(stderr, "[error] ctf-text error\n"); - goto end; + goto end_free; } td_write = fmt_write->open_trace(NULL, O_RDWR, NULL, NULL); @@ -1756,6 +1886,7 @@ int lttng_live_read(struct lttng_live_ctx *ctx) } ret = ask_new_streams(ctx); if (ret < 0) { + ret = 0; goto end_free; } if (!ctx->session->stream_count) { @@ -1773,10 +1904,10 @@ int lttng_live_read(struct lttng_live_ctx *ctx) if (!iter) { if (lttng_live_should_quit()) { ret = 0; - goto end; + goto end_free; } fprintf(stderr, "[error] Iterator creation error\n"); - goto end; + goto end_free; } for (;;) { if (lttng_live_should_quit()) { @@ -1809,6 +1940,8 @@ int lttng_live_read(struct lttng_live_ctx *ctx) } end_free: + g_hash_table_foreach_remove(ctx->session->ctf_traces, + del_traces, ctx->bt_ctx); bt_context_put(ctx->bt_ctx); end: if (lttng_live_should_quit()) {