Fix: leak of streams
authorMathieu Desnoyers <mathieu.desnoyers@efficios.com>
Tue, 25 Aug 2015 12:24:13 +0000 (08:24 -0400)
committerJérémie Galarneau <jeremie.galarneau@efficios.com>
Wed, 31 Jan 2018 19:46:03 +0000 (14:46 -0500)
Streams can be added when we attach to a session and if new streams are
added while the session is running. We were overriding the session
streams array in get_new_stream, so at teardown we could leak the
streams that were added before that call. We now keep a list of all the
streams in the session and add each stream in that list in both code
paths.

Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
Signed-off-by: Julien Desfossez <jdesfossez@efficios.com>
Signed-off-by: Jérémie Galarneau <jeremie.galarneau@efficios.com>
formats/lttng-live/lttng-live-comm.c
formats/lttng-live/lttng-live-plugin.c
formats/lttng-live/lttng-live.h

index bcbb06aa2e0344c4baa5504a7a66964705951913..ec0a0123ecda35c7f3a19fe4c0f89e73285b048c 100644 (file)
@@ -506,18 +506,22 @@ int lttng_live_attach_session(struct lttng_live_ctx *ctx, uint64_t id)
                ret = 0;
                goto end;
        }
                ret = 0;
                goto end;
        }
-       printf_verbose("Waiting for %" PRIu64 " streams:\n",
-               ctx->session->stream_count);
-       ctx->session->streams = g_new0(struct lttng_live_viewer_stream,
-                       ctx->session->stream_count);
+       printf_verbose("Waiting for %d streams:\n",
+               be32toh(rp.streams_count));
        for (i = 0; i < be32toh(rp.streams_count); i++) {
        for (i = 0; i < be32toh(rp.streams_count); i++) {
-               ret_len = lttng_live_recv(ctx->control_sock, &stream, sizeof(stream));
+               struct lttng_live_viewer_stream *lvstream;
+
+               lvstream = g_new0(struct lttng_live_viewer_stream, 1);
+               ret_len = lttng_live_recv(ctx->control_sock, &stream,
+                               sizeof(stream));
                if (ret_len == 0) {
                        fprintf(stderr, "[error] Remote side has closed connection\n");
                if (ret_len == 0) {
                        fprintf(stderr, "[error] Remote side has closed connection\n");
+                       g_free(lvstream);
                        goto error;
                }
                if (ret_len < 0) {
                        perror("[error] Error receiving stream");
                        goto error;
                }
                if (ret_len < 0) {
                        perror("[error] Error receiving stream");
+                       g_free(lvstream);
                        goto error;
                }
                assert(ret_len == sizeof(stream));
                        goto error;
                }
                assert(ret_len == sizeof(stream));
@@ -527,21 +531,23 @@ int lttng_live_attach_session(struct lttng_live_ctx *ctx, uint64_t id)
                printf_verbose("    stream %" PRIu64 " : %s/%s\n",
                                be64toh(stream.id), stream.path_name,
                                stream.channel_name);
                printf_verbose("    stream %" PRIu64 " : %s/%s\n",
                                be64toh(stream.id), stream.path_name,
                                stream.channel_name);
-               ctx->session->streams[i].id = be64toh(stream.id);
-               ctx->session->streams[i].session = ctx->session;
+               lvstream->id = be64toh(stream.id);
+               lvstream->session = ctx->session;
 
 
-               ctx->session->streams[i].mmap_size = 0;
-               ctx->session->streams[i].ctf_stream_id = -1ULL;
+               lvstream->mmap_size = 0;
+               lvstream->ctf_stream_id = -1ULL;
 
                if (be32toh(stream.metadata_flag)) {
 
                if (be32toh(stream.metadata_flag)) {
-                       ctx->session->streams[i].metadata_flag = 1;
+                       lvstream->metadata_flag = 1;
                }
                }
-               ret = lttng_live_ctf_trace_assign(&ctx->session->streams[i],
+               ret = lttng_live_ctf_trace_assign(lvstream,
                                be64toh(stream.ctf_trace_id));
                if (ret < 0) {
                                be64toh(stream.ctf_trace_id));
                if (ret < 0) {
+                       g_free(lvstream);
                        goto error;
                }
                        goto error;
                }
-
+               bt_list_add(&lvstream->stream_node,
+                       &ctx->session->stream_list);
        }
        ret = 0;
 end:
        }
        ret = 0;
 end:
