Fix: leak of streams
[babeltrace.git] / formats / lttng-live / lttng-live-comm.c
index 2b793e5f357ba30079b884a01d9df4a5a08295a7..ec0a0123ecda35c7f3a19fe4c0f89e73285b048c 100644 (file)
@@ -506,18 +506,22 @@ int lttng_live_attach_session(struct lttng_live_ctx *ctx, uint64_t id)
                ret = 0;
                goto end;
        }
-       printf_verbose("Waiting for %" PRIu64 " streams:\n",
-               ctx->session->stream_count);
-       ctx->session->streams = g_new0(struct lttng_live_viewer_stream,
-                       ctx->session->stream_count);
+       printf_verbose("Waiting for %d streams:\n",
+               be32toh(rp.streams_count));
        for (i = 0; i < be32toh(rp.streams_count); i++) {
-               ret_len = lttng_live_recv(ctx->control_sock, &stream, sizeof(stream));
+               struct lttng_live_viewer_stream *lvstream;
+
+               lvstream = g_new0(struct lttng_live_viewer_stream, 1);
+               ret_len = lttng_live_recv(ctx->control_sock, &stream,
+                               sizeof(stream));
                if (ret_len == 0) {
                        fprintf(stderr, "[error] Remote side has closed connection\n");
+                       g_free(lvstream);
                        goto error;
                }
                if (ret_len < 0) {
                        perror("[error] Error receiving stream");
+                       g_free(lvstream);
                        goto error;
                }
                assert(ret_len == sizeof(stream));
@@ -527,21 +531,23 @@ int lttng_live_attach_session(struct lttng_live_ctx *ctx, uint64_t id)
                printf_verbose("    stream %" PRIu64 " : %s/%s\n",
                                be64toh(stream.id), stream.path_name,
                                stream.channel_name);
-               ctx->session->streams[i].id = be64toh(stream.id);
-               ctx->session->streams[i].session = ctx->session;
+               lvstream->id = be64toh(stream.id);
+               lvstream->session = ctx->session;
 
-               ctx->session->streams[i].mmap_size = 0;
-               ctx->session->streams[i].ctf_stream_id = -1ULL;
+               lvstream->mmap_size = 0;
+               lvstream->ctf_stream_id = -1ULL;
 
                if (be32toh(stream.metadata_flag)) {
-                       ctx->session->streams[i].metadata_flag = 1;
+                       lvstream->metadata_flag = 1;
                }
-               ret = lttng_live_ctf_trace_assign(&ctx->session->streams[i],
+               ret = lttng_live_ctf_trace_assign(lvstream,
                                be64toh(stream.ctf_trace_id));
                if (ret < 0) {
+                       g_free(lvstream);
                        goto error;
                }
-
+               bt_list_add(&lvstream->stream_node,
+                       &ctx->session->stream_list);
        }
        ret = 0;
 end:
@@ -567,10 +573,11 @@ restart:
                id = g_array_index(ctx->session_ids, uint64_t, i);
                ret = lttng_live_get_new_streams(ctx, id);
                printf_verbose("Asking for new streams returns %d\n", ret);
+               if (lttng_live_should_quit()) {
+                       ret = -1;
+                       goto end;
+               }
                if (ret < 0) {
-                       if (lttng_live_should_quit()) {
-                               goto end;
-                       }
                        if (ret == -LTTNG_VIEWER_NEW_STREAMS_HUP) {
                                printf_verbose("Session %" PRIu64 " closed\n",
                                                id);
@@ -593,7 +600,12 @@ restart:
                        nb_streams += ret;
                }
        }
-       ret = nb_streams;
+       if (ctx->session_ids->len == 0) {
+               /* All sessions are closed. */
+               ret = -1;
+       } else {
+               ret = nb_streams;
+       }
 
 end:
        return ret;
@@ -607,6 +619,11 @@ int append_metadata(struct lttng_live_ctx *ctx,
        struct lttng_live_viewer_stream *metadata;
        char *metadata_buf = NULL;
 
+       if (!viewer_stream->ctf_trace->handle) {
+               printf_verbose("append_metadata: trace handle not ready yet.\n");
+               return 0;
+       }
+
        printf_verbose("get_next_index: new metadata needed\n");
        ret = get_new_metadata(ctx, viewer_stream, &metadata_buf);
        if (ret < 0) {
@@ -943,6 +960,9 @@ int get_new_metadata(struct lttng_live_ctx *ctx,
                if (!len_read) {
                        (void) poll(NULL, 0, ACTIVE_POLL_DELAY);
                }
+               if (ret < 0) {
+                       break;  /* Stop on error. */
+               }
        } while (ret > 0 || !len_read);
 
        if (babeltrace_close_memstream(metadata_buf, &size,
@@ -1075,6 +1095,7 @@ retry:
                goto retry;
        case LTTNG_VIEWER_INDEX_HUP:
                printf_verbose("get_next_index: stream hung up\n");
+               /* TODO: remove stream from session list and trace ptr array */
                viewer_stream->id = -1ULL;
                index->offset = EOF;
                ctx->session->stream_count--;
@@ -1632,18 +1653,22 @@ int lttng_live_get_new_streams(struct lttng_live_ctx *ctx, uint64_t id)
                ret = 0;
                goto end;
        }
-       printf_verbose("Waiting for %" PRIu64 " streams:\n",
-               ctx->session->stream_count);
-       ctx->session->streams = g_new0(struct lttng_live_viewer_stream,
-                       ctx->session->stream_count);
+       printf_verbose("Waiting for %d streams:\n", stream_count);
+
        for (i = 0; i < stream_count; i++) {
-               ret_len = lttng_live_recv(ctx->control_sock, &stream, sizeof(stream));
+               struct lttng_live_viewer_stream *lvstream;
+
+               lvstream = g_new0(struct lttng_live_viewer_stream, 1);
+               ret_len = lttng_live_recv(ctx->control_sock, &stream,
+                               sizeof(stream));
                if (ret_len == 0) {
                        fprintf(stderr, "[error] Remote side has closed connection\n");
+                       g_free(lvstream);
                        goto error;
                }
                if (ret_len < 0) {
                        perror("[error] Error receiving stream");
+                       g_free(lvstream);
                        goto error;
                }
                assert(ret_len == sizeof(stream));
@@ -1653,22 +1678,24 @@ int lttng_live_get_new_streams(struct lttng_live_ctx *ctx, uint64_t id)
                printf_verbose("    stream %" PRIu64 " : %s/%s\n",
                                be64toh(stream.id), stream.path_name,
                                stream.channel_name);
-               ctx->session->streams[i].id = be64toh(stream.id);
-               ctx->session->streams[i].session = ctx->session;
+               lvstream->id = be64toh(stream.id);
+               lvstream->session = ctx->session;
 
-               ctx->session->streams[i].mmap_size = 0;
-               ctx->session->streams[i].ctf_stream_id = -1ULL;
+               lvstream->mmap_size = 0;
+               lvstream->ctf_stream_id = -1ULL;
 
                if (be32toh(stream.metadata_flag)) {
-                       ctx->session->streams[i].metadata_flag = 1;
+                       lvstream->metadata_flag = 1;
                }
-               ret = lttng_live_ctf_trace_assign(&ctx->session->streams[i],
+               ret = lttng_live_ctf_trace_assign(lvstream,
                                be64toh(stream.ctf_trace_id));
                if (ret < 0) {
+                       g_free(lvstream);
                        goto error;
                }
                nb_streams++;
-
+               bt_list_add(&lvstream->stream_node,
+                       &ctx->session->stream_list);
        }
        ret = nb_streams;
 end:
@@ -1751,6 +1778,7 @@ int lttng_live_read(struct lttng_live_ctx *ctx)
                        }
                        ret = ask_new_streams(ctx);
                        if (ret < 0) {
+                               ret = 0;
                                goto end_free;
                        }
                        if (!ctx->session->stream_count) {
This page took 0.025772 seconds and 4 git commands to generate.