Fix: set stream id in HUP case
[babeltrace.git] / formats / lttng-live / lttng-live-comm.c
index a1c95bbc31238504e82186e522aae14d0244749c..065af49fb06f5f1be0d02f8a39e9919ddafef50b 100644 (file)
@@ -390,7 +390,7 @@ int lttng_live_ctf_trace_assign(struct lttng_live_viewer_stream *stream,
        if (!trace) {
                trace = g_new0(struct lttng_live_ctf_trace, 1);
                trace->ctf_trace_id = ctf_trace_id;
-               trace->streams = g_ptr_array_new();
+               BT_INIT_LIST_HEAD(&trace->stream_list);
                g_hash_table_insert(stream->session->ctf_traces,
                                &trace->ctf_trace_id,
                                trace);
@@ -398,8 +398,10 @@ int lttng_live_ctf_trace_assign(struct lttng_live_viewer_stream *stream,
        if (stream->metadata_flag)
                trace->metadata_stream = stream;
 
+       assert(!stream->in_trace);
+       stream->in_trace = 1;
        stream->ctf_trace = trace;
-       g_ptr_array_add(trace->streams, stream);
+       bt_list_add(&stream->trace_stream_node, &trace->stream_list);
 
        return ret;
 }
@@ -506,18 +508,22 @@ int lttng_live_attach_session(struct lttng_live_ctx *ctx, uint64_t id)
                ret = 0;
                goto end;
        }
-       printf_verbose("Waiting for %" PRIu64 " streams:\n",
-               ctx->session->stream_count);
-       ctx->session->streams = g_new0(struct lttng_live_viewer_stream,
-                       ctx->session->stream_count);
+       printf_verbose("Waiting for %d streams:\n",
+               be32toh(rp.streams_count));
        for (i = 0; i < be32toh(rp.streams_count); i++) {
-               ret_len = lttng_live_recv(ctx->control_sock, &stream, sizeof(stream));
+               struct lttng_live_viewer_stream *lvstream;
+
+               lvstream = g_new0(struct lttng_live_viewer_stream, 1);
+               ret_len = lttng_live_recv(ctx->control_sock, &stream,
+                               sizeof(stream));
                if (ret_len == 0) {
                        fprintf(stderr, "[error] Remote side has closed connection\n");
+                       g_free(lvstream);
                        goto error;
                }
                if (ret_len < 0) {
                        perror("[error] Error receiving stream");
+                       g_free(lvstream);
                        goto error;
                }
                assert(ret_len == sizeof(stream));
@@ -527,21 +533,23 @@ int lttng_live_attach_session(struct lttng_live_ctx *ctx, uint64_t id)
                printf_verbose("    stream %" PRIu64 " : %s/%s\n",
                                be64toh(stream.id), stream.path_name,
                                stream.channel_name);
-               ctx->session->streams[i].id = be64toh(stream.id);
-               ctx->session->streams[i].session = ctx->session;
+               lvstream->id = be64toh(stream.id);
+               lvstream->session = ctx->session;
 
-               ctx->session->streams[i].mmap_size = 0;
-               ctx->session->streams[i].ctf_stream_id = -1ULL;
+               lvstream->mmap_size = 0;
+               lvstream->ctf_stream_id = -1ULL;
 
                if (be32toh(stream.metadata_flag)) {
-                       ctx->session->streams[i].metadata_flag = 1;
+                       lvstream->metadata_flag = 1;
                }
-               ret = lttng_live_ctf_trace_assign(&ctx->session->streams[i],
+               ret = lttng_live_ctf_trace_assign(lvstream,
                                be64toh(stream.ctf_trace_id));
                if (ret < 0) {
+                       g_free(lvstream);
                        goto error;
                }
-
+               bt_list_add(&lvstream->session_stream_node,
+                       &ctx->session->stream_list);
        }
        ret = 0;
 end:
@@ -567,10 +575,11 @@ restart:
                id = g_array_index(ctx->session_ids, uint64_t, i);
                ret = lttng_live_get_new_streams(ctx, id);
                printf_verbose("Asking for new streams returns %d\n", ret);
+               if (lttng_live_should_quit()) {
+                       ret = -1;
+                       goto end;
+               }
                if (ret < 0) {
-                       if (lttng_live_should_quit()) {
-                               goto end;
-                       }
                        if (ret == -LTTNG_VIEWER_NEW_STREAMS_HUP) {
                                printf_verbose("Session %" PRIu64 " closed\n",
                                                id);
@@ -593,7 +602,12 @@ restart:
                        nb_streams += ret;
                }
        }
-       ret = nb_streams;
+       if (ctx->session_ids->len == 0) {
+               /* All sessions are closed. */
+               ret = -1;
+       } else {
+               ret = nb_streams;
+       }
 
 end:
        return ret;
@@ -607,6 +621,11 @@ int append_metadata(struct lttng_live_ctx *ctx,
        struct lttng_live_viewer_stream *metadata;
        char *metadata_buf = NULL;
 
+       if (!viewer_stream->ctf_trace->handle) {
+               printf_verbose("append_metadata: trace handle not ready yet.\n");
+               return 0;
+       }
+
        printf_verbose("get_next_index: new metadata needed\n");
        ret = get_new_metadata(ctx, viewer_stream, &metadata_buf);
        if (ret < 0) {
@@ -943,6 +962,9 @@ int get_new_metadata(struct lttng_live_ctx *ctx,
                if (!len_read) {
                        (void) poll(NULL, 0, ACTIVE_POLL_DELAY);
                }
+               if (ret < 0) {
+                       break;  /* Stop on error. */
+               }
        } while (ret > 0 || !len_read);
 
        if (babeltrace_close_memstream(metadata_buf, &size,
@@ -1078,6 +1100,11 @@ retry:
                viewer_stream->id = -1ULL;
                index->offset = EOF;
                ctx->session->stream_count--;
+               viewer_stream->in_trace = 0;
+               bt_list_del(&viewer_stream->trace_stream_node);
+               bt_list_del(&viewer_stream->session_stream_node);
+               g_free(viewer_stream);
+               *stream_id = be64toh(rp->stream_id);
                break;
        case LTTNG_VIEWER_INDEX_ERR:
                fprintf(stderr, "[error] get_next_index: error\n");
@@ -1276,14 +1303,30 @@ retry:
        }
 
        if (cur_index->content_size == 0) {
+               /* Beacon packet index */
                if (file_stream->parent.stream_class) {
                        file_stream->parent.cycles_timestamp =
                                cur_index->ts_cycles.timestamp_end;
                        file_stream->parent.real_timestamp = ctf_get_real_timestamp(
                                        &file_stream->parent,
                                        cur_index->ts_cycles.timestamp_end);
+
+                       /*
+                        * Duplicate the data from the previous index, because
+                        * the one we just received is only a beacon with no
+                        * relevant information except the timestamp_end. We
+                        * don't need to keep this timestamp_end because we already
+                        * updated the file_stream timestamps, so we only need
+                        * to keep the last real index data as prev_index. That
+                        * way, we keep the original prev timestamps and
+                        * discarded events counter. This is the same behaviour
+                        * as if we were reading a local trace, we would not
+                        * have fake indexes between real indexes.
+                        */
+                       memcpy(cur_index, prev_index, sizeof(struct packet_index));
                }
        } else {
+               /* Real packet index */
                if (file_stream->parent.stream_class) {
                        /* Convert the timestamps and append to the real_index. */
                        cur_index->ts_real.timestamp_begin = ctf_get_real_timestamp(
@@ -1297,6 +1340,16 @@ retry:
                ctf_update_current_packet_index(&file_stream->parent,
                                prev_index, cur_index);
 
+               /*
+                * We need to check if we are in trace read or called
+                * from packet indexing.  In this last case, the
+                * collection is not there, so we cannot print the
+                * timestamps.
+                */
+               if ((&file_stream->parent)->stream_class->trace->parent.collection) {
+                       ctf_print_discarded_lost(stderr, &file_stream->parent);
+               }
+
                file_stream->parent.cycles_timestamp =
                                cur_index->ts_cycles.timestamp_begin;
                file_stream->parent.real_timestamp =
@@ -1392,10 +1445,22 @@ int del_traces(gpointer key, gpointer value, gpointer user_data)
        struct bt_context *bt_ctx = user_data;
        struct lttng_live_ctf_trace *trace = value;
        int ret;
+       struct lttng_live_viewer_stream *lvstream, *tmp;
 
-       ret = bt_context_remove_trace(bt_ctx, trace->trace_id);
-       if (ret < 0)
-               fprintf(stderr, "[error] removing trace from context\n");
+       /*
+        * We don't have ownership of the live viewer stream, just
+        * remove them from our list.
+        */
+       bt_list_for_each_entry_safe(lvstream, tmp, &trace->stream_list,
+                       trace_stream_node) {
+               lvstream->in_trace = 0;
+               bt_list_del(&lvstream->trace_stream_node);
+       }
+       if (trace->in_use) {
+               ret = bt_context_remove_trace(bt_ctx, trace->trace_id);
+               if (ret < 0)
+                       fprintf(stderr, "[error] removing trace from context\n");
+       }
 
        /* remove the key/value pair from the HT. */
        return 1;
@@ -1405,7 +1470,7 @@ static
 int add_one_trace(struct lttng_live_ctx *ctx,
                struct lttng_live_ctf_trace *trace)
 {
-       int i, ret;
+       int ret;
        struct bt_context *bt_ctx = ctx->bt_ctx;
        struct lttng_live_viewer_stream *stream;
        struct bt_mmap_stream *new_mmap_stream;
@@ -1429,9 +1494,7 @@ int add_one_trace(struct lttng_live_ctx *ctx,
 
        BT_INIT_LIST_HEAD(&mmap_list.head);
 
-       for (i = 0; i < trace->streams->len; i++) {
-               stream = g_ptr_array_index(trace->streams, i);
-
+       bt_list_for_each_entry(stream, &trace->stream_list, trace_stream_node) {
                if (!stream->metadata_flag) {
                        new_mmap_stream = zmalloc(sizeof(struct bt_mmap_stream));
                        new_mmap_stream->priv = (void *) stream;
@@ -1606,18 +1669,22 @@ int lttng_live_get_new_streams(struct lttng_live_ctx *ctx, uint64_t id)
                ret = 0;
                goto end;
        }
-       printf_verbose("Waiting for %" PRIu64 " streams:\n",
-               ctx->session->stream_count);
-       ctx->session->streams = g_new0(struct lttng_live_viewer_stream,
-                       ctx->session->stream_count);
+       printf_verbose("Waiting for %d streams:\n", stream_count);
+
        for (i = 0; i < stream_count; i++) {
-               ret_len = lttng_live_recv(ctx->control_sock, &stream, sizeof(stream));
+               struct lttng_live_viewer_stream *lvstream;
+
+               lvstream = g_new0(struct lttng_live_viewer_stream, 1);
+               ret_len = lttng_live_recv(ctx->control_sock, &stream,
+                               sizeof(stream));
                if (ret_len == 0) {
                        fprintf(stderr, "[error] Remote side has closed connection\n");
+                       g_free(lvstream);
                        goto error;
                }
                if (ret_len < 0) {
                        perror("[error] Error receiving stream");
+                       g_free(lvstream);
                        goto error;
                }
                assert(ret_len == sizeof(stream));
@@ -1627,22 +1694,24 @@ int lttng_live_get_new_streams(struct lttng_live_ctx *ctx, uint64_t id)
                printf_verbose("    stream %" PRIu64 " : %s/%s\n",
                                be64toh(stream.id), stream.path_name,
                                stream.channel_name);
-               ctx->session->streams[i].id = be64toh(stream.id);
-               ctx->session->streams[i].session = ctx->session;
+               lvstream->id = be64toh(stream.id);
+               lvstream->session = ctx->session;
 
-               ctx->session->streams[i].mmap_size = 0;
-               ctx->session->streams[i].ctf_stream_id = -1ULL;
+               lvstream->mmap_size = 0;
+               lvstream->ctf_stream_id = -1ULL;
 
                if (be32toh(stream.metadata_flag)) {
-                       ctx->session->streams[i].metadata_flag = 1;
+                       lvstream->metadata_flag = 1;
                }
-               ret = lttng_live_ctf_trace_assign(&ctx->session->streams[i],
+               ret = lttng_live_ctf_trace_assign(lvstream,
                                be64toh(stream.ctf_trace_id));
                if (ret < 0) {
+                       g_free(lvstream);
                        goto error;
                }
                nb_streams++;
-
+               bt_list_add(&lvstream->session_stream_node,
+                       &ctx->session->stream_list);
        }
        ret = nb_streams;
 end:
@@ -1673,7 +1742,7 @@ int lttng_live_read(struct lttng_live_ctx *ctx)
        fmt_write = bt_lookup_format(g_quark_from_static_string("text"));
        if (!fmt_write) {
                fprintf(stderr, "[error] ctf-text error\n");
-               goto end;
+               goto end_free;
        }
 
        td_write = fmt_write->open_trace(NULL, O_RDWR, NULL, NULL);
@@ -1725,6 +1794,7 @@ int lttng_live_read(struct lttng_live_ctx *ctx)
                        }
                        ret = ask_new_streams(ctx);
                        if (ret < 0) {
+                               ret = 0;
                                goto end_free;
                        }
                        if (!ctx->session->stream_count) {
@@ -1742,10 +1812,10 @@ int lttng_live_read(struct lttng_live_ctx *ctx)
                if (!iter) {
                        if (lttng_live_should_quit()) {
                                ret = 0;
-                               goto end;
+                               goto end_free;
                        }
                        fprintf(stderr, "[error] Iterator creation error\n");
-                       goto end;
+                       goto end_free;
                }
                for (;;) {
                        if (lttng_live_should_quit()) {
@@ -1778,6 +1848,8 @@ int lttng_live_read(struct lttng_live_ctx *ctx)
        }
 
 end_free:
+       g_hash_table_foreach_remove(ctx->session->ctf_traces,
+                       del_traces, ctx->bt_ctx);
        bt_context_put(ctx->bt_ctx);
 end:
        if (lttng_live_should_quit()) {
This page took 0.027302 seconds and 4 git commands to generate.