@@ -1089,6 +1095,7 @@ retry:
                goto retry;
        case LTTNG_VIEWER_INDEX_HUP:
                printf_verbose("get_next_index: stream hung up\n");
                goto retry;
        case LTTNG_VIEWER_INDEX_HUP:
                printf_verbose("get_next_index: stream hung up\n");
+               /* TODO: remove stream from session list and trace ptr array */
                viewer_stream->id = -1ULL;
                index->offset = EOF;
                ctx->session->stream_count--;
                viewer_stream->id = -1ULL;
                index->offset = EOF;
                ctx->session->stream_count--;
@@ -1646,18 +1653,22 @@ int lttng_live_get_new_streams(struct lttng_live_ctx *ctx, uint64_t id)
                ret = 0;
                goto end;
        }
                ret = 0;
                goto end;
        }
-       printf_verbose("Waiting for %" PRIu64 " streams:\n",
-               ctx->session->stream_count);
-       ctx->session->streams = g_new0(struct lttng_live_viewer_stream,
-                       ctx->session->stream_count);
+       printf_verbose("Waiting for %d streams:\n", stream_count);
+
        for (i = 0; i < stream_count; i++) {
        for (i = 0; i < stream_count; i++) {
-               ret_len = lttng_live_recv(ctx->control_sock, &stream, sizeof(stream));
+               struct lttng_live_viewer_stream *lvstream;
+
+               lvstream = g_new0(struct lttng_live_viewer_stream, 1);
+               ret_len = lttng_live_recv(ctx->control_sock, &stream,
+                               sizeof(stream));
                if (ret_len == 0) {
                        fprintf(stderr, "[error] Remote side has closed connection\n");
                if (ret_len == 0) {
                        fprintf(stderr, "[error] Remote side has closed connection\n");
+                       g_free(lvstream);
                        goto error;
                }
                if (ret_len < 0) {
                        perror("[error] Error receiving stream");
                        goto error;
                }
                if (ret_len < 0) {
                        perror("[error] Error receiving stream");
+                       g_free(lvstream);
                        goto error;
                }
                assert(ret_len == sizeof(stream));
                        goto error;
                }
                assert(ret_len == sizeof(stream));
@@ -1667,22 +1678,24 @@ int lttng_live_get_new_streams(struct lttng_live_ctx *ctx, uint64_t id)
                printf_verbose("    stream %" PRIu64 " : %s/%s\n",
                                be64toh(stream.id), stream.path_name,
                                stream.channel_name);
                printf_verbose("    stream %" PRIu64 " : %s/%s\n",
                                be64toh(stream.id), stream.path_name,
                                stream.channel_name);
-               ctx->session->streams[i].id = be64toh(stream.id);
-               ctx->session->streams[i].session = ctx->session;
+               lvstream->id = be64toh(stream.id);
+               lvstream->session = ctx->session;
 
 
-               ctx->session->streams[i].mmap_size = 0;
-               ctx->session->streams[i].ctf_stream_id = -1ULL;
+               lvstream->mmap_size = 0;
+               lvstream->ctf_stream_id = -1ULL;
 
                if (be32toh(stream.metadata_flag)) {
 
                if (be32toh(stream.metadata_flag)) {
-                       ctx->session->streams[i].metadata_flag = 1;
+                       lvstream->metadata_flag = 1;
                }
                }
-               ret = lttng_live_ctf_trace_assign(&ctx->session->streams[i],
+               ret = lttng_live_ctf_trace_assign(lvstream,
                                be64toh(stream.ctf_trace_id));
                if (ret < 0) {
                                be64toh(stream.ctf_trace_id));
                if (ret < 0) {
+                       g_free(lvstream);
                        goto error;
                }
                nb_streams++;
                        goto error;
                }
                nb_streams++;
-
+               bt_list_add(&lvstream->stream_node,
+                       &ctx->session->stream_list);
        }
        ret = nb_streams;
 end:
        }
        ret = nb_streams;
 end:
