X-Git-Url: http://git.efficios.com/?a=blobdiff_plain;f=formats%2Flttng-live%2Flttng-live-functions.c;h=60f5fb51f346b6b8a64184a1e01a1ae2ca03ade1;hb=b5a1fa45f7cfaccf73b21712047775ab8f957bb0;hp=aee78cbf66012d204bd46cd1a34d11a545082311;hpb=3af4fc482abd4a97ef3334536b8f8a43d46632fd;p=babeltrace.git diff --git a/formats/lttng-live/lttng-live-functions.c b/formats/lttng-live/lttng-live-functions.c index aee78cbf..60f5fb51 100644 --- a/formats/lttng-live/lttng-live-functions.c +++ b/formats/lttng-live/lttng-live-functions.c @@ -50,7 +50,7 @@ #include #include "lttng-live-functions.h" -#include "lttng-viewer.h" +#include "lttng-viewer-abi.h" /* * Memory allocation zeroed @@ -62,14 +62,18 @@ ((type) (a) > (type) (b) ? (type) (a) : (type) (b)) #endif -int lttng_live_connect_viewer(struct lttng_live_ctx *ctx, char *hostname, - int port) +static void ctf_live_packet_seek(struct bt_stream_pos *stream_pos, + size_t index, int whence); +static void add_traces(gpointer key, gpointer value, gpointer user_data); +static int del_traces(gpointer key, gpointer value, gpointer user_data); + +int lttng_live_connect_viewer(struct lttng_live_ctx *ctx) { struct hostent *host; struct sockaddr_in server_addr; int ret; - host = gethostbyname(hostname); + host = gethostbyname(ctx->relay_hostname); if (!host) { ret = -1; goto end; @@ -82,7 +86,7 @@ int lttng_live_connect_viewer(struct lttng_live_ctx *ctx, char *hostname, } server_addr.sin_family = AF_INET; - server_addr.sin_port = htons(port); + server_addr.sin_port = htons(ctx->port); server_addr.sin_addr = *((struct in_addr *) host->h_addr); bzero(&(server_addr.sin_zero), 8); @@ -156,14 +160,82 @@ error: return ret; } +static +void free_session_list(GPtrArray *session_list) +{ + int i; + struct lttng_live_relay_session *relay_session; + + for (i = 0; i < session_list->len; i++) { + relay_session = g_ptr_array_index(session_list, i); + free(relay_session->name); + free(relay_session->hostname); + } + g_ptr_array_free(session_list, TRUE); +} + +static +void print_session_list(GPtrArray *session_list, const char *path) +{ + int i; + struct lttng_live_relay_session *relay_session; + + for (i = 0; i < session_list->len; i++) { + relay_session = g_ptr_array_index(session_list, i); + fprintf(stdout, "%s/host/%s/%s (timer = %u, " + "%u stream(s), %u client(s) connected)\n", + path, relay_session->hostname, + relay_session->name, relay_session->timer, + relay_session->streams, relay_session->clients); + } +} + +static +void update_session_list(GPtrArray *session_list, char *hostname, + char *session_name, uint32_t streams, uint32_t clients, + uint32_t timer) +{ + int i, found = 0; + struct lttng_live_relay_session *relay_session; + + for (i = 0; i < session_list->len; i++) { + relay_session = g_ptr_array_index(session_list, i); + if ((strncmp(relay_session->hostname, hostname, NAME_MAX) == 0) && + strncmp(relay_session->name, + session_name, NAME_MAX) == 0) { + relay_session->streams += streams; + if (relay_session->clients < clients) + relay_session->clients = clients; + found = 1; + break; + } + } + if (found) + return; + + relay_session = g_new0(struct lttng_live_relay_session, 1); + relay_session->hostname = strndup(hostname, NAME_MAX); + relay_session->name = strndup(session_name, NAME_MAX); + relay_session->clients = clients; + relay_session->streams = streams; + relay_session->timer = timer; + g_ptr_array_add(session_list, relay_session); +} + int lttng_live_list_sessions(struct lttng_live_ctx *ctx, const char *path) { struct lttng_viewer_cmd cmd; struct lttng_viewer_list_sessions list; struct lttng_viewer_session lsession; - int i, ret; + int i, ret, sessions_count, print_list = 0; ssize_t ret_len; - int sessions_count; + uint64_t session_id; + GPtrArray *session_list = NULL; + + if (strlen(ctx->session_name) == 0) { + print_list = 1; + session_list = g_ptr_array_new(); + } cmd.cmd = htobe32(LTTNG_VIEWER_LIST_SESSIONS); cmd.data_size = 0; @@ -190,8 +262,6 @@ int lttng_live_list_sessions(struct lttng_live_ctx *ctx, const char *path) assert(ret_len == sizeof(list)); sessions_count = be32toh(list.sessions_count); - fprintf(stdout, "%u active session(s)%c\n", sessions_count, - sessions_count > 0 ? ':' : ' '); for (i = 0; i < sessions_count; i++) { do { ret_len = recv(ctx->control_sock, &lsession, sizeof(lsession), 0); @@ -204,14 +274,30 @@ int lttng_live_list_sessions(struct lttng_live_ctx *ctx, const char *path) assert(ret_len == sizeof(lsession)); lsession.hostname[LTTNG_VIEWER_HOST_NAME_MAX - 1] = '\0'; lsession.session_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0'; + session_id = be64toh(lsession.id); + + if (print_list) { + update_session_list(session_list, + lsession.hostname, + lsession.session_name, + be32toh(lsession.streams), + be32toh(lsession.clients), + be32toh(lsession.live_timer)); + } else { + if ((strncmp(lsession.session_name, ctx->session_name, + NAME_MAX) == 0) && (strncmp(lsession.hostname, + ctx->traced_hostname, NAME_MAX) == 0)) { + printf_verbose("Reading from session %" PRIu64 "\n", + session_id); + g_array_append_val(ctx->session_ids, + session_id); + } + } + } - fprintf(stdout, "%s/%" PRIu64 " : %s on host %s (timer = %u, " - "%u stream(s), %u client(s) connected)\n", - path, be64toh(lsession.id), - lsession.session_name, lsession.hostname, - be32toh(lsession.live_timer), - be32toh(lsession.streams), - be32toh(lsession.clients)); + if (print_list) { + print_session_list(session_list, path); + free_session_list(session_list); } ret = 0; @@ -323,7 +409,7 @@ int lttng_live_attach_session(struct lttng_live_ctx *ctx, uint64_t id) goto end; } - ctx->session->stream_count = be32toh(rp.streams_count); + ctx->session->stream_count += be32toh(rp.streams_count); /* * When the session is created but not started, we do an active wait * until it starts. It allows the viewer to start processing the trace @@ -404,6 +490,44 @@ error: return ret; } +static +int ask_new_streams(struct lttng_live_ctx *ctx) +{ + int i, ret = 0; + uint64_t id; + +restart: + for (i = 0; i < ctx->session_ids->len; i++) { + id = g_array_index(ctx->session_ids, uint64_t, i); + ret = lttng_live_get_new_streams(ctx, id); + printf_verbose("Asking for new streams returns %d\n", + ret); + if (ret < 0) { + if (ret == -LTTNG_VIEWER_NEW_STREAMS_HUP) { + printf_verbose("Session %" PRIu64 " closed\n", + id); + /* + * The streams have already been closed during + * the reading, so we only need to get rid of + * the trace in our internal table of sessions. + */ + g_array_remove_index(ctx->session_ids, i); + /* + * We can't continue iterating on the g_array + * after a remove, we have to start again. + */ + goto restart; + } else { + ret = -1; + goto end; + } + } + } + +end: + return ret; +} + static int get_data_packet(struct lttng_live_ctx *ctx, struct ctf_stream_pos *pos, @@ -454,7 +578,13 @@ int get_data_packet(struct lttng_live_ctx *ctx, ret = ret_len; goto error; } - assert(ret_len == sizeof(rp)); + if (ret_len != sizeof(rp)) { + fprintf(stderr, "[error] get_data_packet: expected %" PRId64 + ", received %" PRId64 "\n", ret_len, + sizeof(rp)); + ret = -1; + goto error; + } rp.flags = be32toh(rp.flags); @@ -474,6 +604,13 @@ int get_data_packet(struct lttng_live_ctx *ctx, ret = 0; goto end; } + if (rp.flags & LTTNG_VIEWER_FLAG_NEW_STREAM) { + ret = ask_new_streams(ctx); + if (ret < 0) + goto error; + g_hash_table_foreach(ctx->session->ctf_traces, add_traces, + ctx->bt_ctx); + } fprintf(stderr, "[error] get_data_packet: error\n"); ret = -1; goto end; @@ -716,7 +853,7 @@ retry: case LTTNG_VIEWER_INDEX_INACTIVE: printf_verbose("get_next_index: inactive\n"); memset(index, 0, sizeof(struct packet_index)); - index->timestamp_end = be64toh(rp.timestamp_end); + index->ts_cycles.timestamp_end = be64toh(rp.timestamp_end); break; case LTTNG_VIEWER_INDEX_OK: printf_verbose("get_next_index: Ok, need metadata update : %u\n", @@ -724,8 +861,8 @@ retry: index->offset = be64toh(rp.offset); index->packet_size = be64toh(rp.packet_size); index->content_size = be64toh(rp.content_size); - index->timestamp_begin = be64toh(rp.timestamp_begin); - index->timestamp_end = be64toh(rp.timestamp_end); + index->ts_cycles.timestamp_begin = be64toh(rp.timestamp_begin); + index->ts_cycles.timestamp_end = be64toh(rp.timestamp_end); index->events_discarded = be64toh(rp.events_discarded); if (rp.flags & LTTNG_VIEWER_FLAG_NEW_METADATA) { @@ -736,6 +873,13 @@ retry: goto error; } } + if (rp.flags & LTTNG_VIEWER_FLAG_NEW_STREAM) { + ret = ask_new_streams(ctx); + if (ret < 0) + goto error; + g_hash_table_foreach(ctx->session->ctf_traces, add_traces, + ctx->bt_ctx); + } break; case LTTNG_VIEWER_INDEX_RETRY: printf_verbose("get_next_index: retry\n"); @@ -746,6 +890,7 @@ retry: viewer_stream->id = -1ULL; viewer_stream->fd = -1; index->offset = EOF; + ctx->session->stream_count--; break; case LTTNG_VIEWER_INDEX_ERR: fprintf(stderr, "[error] get_next_index: error\n"); @@ -763,12 +908,13 @@ error: return ret; } +static void ctf_live_packet_seek(struct bt_stream_pos *stream_pos, size_t index, int whence) { struct ctf_stream_pos *pos; struct ctf_file_stream *file_stream; - struct packet_index packet_index; + struct packet_index *prev_index = NULL, *cur_index; struct lttng_live_viewer_stream *viewer_stream; struct lttng_live_session *session; int ret; @@ -779,31 +925,72 @@ retry: viewer_stream = (struct lttng_live_viewer_stream *) pos->priv; session = viewer_stream->session; + switch (pos->packet_index->len) { + case 0: + g_array_set_size(pos->packet_index, 1); + cur_index = &g_array_index(pos->packet_index, + struct packet_index, 0); + break; + case 1: + g_array_set_size(pos->packet_index, 2); + prev_index = &g_array_index(pos->packet_index, + struct packet_index, 0); + cur_index = &g_array_index(pos->packet_index, + struct packet_index, 1); + break; + case 2: + g_array_index(pos->packet_index, + struct packet_index, 0) = + g_array_index(pos->packet_index, + struct packet_index, 1); + prev_index = &g_array_index(pos->packet_index, + struct packet_index, 0); + cur_index = &g_array_index(pos->packet_index, + struct packet_index, 1); + break; + default: + abort(); + break; + } printf_verbose("get_next_index for stream %" PRIu64 "\n", viewer_stream->id); - ret = get_next_index(session->ctx, viewer_stream, &packet_index); + ret = get_next_index(session->ctx, viewer_stream, cur_index); if (ret < 0) { pos->offset = EOF; fprintf(stderr, "[error] get_next_index failed\n"); return; } - pos->packet_size = packet_index.packet_size; - pos->content_size = packet_index.content_size; + pos->packet_size = cur_index->packet_size; + pos->content_size = cur_index->content_size; pos->mmap_base_offset = 0; - if (packet_index.offset == EOF) { + if (cur_index->offset == EOF) { pos->offset = EOF; } else { pos->offset = 0; } - if (packet_index.content_size == 0) { - file_stream->parent.cycles_timestamp = packet_index.timestamp_end; + if (cur_index->content_size == 0) { + file_stream->parent.cycles_timestamp = + cur_index->ts_cycles.timestamp_end; file_stream->parent.real_timestamp = ctf_get_real_timestamp( - &file_stream->parent, packet_index.timestamp_end); + &file_stream->parent, + cur_index->ts_cycles.timestamp_end); } else { - file_stream->parent.cycles_timestamp = packet_index.timestamp_begin; - file_stream->parent.real_timestamp = ctf_get_real_timestamp( - &file_stream->parent, packet_index.timestamp_begin); + /* Convert the timestamps and append to the real_index. */ + cur_index->ts_real.timestamp_begin = ctf_get_real_timestamp( + &file_stream->parent, + cur_index->ts_cycles.timestamp_begin); + cur_index->ts_real.timestamp_end = ctf_get_real_timestamp( + &file_stream->parent, + cur_index->ts_cycles.timestamp_end); + + ctf_update_current_packet_index(&file_stream->parent, + prev_index, cur_index); + + file_stream->parent.cycles_timestamp = + cur_index->ts_cycles.timestamp_begin; + file_stream->parent.real_timestamp = + cur_index->ts_real.timestamp_begin; } if (pos->packet_size == 0 || pos->offset == EOF) { @@ -813,8 +1000,8 @@ retry: printf_verbose("get_data_packet for stream %" PRIu64 "\n", viewer_stream->id); ret = get_data_packet(session->ctx, pos, viewer_stream, - be64toh(packet_index.offset), - packet_index.packet_size / CHAR_BIT); + be64toh(cur_index->offset), + cur_index->packet_size / CHAR_BIT); if (ret == -2) { goto retry; } else if (ret < 0) { @@ -826,8 +1013,9 @@ retry: printf_verbose("Index received : packet_size : %" PRIu64 ", offset %" PRIu64 ", content_size %" PRIu64 ", timestamp_end : %" PRIu64 "\n", - packet_index.packet_size, packet_index.offset, - packet_index.content_size, packet_index.timestamp_end); + cur_index->packet_size, cur_index->offset, + cur_index->content_size, + cur_index->ts_cycles.timestamp_end); /* update trace_packet_header and stream_packet_context */ if (pos->prot != PROT_WRITE && file_stream->parent.trace_packet_header) { @@ -854,7 +1042,50 @@ end: return; } -static int del_traces(gpointer key, gpointer value, gpointer user_data) +int lttng_live_create_viewer_session(struct lttng_live_ctx *ctx) +{ + struct lttng_viewer_cmd cmd; + struct lttng_viewer_create_session_response resp; + int ret; + ssize_t ret_len; + + cmd.cmd = htobe32(LTTNG_VIEWER_CREATE_SESSION); + cmd.data_size = 0; + cmd.cmd_version = 0; + + do { + ret_len = send(ctx->control_sock, &cmd, sizeof(cmd), 0); + } while (ret_len < 0 && errno == EINTR); + if (ret_len < 0) { + fprintf(stderr, "[error] Error sending cmd\n"); + ret = ret_len; + goto error; + } + assert(ret_len == sizeof(cmd)); + + do { + ret_len = recv(ctx->control_sock, &resp, sizeof(resp), 0); + } while (ret_len < 0 && errno == EINTR); + if (ret_len < 0) { + fprintf(stderr, "[error] Error receiving create session reply\n"); + ret = ret_len; + goto error; + } + assert(ret_len == sizeof(resp)); + + if (be32toh(resp.status) != LTTNG_VIEWER_CREATE_SESSION_OK) { + fprintf(stderr, "[error] Error creating viewer session\n"); + ret = -1; + goto error; + } + ret = 0; + +error: + return ret; +} + +static +int del_traces(gpointer key, gpointer value, gpointer user_data) { struct bt_context *bt_ctx = user_data; struct lttng_live_ctf_trace *trace = value; @@ -868,7 +1099,8 @@ static int del_traces(gpointer key, gpointer value, gpointer user_data) return 1; } -static void add_traces(gpointer key, gpointer value, gpointer user_data) +static +void add_traces(gpointer key, gpointer value, gpointer user_data) { int i, ret, total_metadata = 0; uint64_t metadata_len; @@ -879,6 +1111,18 @@ static void add_traces(gpointer key, gpointer value, gpointer user_data) struct bt_mmap_stream_list mmap_list; struct lttng_live_ctx *ctx = NULL; + /* + * We don't know how many streams we will receive for a trace, so + * once we are done receiving the traces, we add all the traces + * received to the bt_context. + * We can receive streams during the attach command or the + * get_new_streams, so we have to make sure not to add multiple + * times the same traces. + * If a trace is already in the context, we just skip this function. + */ + if (trace->in_use) + return; + BT_INIT_LIST_HEAD(&mmap_list.head); for (i = 0; i < trace->streams->len; i++) { @@ -914,7 +1158,20 @@ static void add_traces(gpointer key, gpointer value, gpointer user_data) fprintf(stderr, "[error] Error adding trace\n"); goto end_free; } + + if (bt_ctx->current_iterator) { + struct bt_trace_descriptor *td; + struct bt_trace_handle *handle; + + handle = (struct bt_trace_handle *) g_hash_table_lookup( + bt_ctx->trace_handles, + (gpointer) (unsigned long) ret); + td = handle->td; + bt_iter_add_trace(bt_ctx->current_iterator, td); + } + trace->trace_id = ret; + trace->in_use = 1; goto end; @@ -924,19 +1181,168 @@ end: return; } -void lttng_live_read(struct lttng_live_ctx *ctx, uint64_t session_id) +int lttng_live_get_new_streams(struct lttng_live_ctx *ctx, uint64_t id) { - int ret, active_session = 0; - struct bt_context *bt_ctx; + struct lttng_viewer_cmd cmd; + struct lttng_viewer_new_streams_request rq; + struct lttng_viewer_new_streams_response rp; + struct lttng_viewer_stream stream; + int ret, i; + ssize_t ret_len; + uint32_t stream_count; + + cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEW_STREAMS); + cmd.data_size = sizeof(rq); + cmd.cmd_version = 0; + + memset(&rq, 0, sizeof(rq)); + rq.session_id = htobe64(id); + + do { + ret_len = send(ctx->control_sock, &cmd, sizeof(cmd), 0); + } while (ret_len < 0 && errno == EINTR); + if (ret_len < 0) { + fprintf(stderr, "[error] Error sending cmd\n"); + ret = ret_len; + goto error; + } + assert(ret_len == sizeof(cmd)); + + do { + ret_len = send(ctx->control_sock, &rq, sizeof(rq), 0); + } while (ret_len < 0 && errno == EINTR); + if (ret_len < 0) { + fprintf(stderr, "[error] Error sending get_new_streams request\n"); + ret = ret_len; + goto error; + } + assert(ret_len == sizeof(rq)); + + do { + ret_len = recv(ctx->control_sock, &rp, sizeof(rp), 0); + } while (ret_len < 0 && errno == EINTR); + if (ret_len < 0) { + fprintf(stderr, "[error] Error receiving get_new_streams response\n"); + ret = ret_len; + goto error; + } + assert(ret_len == sizeof(rp)); + + switch(be32toh(rp.status)) { + case LTTNG_VIEWER_NEW_STREAMS_OK: + break; + case LTTNG_VIEWER_NEW_STREAMS_NO_NEW: + ret = 0; + goto end; + case LTTNG_VIEWER_NEW_STREAMS_HUP: + ret = -LTTNG_VIEWER_NEW_STREAMS_HUP; + goto end; + case LTTNG_VIEWER_NEW_STREAMS_ERR: + fprintf(stderr, "[error] get_new_streams error\n"); + ret = -1; + goto end; + default: + fprintf(stderr, "[error] Unknown return code %u\n", + be32toh(rp.status)); + ret = -1; + goto end; + } + + stream_count = be32toh(rp.streams_count); + ctx->session->stream_count += stream_count; + /* + * When the session is created but not started, we do an active wait + * until it starts. It allows the viewer to start processing the trace + * as soon as the session starts. + */ + if (ctx->session->stream_count == 0) { + ret = 0; + goto end; + } + printf_verbose("Waiting for %" PRIu64 " streams:\n", + ctx->session->stream_count); + ctx->session->streams = g_new0(struct lttng_live_viewer_stream, + ctx->session->stream_count); + for (i = 0; i < stream_count; i++) { + do { + ret_len = recv(ctx->control_sock, &stream, sizeof(stream), 0); + } while (ret_len < 0 && errno == EINTR); + if (ret_len < 0) { + fprintf(stderr, "[error] Error receiving stream\n"); + ret = ret_len; + goto error; + } + assert(ret_len == sizeof(stream)); + stream.path_name[LTTNG_VIEWER_PATH_MAX - 1] = '\0'; + stream.channel_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0'; + + printf_verbose(" stream %" PRIu64 " : %s/%s\n", + be64toh(stream.id), stream.path_name, + stream.channel_name); + ctx->session->streams[i].id = be64toh(stream.id); + ctx->session->streams[i].session = ctx->session; + + ctx->session->streams[i].first_read = 1; + ctx->session->streams[i].mmap_size = 0; + + if (be32toh(stream.metadata_flag)) { + char *path; + + path = strdup(LTTNG_METADATA_PATH_TEMPLATE); + if (!path) { + perror("strdup"); + ret = -1; + goto error; + } + if (!mkdtemp(path)) { + perror("mkdtemp"); + free(path); + ret = -1; + goto error; + } + ctx->session->streams[i].metadata_flag = 1; + snprintf(ctx->session->streams[i].path, + sizeof(ctx->session->streams[i].path), + "%s/%s", path, + stream.channel_name); + ret = open(ctx->session->streams[i].path, + O_WRONLY | O_CREAT | O_TRUNC, + S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP); + if (ret < 0) { + perror("open"); + free(path); + goto error; + } + ctx->session->streams[i].fd = ret; + free(path); + } + ret = lttng_live_ctf_trace_assign(&ctx->session->streams[i], + be64toh(stream.ctf_trace_id)); + if (ret < 0) { + goto error; + } + + } + ret = 0; + +end: +error: + return ret; +} + +void lttng_live_read(struct lttng_live_ctx *ctx) +{ + int ret, i; struct bt_ctf_iter *iter; const struct bt_ctf_event *event; struct bt_iter_pos begin_pos; struct bt_trace_descriptor *td_write; struct bt_format *fmt_write; struct ctf_text_stream_pos *sout; + uint64_t id; - bt_ctx = bt_context_create(); - if (!bt_ctx) { + ctx->bt_ctx = bt_context_create(); + if (!ctx->bt_ctx) { fprintf(stderr, "[error] bt_context_create allocation\n"); goto end; } @@ -958,33 +1364,43 @@ void lttng_live_read(struct lttng_live_ctx *ctx, uint64_t session_id) if (!sout->parent.event_cb) goto end_free; + ret = lttng_live_create_viewer_session(ctx); + if (ret < 0) { + goto end_free; + } + + for (i = 0; i < ctx->session_ids->len; i++) { + id = g_array_index(ctx->session_ids, uint64_t, i); + printf_verbose("Attaching to session %lu\n", id); + ret = lttng_live_attach_session(ctx, id); + printf_verbose("Attaching session returns %d\n", ret); + if (ret < 0) { + if (ret == -LTTNG_VIEWER_ATTACH_UNK) { + fprintf(stderr, "[error] Unknown session ID\n"); + } + goto end_free; + } + } + /* - * As long as the session is active, we try to reattach to it, - * even if all the streams get closed. + * As long as the session is active, we try to get new streams. */ - do { + for (;;) { int flags; - do { - ret = lttng_live_attach_session(ctx, session_id); - printf_verbose("Attaching session returns %d\n", ret); - if (ret < 0) { - if (ret == -LTTNG_VIEWER_ATTACH_UNK) { - if (active_session) - goto end_free; - fprintf(stderr, "[error] Unknown " - "session ID\n"); - } + while (!ctx->session->stream_count) { + if (ctx->session_ids->len == 0) goto end_free; - } else { - active_session = 1; - } - } while (ctx->session->stream_count == 0); + ret = ask_new_streams(ctx); + if (ret < 0) + goto end_free; + } - g_hash_table_foreach(ctx->session->ctf_traces, add_traces, bt_ctx); + g_hash_table_foreach(ctx->session->ctf_traces, add_traces, + ctx->bt_ctx); begin_pos.type = BT_SEEK_BEGIN; - iter = bt_ctf_iter_create(bt_ctx, &begin_pos, NULL); + iter = bt_ctf_iter_create(ctx->bt_ctx, &begin_pos, NULL); if (!iter) { fprintf(stderr, "[error] Iterator creation error\n"); goto end; @@ -1010,11 +1426,13 @@ void lttng_live_read(struct lttng_live_ctx *ctx, uint64_t session_id) } } bt_ctf_iter_destroy(iter); - g_hash_table_foreach_remove(ctx->session->ctf_traces, del_traces, bt_ctx); - } while (active_session); + g_hash_table_foreach_remove(ctx->session->ctf_traces, + del_traces, ctx->bt_ctx); + ctx->session->stream_count = 0; + } end_free: - bt_context_put(bt_ctx); + bt_context_put(ctx->bt_ctx); end: return; }