Streams can be added when we attach to a session and if new streams are
added while the session is running. We were overriding the session
streams array in get_new_stream, so at teardown we could leak the
streams that were added before that call. We now keep a list of all the
streams in the session and add each stream in that list in both code
paths.
Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
Signed-off-by: Julien Desfossez <jdesfossez@efficios.com>
Signed-off-by: Jérémie Galarneau <jeremie.galarneau@efficios.com>
- printf_verbose("Waiting for %" PRIu64 " streams:\n",
- ctx->session->stream_count);
- ctx->session->streams = g_new0(struct lttng_live_viewer_stream,
- ctx->session->stream_count);
+ printf_verbose("Waiting for %d streams:\n",
+ be32toh(rp.streams_count));
for (i = 0; i < be32toh(rp.streams_count); i++) {
for (i = 0; i < be32toh(rp.streams_count); i++) {
- ret_len = lttng_live_recv(ctx->control_sock, &stream, sizeof(stream));
+ struct lttng_live_viewer_stream *lvstream;
+
+ lvstream = g_new0(struct lttng_live_viewer_stream, 1);
+ ret_len = lttng_live_recv(ctx->control_sock, &stream,
+ sizeof(stream));
if (ret_len == 0) {
fprintf(stderr, "[error] Remote side has closed connection\n");
if (ret_len == 0) {
fprintf(stderr, "[error] Remote side has closed connection\n");
goto error;
}
if (ret_len < 0) {
perror("[error] Error receiving stream");
goto error;
}
if (ret_len < 0) {
perror("[error] Error receiving stream");
goto error;
}
assert(ret_len == sizeof(stream));
goto error;
}
assert(ret_len == sizeof(stream));
printf_verbose(" stream %" PRIu64 " : %s/%s\n",
be64toh(stream.id), stream.path_name,
stream.channel_name);
printf_verbose(" stream %" PRIu64 " : %s/%s\n",
be64toh(stream.id), stream.path_name,
stream.channel_name);
- ctx->session->streams[i].id = be64toh(stream.id);
- ctx->session->streams[i].session = ctx->session;
+ lvstream->id = be64toh(stream.id);
+ lvstream->session = ctx->session;
- ctx->session->streams[i].mmap_size = 0;
- ctx->session->streams[i].ctf_stream_id = -1ULL;
+ lvstream->mmap_size = 0;
+ lvstream->ctf_stream_id = -1ULL;
if (be32toh(stream.metadata_flag)) {
if (be32toh(stream.metadata_flag)) {
- ctx->session->streams[i].metadata_flag = 1;
+ lvstream->metadata_flag = 1;
- ret = lttng_live_ctf_trace_assign(&ctx->session->streams[i],
+ ret = lttng_live_ctf_trace_assign(lvstream,
be64toh(stream.ctf_trace_id));
if (ret < 0) {
be64toh(stream.ctf_trace_id));
if (ret < 0) {
+ bt_list_add(&lvstream->stream_node,
+ &ctx->session->stream_list);
goto retry;
case LTTNG_VIEWER_INDEX_HUP:
printf_verbose("get_next_index: stream hung up\n");
goto retry;
case LTTNG_VIEWER_INDEX_HUP:
printf_verbose("get_next_index: stream hung up\n");
+ /* TODO: remove stream from session list and trace ptr array */
viewer_stream->id = -1ULL;
index->offset = EOF;
ctx->session->stream_count--;
viewer_stream->id = -1ULL;
index->offset = EOF;
ctx->session->stream_count--;
- printf_verbose("Waiting for %" PRIu64 " streams:\n",
- ctx->session->stream_count);
- ctx->session->streams = g_new0(struct lttng_live_viewer_stream,
- ctx->session->stream_count);
+ printf_verbose("Waiting for %d streams:\n", stream_count);
+
for (i = 0; i < stream_count; i++) {
for (i = 0; i < stream_count; i++) {
- ret_len = lttng_live_recv(ctx->control_sock, &stream, sizeof(stream));
+ struct lttng_live_viewer_stream *lvstream;
+
+ lvstream = g_new0(struct lttng_live_viewer_stream, 1);
+ ret_len = lttng_live_recv(ctx->control_sock, &stream,
+ sizeof(stream));
if (ret_len == 0) {
fprintf(stderr, "[error] Remote side has closed connection\n");
if (ret_len == 0) {
fprintf(stderr, "[error] Remote side has closed connection\n");
goto error;
}
if (ret_len < 0) {
perror("[error] Error receiving stream");
goto error;
}
if (ret_len < 0) {
perror("[error] Error receiving stream");
goto error;
}
assert(ret_len == sizeof(stream));
goto error;
}
assert(ret_len == sizeof(stream));
printf_verbose(" stream %" PRIu64 " : %s/%s\n",
be64toh(stream.id), stream.path_name,
stream.channel_name);
printf_verbose(" stream %" PRIu64 " : %s/%s\n",
be64toh(stream.id), stream.path_name,
stream.channel_name);
- ctx->session->streams[i].id = be64toh(stream.id);
- ctx->session->streams[i].session = ctx->session;
+ lvstream->id = be64toh(stream.id);
+ lvstream->session = ctx->session;
- ctx->session->streams[i].mmap_size = 0;
- ctx->session->streams[i].ctf_stream_id = -1ULL;
+ lvstream->mmap_size = 0;
+ lvstream->ctf_stream_id = -1ULL;
if (be32toh(stream.metadata_flag)) {
if (be32toh(stream.metadata_flag)) {
- ctx->session->streams[i].metadata_flag = 1;
+ lvstream->metadata_flag = 1;
- ret = lttng_live_ctf_trace_assign(&ctx->session->streams[i],
+ ret = lttng_live_ctf_trace_assign(lvstream,
be64toh(stream.ctf_trace_id));
if (ret < 0) {
be64toh(stream.ctf_trace_id));
if (ret < 0) {
goto error;
}
nb_streams++;
goto error;
}
nb_streams++;
+ bt_list_add(&lvstream->stream_node,
+ &ctx->session->stream_list);
+static void free_session_streams(struct lttng_live_session *lsession)
+{
+ struct lttng_live_viewer_stream *lvstream, *tmp;
+
+ bt_list_for_each_entry_safe(lvstream, tmp, &lsession->stream_list,
+ stream_node) {
+ bt_list_del(&lvstream->stream_node);
+ g_free(lvstream);
+ }
+}
+
static int lttng_live_open_trace_read(const char *path)
{
int ret = 0;
static int lttng_live_open_trace_read(const char *path)
{
int ret = 0;
ctx = g_new0(struct lttng_live_ctx, 1);
ctx->session = g_new0(struct lttng_live_session, 1);
ctx = g_new0(struct lttng_live_ctx, 1);
ctx->session = g_new0(struct lttng_live_session, 1);
+ BT_INIT_LIST_HEAD(&ctx->session->stream_list);
+
/* We need a pointer to the context from the packet_seek function. */
ctx->session->ctx = ctx;
/* We need a pointer to the context from the packet_seek function. */
ctx->session->ctx = ctx;
end_free:
g_hash_table_destroy(ctx->session->ctf_traces);
end_free:
g_hash_table_destroy(ctx->session->ctf_traces);
+ free_session_streams(ctx->session);
- g_free(ctx->session->streams);
g_free(ctx);
if (lttng_live_should_quit()) {
g_free(ctx);
if (lttng_live_should_quit()) {
#include <stdint.h>
#include <sys/param.h>
#include <stdint.h>
#include <sys/param.h>
+#include <babeltrace/list.h>
#include "lttng-viewer-abi.h"
#define LTTNG_DEFAULT_NETWORK_VIEWER_PORT 5344
#include "lttng-viewer-abi.h"
#define LTTNG_DEFAULT_NETWORK_VIEWER_PORT 5344
struct lttng_live_session *session;
struct lttng_live_ctf_trace *ctf_trace;
struct lttng_viewer_index current_index;
struct lttng_live_session *session;
struct lttng_live_ctf_trace *ctf_trace;
struct lttng_viewer_index current_index;
+ struct bt_list_head stream_node;
uint64_t live_timer_interval;
uint64_t stream_count;
struct lttng_live_ctx *ctx;
uint64_t live_timer_interval;
uint64_t stream_count;
struct lttng_live_ctx *ctx;
- struct lttng_live_viewer_stream *streams;
+ struct bt_list_head stream_list;
GHashTable *ctf_traces;
};
GHashTable *ctf_traces;
};