index a1fbb9e826efc903009e777a70dc0fd7522004c4..a10a1b21235acad04b7ad85c1de16081ec77c621 100644 (file)
@@ -215,6 +215,17 @@ gboolean g_uint64p_equal(gconstpointer a, gconstpointer b)
        return TRUE;
 }
 
        return TRUE;
 }
 
+static void free_session_streams(struct lttng_live_session *lsession)
+{
+       struct lttng_live_viewer_stream *lvstream, *tmp;
+
+       bt_list_for_each_entry_safe(lvstream, tmp, &lsession->stream_list,
+                       stream_node) {
+               bt_list_del(&lvstream->stream_node);
+               g_free(lvstream);
+       }
+}
+
 static int lttng_live_open_trace_read(const char *path)
 {
        int ret = 0;
 static int lttng_live_open_trace_read(const char *path)
 {
        int ret = 0;
@@ -223,6 +234,8 @@ static int lttng_live_open_trace_read(const char *path)
        ctx = g_new0(struct lttng_live_ctx, 1);
        ctx->session = g_new0(struct lttng_live_session, 1);
 
        ctx = g_new0(struct lttng_live_ctx, 1);
        ctx->session = g_new0(struct lttng_live_session, 1);
 
+       BT_INIT_LIST_HEAD(&ctx->session->stream_list);
+
        /* We need a pointer to the context from the packet_seek function. */
        ctx->session->ctx = ctx;
 
        /* We need a pointer to the context from the packet_seek function. */
        ctx->session->ctx = ctx;
 
@@ -263,8 +276,8 @@ static int lttng_live_open_trace_read(const char *path)
 
 end_free:
        g_hash_table_destroy(ctx->session->ctf_traces);
 
 end_free:
        g_hash_table_destroy(ctx->session->ctf_traces);
+       free_session_streams(ctx->session);
        g_free(ctx->session);
        g_free(ctx->session);
-       g_free(ctx->session->streams);
        g_free(ctx);
 
        if (lttng_live_should_quit()) {
        g_free(ctx);
 
        if (lttng_live_should_quit()) {
index 276051dafeb82adf3cfd2bc3559e603964681a59..ef799fd7f5d402fc420cde3bd76e6fea765b1739 100644 (file)
@@ -26,6 +26,7 @@
 
 #include <stdint.h>
 #include <sys/param.h>
 
 #include <stdint.h>
 #include <sys/param.h>
+#include <babeltrace/list.h>
 #include "lttng-viewer-abi.h"
 
 #define LTTNG_DEFAULT_NETWORK_VIEWER_PORT      5344
 #include "lttng-viewer-abi.h"
 
 #define LTTNG_DEFAULT_NETWORK_VIEWER_PORT      5344
@@ -65,6 +66,7 @@ struct lttng_live_viewer_stream {
        struct lttng_live_session *session;
        struct lttng_live_ctf_trace *ctf_trace;
        struct lttng_viewer_index current_index;
        struct lttng_live_session *session;
        struct lttng_live_ctf_trace *ctf_trace;
        struct lttng_viewer_index current_index;
+       struct bt_list_head stream_node;
        char path[PATH_MAX];
 };
 
        char path[PATH_MAX];
 };
 
@@ -72,7 +74,7 @@ struct lttng_live_session {
        uint64_t live_timer_interval;
        uint64_t stream_count;
        struct lttng_live_ctx *ctx;
        uint64_t live_timer_interval;
        uint64_t stream_count;
        struct lttng_live_ctx *ctx;
-       struct lttng_live_viewer_stream *streams;
+       struct bt_list_head stream_list;
        GHashTable *ctf_traces;
 };
 
        GHashTable *ctf_traces;
 };
 
This page took 0.040174 seconds and 4 git commands to generate.