Attach and list by session name and hostname
[babeltrace.git] / formats / lttng-live / lttng-live-functions.c
index 2d923618d2326fa707a32096aad1b80f26f2f744..60f5fb51f346b6b8a64184a1e01a1ae2ca03ade1 100644 (file)
@@ -50,7 +50,7 @@
 #include <formats/ctf/events-private.h>
 
 #include "lttng-live-functions.h"
-#include "lttng-viewer.h"
+#include "lttng-viewer-abi.h"
 
 /*
  * Memory allocation zeroed
        ((type) (a) > (type) (b) ? (type) (a) : (type) (b))
 #endif
 
-int lttng_live_connect_viewer(struct lttng_live_ctx *ctx, char *hostname,
-               int port)
+static void ctf_live_packet_seek(struct bt_stream_pos *stream_pos,
+               size_t index, int whence);
+static void add_traces(gpointer key, gpointer value, gpointer user_data);
+static int del_traces(gpointer key, gpointer value, gpointer user_data);
+
+int lttng_live_connect_viewer(struct lttng_live_ctx *ctx)
 {
        struct hostent *host;
        struct sockaddr_in server_addr;
        int ret;
 
-       host = gethostbyname(hostname);
+       host = gethostbyname(ctx->relay_hostname);
        if (!host) {
                ret = -1;
                goto end;
@@ -82,7 +86,7 @@ int lttng_live_connect_viewer(struct lttng_live_ctx *ctx, char *hostname,
        }
 
        server_addr.sin_family = AF_INET;
-       server_addr.sin_port = htons(port);
+       server_addr.sin_port = htons(ctx->port);
        server_addr.sin_addr = *((struct in_addr *) host->h_addr);
        bzero(&(server_addr.sin_zero), 8);
 
@@ -156,14 +160,82 @@ error:
        return ret;
 }
 
+static
+void free_session_list(GPtrArray *session_list)
+{
+       int i;
+       struct lttng_live_relay_session *relay_session;
+
+       for (i = 0; i < session_list->len; i++) {
+               relay_session = g_ptr_array_index(session_list, i);
+               free(relay_session->name);
+               free(relay_session->hostname);
+       }
+       g_ptr_array_free(session_list, TRUE);
+}
+
+static
+void print_session_list(GPtrArray *session_list, const char *path)
+{
+       int i;
+       struct lttng_live_relay_session *relay_session;
+
+       for (i = 0; i < session_list->len; i++) {
+               relay_session = g_ptr_array_index(session_list, i);
+               fprintf(stdout, "%s/host/%s/%s (timer = %u, "
+                               "%u stream(s), %u client(s) connected)\n",
+                               path, relay_session->hostname,
+                               relay_session->name, relay_session->timer,
+                               relay_session->streams, relay_session->clients);
+       }
+}
+
+static
+void update_session_list(GPtrArray *session_list, char *hostname,
+               char *session_name, uint32_t streams, uint32_t clients,
+               uint32_t timer)
+{
+       int i, found = 0;
+       struct lttng_live_relay_session *relay_session;
+
+       for (i = 0; i < session_list->len; i++) {
+               relay_session = g_ptr_array_index(session_list, i);
+               if ((strncmp(relay_session->hostname, hostname, NAME_MAX) == 0) &&
+                               strncmp(relay_session->name,
+                                       session_name, NAME_MAX) == 0) {
+                       relay_session->streams += streams;
+                       if (relay_session->clients < clients)
+                               relay_session->clients = clients;
+                       found = 1;
+                       break;
+               }
+       }
+       if (found)
+               return;
+
+       relay_session = g_new0(struct lttng_live_relay_session, 1);
+       relay_session->hostname = strndup(hostname, NAME_MAX);
+       relay_session->name = strndup(session_name, NAME_MAX);
+       relay_session->clients = clients;
+       relay_session->streams = streams;
+       relay_session->timer = timer;
+       g_ptr_array_add(session_list, relay_session);
+}
+
 int lttng_live_list_sessions(struct lttng_live_ctx *ctx, const char *path)
 {
        struct lttng_viewer_cmd cmd;
        struct lttng_viewer_list_sessions list;
        struct lttng_viewer_session lsession;
-       int i, ret;
+       int i, ret, sessions_count, print_list = 0;
        ssize_t ret_len;
-       int sessions_count;
+       uint64_t session_id;
+       GPtrArray *session_list = NULL;
+
+       if (strlen(ctx->session_name) == 0) {
+               print_list = 1;
+               session_list = g_ptr_array_new();
+       }
 
        cmd.cmd = htobe32(LTTNG_VIEWER_LIST_SESSIONS);
        cmd.data_size = 0;
@@ -190,8 +262,6 @@ int lttng_live_list_sessions(struct lttng_live_ctx *ctx, const char *path)
        assert(ret_len == sizeof(list));
 
        sessions_count = be32toh(list.sessions_count);
-       fprintf(stdout, "%u active session(s)%c\n", sessions_count,
-                       sessions_count > 0 ? ':' : ' ');
        for (i = 0; i < sessions_count; i++) {
                do {
                        ret_len = recv(ctx->control_sock, &lsession, sizeof(lsession), 0);
@@ -204,14 +274,30 @@ int lttng_live_list_sessions(struct lttng_live_ctx *ctx, const char *path)
                assert(ret_len == sizeof(lsession));
                lsession.hostname[LTTNG_VIEWER_HOST_NAME_MAX - 1] = '\0';
                lsession.session_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0';
+               session_id = be64toh(lsession.id);
+
+               if (print_list) {
+                       update_session_list(session_list,
+                                       lsession.hostname,
+                                       lsession.session_name,
+                                       be32toh(lsession.streams),
+                                       be32toh(lsession.clients),
+                                       be32toh(lsession.live_timer));
+               } else {
+                       if ((strncmp(lsession.session_name, ctx->session_name,
+                               NAME_MAX) == 0) && (strncmp(lsession.hostname,
+                                       ctx->traced_hostname, NAME_MAX) == 0)) {
+                               printf_verbose("Reading from session %" PRIu64 "\n",
+                                               session_id);
+                               g_array_append_val(ctx->session_ids,
+                                               session_id);
+                       }
+               }
+       }
 
-               fprintf(stdout, "%s/%" PRIu64 " : %s on host %s (timer = %u, "
-                               "%u stream(s), %u client(s) connected)\n",
-                               path, be64toh(lsession.id),
-                               lsession.session_name, lsession.hostname,
-                               be32toh(lsession.live_timer),
-                               be32toh(lsession.streams),
-                               be32toh(lsession.clients));
+       if (print_list) {
+               print_session_list(session_list, path);
+               free_session_list(session_list);
        }
 
        ret = 0;
@@ -230,11 +316,6 @@ int lttng_live_ctf_trace_assign(struct lttng_live_viewer_stream *stream,
                        (gpointer) ctf_trace_id);
        if (!trace) {
                trace = g_new0(struct lttng_live_ctf_trace, 1);
-               if (!trace) {
-                       ret = -1;
-                       fprintf(stderr, "[error] ctf_trace allocation\n");
-                       goto error;
-               }
                trace->ctf_trace_id = ctf_trace_id;
                trace->streams = g_ptr_array_new();
                g_hash_table_insert(stream->session->ctf_traces,
@@ -247,7 +328,6 @@ int lttng_live_ctf_trace_assign(struct lttng_live_viewer_stream *stream,
        stream->ctf_trace = trace;
        g_ptr_array_add(trace->streams, stream);
 
-error:
        return ret;
 }
 
@@ -329,7 +409,7 @@ int lttng_live_attach_session(struct lttng_live_ctx *ctx, uint64_t id)
                goto end;
        }
 
-       ctx->session->stream_count = be32toh(rp.streams_count);
+       ctx->session->stream_count += be32toh(rp.streams_count);
        /*
         * When the session is created but not started, we do an active wait
         * until it starts. It allows the viewer to start processing the trace
@@ -343,11 +423,6 @@ int lttng_live_attach_session(struct lttng_live_ctx *ctx, uint64_t id)
                ctx->session->stream_count);
        ctx->session->streams = g_new0(struct lttng_live_viewer_stream,
                        ctx->session->stream_count);
-       if (!ctx->session->streams) {
-               ret = -1;
-               goto error;
-       }
-
        for (i = 0; i < be32toh(rp.streams_count); i++) {
                do {
                        ret_len = recv(ctx->control_sock, &stream, sizeof(stream), 0);
@@ -374,7 +449,17 @@ int lttng_live_attach_session(struct lttng_live_ctx *ctx, uint64_t id)
                        char *path;
 
                        path = strdup(LTTNG_METADATA_PATH_TEMPLATE);
-                       path = mkdtemp(path);
+                       if (!path) {
+                               perror("strdup");
+                               ret = -1;
+                               goto error;
+                       }
+                       if (!mkdtemp(path)) {
+                               perror("mkdtemp");
+                               free(path);
+                               ret = -1;
+                               goto error;
+                       }
                        ctx->session->streams[i].metadata_flag = 1;
                        snprintf(ctx->session->streams[i].path,
                                        sizeof(ctx->session->streams[i].path),
@@ -385,9 +470,11 @@ int lttng_live_attach_session(struct lttng_live_ctx *ctx, uint64_t id)
                                        S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP);
                        if (ret < 0) {
                                perror("open");
+                               free(path);
                                goto error;
                        }
                        ctx->session->streams[i].fd = ret;
+                       free(path);
                }
                ret = lttng_live_ctf_trace_assign(&ctx->session->streams[i],
                                be64toh(stream.ctf_trace_id));
@@ -403,6 +490,44 @@ error:
        return ret;
 }
 
+static
+int ask_new_streams(struct lttng_live_ctx *ctx)
+{
+       int i, ret = 0;
+       uint64_t id;
+
+restart:
+       for (i = 0; i < ctx->session_ids->len; i++) {
+               id = g_array_index(ctx->session_ids, uint64_t, i);
+               ret = lttng_live_get_new_streams(ctx, id);
+               printf_verbose("Asking for new streams returns %d\n",
+                               ret);
+               if (ret < 0) {
+                       if (ret == -LTTNG_VIEWER_NEW_STREAMS_HUP) {
+                               printf_verbose("Session %" PRIu64 " closed\n",
+                                               id);
+                               /*
+                                * The streams have already been closed during
+                                * the reading, so we only need to get rid of
+                                * the trace in our internal table of sessions.
+                                */
+                               g_array_remove_index(ctx->session_ids, i);
+                               /*
+                                * We can't continue iterating on the g_array
+                                * after a remove, we have to start again.
+                                */
+                               goto restart;
+                       } else {
+                               ret = -1;
+                               goto end;
+                       }
+               }
+       }
+
+end:
+       return ret;
+}
+
 static
 int get_data_packet(struct lttng_live_ctx *ctx,
                struct ctf_stream_pos *pos,
@@ -453,7 +578,13 @@ int get_data_packet(struct lttng_live_ctx *ctx,
                ret = ret_len;
                goto error;
        }
-       assert(ret_len == sizeof(rp));
+       if (ret_len != sizeof(rp)) {
+               fprintf(stderr, "[error] get_data_packet: expected %" PRId64
+                               ", received %" PRId64 "\n", ret_len,
+                               sizeof(rp));
+               ret = -1;
+               goto error;
+       }
 
        rp.flags = be32toh(rp.flags);
 
@@ -473,6 +604,13 @@ int get_data_packet(struct lttng_live_ctx *ctx,
                        ret = 0;
                        goto end;
                }
+               if (rp.flags & LTTNG_VIEWER_FLAG_NEW_STREAM) {
+                       ret = ask_new_streams(ctx);
+                       if (ret < 0)
+                               goto error;
+                       g_hash_table_foreach(ctx->session->ctf_traces, add_traces,
+                                       ctx->bt_ctx);
+               }
                fprintf(stderr, "[error] get_data_packet: error\n");
                ret = -1;
                goto end;
@@ -715,7 +853,7 @@ retry:
        case LTTNG_VIEWER_INDEX_INACTIVE:
                printf_verbose("get_next_index: inactive\n");
                memset(index, 0, sizeof(struct packet_index));
-               index->timestamp_end = be64toh(rp.timestamp_end);
+               index->ts_cycles.timestamp_end = be64toh(rp.timestamp_end);
                break;
        case LTTNG_VIEWER_INDEX_OK:
                printf_verbose("get_next_index: Ok, need metadata update : %u\n",
@@ -723,8 +861,8 @@ retry:
                index->offset = be64toh(rp.offset);
                index->packet_size = be64toh(rp.packet_size);
                index->content_size = be64toh(rp.content_size);
-               index->timestamp_begin = be64toh(rp.timestamp_begin);
-               index->timestamp_end = be64toh(rp.timestamp_end);
+               index->ts_cycles.timestamp_begin = be64toh(rp.timestamp_begin);
+               index->ts_cycles.timestamp_end = be64toh(rp.timestamp_end);
                index->events_discarded = be64toh(rp.events_discarded);
 
                if (rp.flags & LTTNG_VIEWER_FLAG_NEW_METADATA) {
@@ -735,6 +873,13 @@ retry:
                                goto error;
                        }
                }
+               if (rp.flags & LTTNG_VIEWER_FLAG_NEW_STREAM) {
+                       ret = ask_new_streams(ctx);
+                       if (ret < 0)
+                               goto error;
+                       g_hash_table_foreach(ctx->session->ctf_traces, add_traces,
+                                       ctx->bt_ctx);
+               }
                break;
        case LTTNG_VIEWER_INDEX_RETRY:
                printf_verbose("get_next_index: retry\n");
@@ -745,6 +890,7 @@ retry:
                viewer_stream->id = -1ULL;
                viewer_stream->fd = -1;
                index->offset = EOF;
+               ctx->session->stream_count--;
                break;
        case LTTNG_VIEWER_INDEX_ERR:
                fprintf(stderr, "[error] get_next_index: error\n");
@@ -762,12 +908,13 @@ error:
        return ret;
 }
 
+static
 void ctf_live_packet_seek(struct bt_stream_pos *stream_pos, size_t index,
                int whence)
 {
        struct ctf_stream_pos *pos;
        struct ctf_file_stream *file_stream;
-       struct packet_index packet_index;
+       struct packet_index *prev_index = NULL, *cur_index;
        struct lttng_live_viewer_stream *viewer_stream;
        struct lttng_live_session *session;
        int ret;
@@ -778,31 +925,72 @@ retry:
        viewer_stream = (struct lttng_live_viewer_stream *) pos->priv;
        session = viewer_stream->session;
 
+       switch (pos->packet_index->len) {
+       case 0:
+               g_array_set_size(pos->packet_index, 1);
+               cur_index = &g_array_index(pos->packet_index,
+                               struct packet_index, 0);
+               break;
+       case 1:
+               g_array_set_size(pos->packet_index, 2);
+               prev_index = &g_array_index(pos->packet_index,
+                               struct packet_index, 0);
+               cur_index = &g_array_index(pos->packet_index,
+                               struct packet_index, 1);
+               break;
+       case 2:
+               g_array_index(pos->packet_index,
+                       struct packet_index, 0) =
+                               g_array_index(pos->packet_index,
+                                       struct packet_index, 1);
+               prev_index = &g_array_index(pos->packet_index,
+                               struct packet_index, 0);
+               cur_index = &g_array_index(pos->packet_index,
+                               struct packet_index, 1);
+               break;
+       default:
+               abort();
+               break;
+       }
        printf_verbose("get_next_index for stream %" PRIu64 "\n", viewer_stream->id);
-       ret = get_next_index(session->ctx, viewer_stream, &packet_index);
+       ret = get_next_index(session->ctx, viewer_stream, cur_index);
        if (ret < 0) {
                pos->offset = EOF;
                fprintf(stderr, "[error] get_next_index failed\n");
                return;
        }
 
-       pos->packet_size = packet_index.packet_size;
-       pos->content_size = packet_index.content_size;
+       pos->packet_size = cur_index->packet_size;
+       pos->content_size = cur_index->content_size;
        pos->mmap_base_offset = 0;
-       if (packet_index.offset == EOF) {
+       if (cur_index->offset == EOF) {
                pos->offset = EOF;
        } else {
                pos->offset = 0;
        }
 
-       if (packet_index.content_size == 0) {
-               file_stream->parent.cycles_timestamp = packet_index.timestamp_end;
+       if (cur_index->content_size == 0) {
+               file_stream->parent.cycles_timestamp =
+                               cur_index->ts_cycles.timestamp_end;
                file_stream->parent.real_timestamp = ctf_get_real_timestamp(
-                               &file_stream->parent, packet_index.timestamp_end);
+                               &file_stream->parent,
+                               cur_index->ts_cycles.timestamp_end);
        } else {
-               file_stream->parent.cycles_timestamp = packet_index.timestamp_begin;
-               file_stream->parent.real_timestamp = ctf_get_real_timestamp(
-                               &file_stream->parent, packet_index.timestamp_begin);
+               /* Convert the timestamps and append to the real_index. */
+               cur_index->ts_real.timestamp_begin = ctf_get_real_timestamp(
+                               &file_stream->parent,
+                               cur_index->ts_cycles.timestamp_begin);
+               cur_index->ts_real.timestamp_end = ctf_get_real_timestamp(
+                               &file_stream->parent,
+                               cur_index->ts_cycles.timestamp_end);
+
+               ctf_update_current_packet_index(&file_stream->parent,
+                               prev_index, cur_index);
+
+               file_stream->parent.cycles_timestamp =
+                               cur_index->ts_cycles.timestamp_begin;
+               file_stream->parent.real_timestamp =
+                               cur_index->ts_real.timestamp_begin;
        }
 
        if (pos->packet_size == 0 || pos->offset == EOF) {
@@ -812,8 +1000,8 @@ retry:
        printf_verbose("get_data_packet for stream %" PRIu64 "\n",
                        viewer_stream->id);
        ret = get_data_packet(session->ctx, pos, viewer_stream,
-                       be64toh(packet_index.offset),
-                       packet_index.packet_size / CHAR_BIT);
+                       be64toh(cur_index->offset),
+                       cur_index->packet_size / CHAR_BIT);
        if (ret == -2) {
                goto retry;
        } else if (ret < 0) {
@@ -825,8 +1013,9 @@ retry:
        printf_verbose("Index received : packet_size : %" PRIu64
                        ", offset %" PRIu64 ", content_size %" PRIu64
                        ", timestamp_end : %" PRIu64 "\n",
-                       packet_index.packet_size, packet_index.offset,
-                       packet_index.content_size, packet_index.timestamp_end);
+                       cur_index->packet_size, cur_index->offset,
+                       cur_index->content_size,
+                       cur_index->ts_cycles.timestamp_end);
 
        /* update trace_packet_header and stream_packet_context */
        if (pos->prot != PROT_WRITE && file_stream->parent.trace_packet_header) {
@@ -853,7 +1042,50 @@ end:
        return;
 }
 
-static int del_traces(gpointer key, gpointer value, gpointer user_data)
+int lttng_live_create_viewer_session(struct lttng_live_ctx *ctx)
+{
+       struct lttng_viewer_cmd cmd;
+       struct lttng_viewer_create_session_response resp;
+       int ret;
+       ssize_t ret_len;
+
+       cmd.cmd = htobe32(LTTNG_VIEWER_CREATE_SESSION);
+       cmd.data_size = 0;
+       cmd.cmd_version = 0;
+
+       do {
+               ret_len = send(ctx->control_sock, &cmd, sizeof(cmd), 0);
+       } while (ret_len < 0 && errno == EINTR);
+       if (ret_len < 0) {
+               fprintf(stderr, "[error] Error sending cmd\n");
+               ret = ret_len;
+               goto error;
+       }
+       assert(ret_len == sizeof(cmd));
+
+       do {
+               ret_len = recv(ctx->control_sock, &resp, sizeof(resp), 0);
+       } while (ret_len < 0 && errno == EINTR);
+       if (ret_len < 0) {
+               fprintf(stderr, "[error] Error receiving create session reply\n");
+               ret = ret_len;
+               goto error;
+       }
+       assert(ret_len == sizeof(resp));
+
+       if (be32toh(resp.status) != LTTNG_VIEWER_CREATE_SESSION_OK) {
+               fprintf(stderr, "[error] Error creating viewer session\n");
+               ret = -1;
+               goto error;
+       }
+       ret = 0;
+
+error:
+       return ret;
+}
+
+static
+int del_traces(gpointer key, gpointer value, gpointer user_data)
 {
        struct bt_context *bt_ctx = user_data;
        struct lttng_live_ctf_trace *trace = value;
@@ -867,7 +1099,8 @@ static int del_traces(gpointer key, gpointer value, gpointer user_data)
        return 1;
 }
 
-static void add_traces(gpointer key, gpointer value, gpointer user_data)
+static
+void add_traces(gpointer key, gpointer value, gpointer user_data)
 {
        int i, ret, total_metadata = 0;
        uint64_t metadata_len;
@@ -878,6 +1111,18 @@ static void add_traces(gpointer key, gpointer value, gpointer user_data)
        struct bt_mmap_stream_list mmap_list;
        struct lttng_live_ctx *ctx = NULL;
 
+       /*
+        * We don't know how many streams we will receive for a trace, so
+        * once we are done receiving the traces, we add all the traces
+        * received to the bt_context.
+        * We can receive streams during the attach command or the
+        * get_new_streams, so we have to make sure not to add multiple
+        * times the same traces.
+        * If a trace is already in the context, we just skip this function.
+        */
+       if (trace->in_use)
+               return;
+
        BT_INIT_LIST_HEAD(&mmap_list.head);
 
        for (i = 0; i < trace->streams->len; i++) {
@@ -913,7 +1158,20 @@ static void add_traces(gpointer key, gpointer value, gpointer user_data)
                fprintf(stderr, "[error] Error adding trace\n");
                goto end_free;
        }
+
+       if (bt_ctx->current_iterator) {
+               struct bt_trace_descriptor *td;
+               struct bt_trace_handle *handle;
+
+               handle = (struct bt_trace_handle *) g_hash_table_lookup(
+                               bt_ctx->trace_handles,
+                               (gpointer) (unsigned long) ret);
+               td = handle->td;
+               bt_iter_add_trace(bt_ctx->current_iterator, td);
+       }
+
        trace->trace_id = ret;
+       trace->in_use = 1;
 
        goto end;
 
@@ -923,19 +1181,168 @@ end:
        return;
 }
 
-void lttng_live_read(struct lttng_live_ctx *ctx, uint64_t session_id)
+int lttng_live_get_new_streams(struct lttng_live_ctx *ctx, uint64_t id)
+{
+       struct lttng_viewer_cmd cmd;
+       struct lttng_viewer_new_streams_request rq;
+       struct lttng_viewer_new_streams_response rp;
+       struct lttng_viewer_stream stream;
+       int ret, i;
+       ssize_t ret_len;
+       uint32_t stream_count;
+
+       cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEW_STREAMS);
+       cmd.data_size = sizeof(rq);
+       cmd.cmd_version = 0;
+
+       memset(&rq, 0, sizeof(rq));
+       rq.session_id = htobe64(id);
+
+       do {
+               ret_len = send(ctx->control_sock, &cmd, sizeof(cmd), 0);
+       } while (ret_len < 0 && errno == EINTR);
+       if (ret_len < 0) {
+               fprintf(stderr, "[error] Error sending cmd\n");
+               ret = ret_len;
+               goto error;
+       }
+       assert(ret_len == sizeof(cmd));
+
+       do {
+               ret_len = send(ctx->control_sock, &rq, sizeof(rq), 0);
+       } while (ret_len < 0 && errno == EINTR);
+       if (ret_len < 0) {
+               fprintf(stderr, "[error] Error sending get_new_streams request\n");
+               ret = ret_len;
+               goto error;
+       }
+       assert(ret_len == sizeof(rq));
+
+       do {
+               ret_len = recv(ctx->control_sock, &rp, sizeof(rp), 0);
+       } while (ret_len < 0 && errno == EINTR);
+       if (ret_len < 0) {
+               fprintf(stderr, "[error] Error receiving get_new_streams response\n");
+               ret = ret_len;
+               goto error;
+       }
+       assert(ret_len == sizeof(rp));
+
+       switch(be32toh(rp.status)) {
+       case LTTNG_VIEWER_NEW_STREAMS_OK:
+               break;
+       case LTTNG_VIEWER_NEW_STREAMS_NO_NEW:
+               ret = 0;
+               goto end;
+       case LTTNG_VIEWER_NEW_STREAMS_HUP:
+               ret = -LTTNG_VIEWER_NEW_STREAMS_HUP;
+               goto end;
+       case LTTNG_VIEWER_NEW_STREAMS_ERR:
+               fprintf(stderr, "[error] get_new_streams error\n");
+               ret = -1;
+               goto end;
+       default:
+               fprintf(stderr, "[error] Unknown return code %u\n",
+                               be32toh(rp.status));
+               ret = -1;
+               goto end;
+       }
+
+       stream_count = be32toh(rp.streams_count);
+       ctx->session->stream_count += stream_count;
+       /*
+        * When the session is created but not started, we do an active wait
+        * until it starts. It allows the viewer to start processing the trace
+        * as soon as the session starts.
+        */
+       if (ctx->session->stream_count == 0) {
+               ret = 0;
+               goto end;
+       }
+       printf_verbose("Waiting for %" PRIu64 " streams:\n",
+               ctx->session->stream_count);
+       ctx->session->streams = g_new0(struct lttng_live_viewer_stream,
+                       ctx->session->stream_count);
+       for (i = 0; i < stream_count; i++) {
+               do {
+                       ret_len = recv(ctx->control_sock, &stream, sizeof(stream), 0);
+               } while (ret_len < 0 && errno == EINTR);
+               if (ret_len < 0) {
+                       fprintf(stderr, "[error] Error receiving stream\n");
+                       ret = ret_len;
+                       goto error;
+               }
+               assert(ret_len == sizeof(stream));
+               stream.path_name[LTTNG_VIEWER_PATH_MAX - 1] = '\0';
+               stream.channel_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0';
+
+               printf_verbose("    stream %" PRIu64 " : %s/%s\n",
+                               be64toh(stream.id), stream.path_name,
+                               stream.channel_name);
+               ctx->session->streams[i].id = be64toh(stream.id);
+               ctx->session->streams[i].session = ctx->session;
+
+               ctx->session->streams[i].first_read = 1;
+               ctx->session->streams[i].mmap_size = 0;
+
+               if (be32toh(stream.metadata_flag)) {
+                       char *path;
+
+                       path = strdup(LTTNG_METADATA_PATH_TEMPLATE);
+                       if (!path) {
+                               perror("strdup");
+                               ret = -1;
+                               goto error;
+                       }
+                       if (!mkdtemp(path)) {
+                               perror("mkdtemp");
+                               free(path);
+                               ret = -1;
+                               goto error;
+                       }
+                       ctx->session->streams[i].metadata_flag = 1;
+                       snprintf(ctx->session->streams[i].path,
+                                       sizeof(ctx->session->streams[i].path),
+                                       "%s/%s", path,
+                                       stream.channel_name);
+                       ret = open(ctx->session->streams[i].path,
+                                       O_WRONLY | O_CREAT | O_TRUNC,
+                                       S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP);
+                       if (ret < 0) {
+                               perror("open");
+                               free(path);
+                               goto error;
+                       }
+                       ctx->session->streams[i].fd = ret;
+                       free(path);
+               }
+               ret = lttng_live_ctf_trace_assign(&ctx->session->streams[i],
+                               be64toh(stream.ctf_trace_id));
+               if (ret < 0) {
+                       goto error;
+               }
+
+       }
+       ret = 0;
+
+end:
+error:
+       return ret;
+}
+
+void lttng_live_read(struct lttng_live_ctx *ctx)
 {
-       int ret, active_session = 0;
-       struct bt_context *bt_ctx;
+       int ret, i;
        struct bt_ctf_iter *iter;
        const struct bt_ctf_event *event;
        struct bt_iter_pos begin_pos;
        struct bt_trace_descriptor *td_write;
        struct bt_format *fmt_write;
        struct ctf_text_stream_pos *sout;
+       uint64_t id;
 
-       bt_ctx = bt_context_create();
-       if (!bt_ctx) {
+       ctx->bt_ctx = bt_context_create();
+       if (!ctx->bt_ctx) {
                fprintf(stderr, "[error] bt_context_create allocation\n");
                goto end;
        }
@@ -957,33 +1364,43 @@ void lttng_live_read(struct lttng_live_ctx *ctx, uint64_t session_id)
        if (!sout->parent.event_cb)
                goto end_free;
 
+       ret = lttng_live_create_viewer_session(ctx);
+       if (ret < 0) {
+               goto end_free;
+       }
+
+       for (i = 0; i < ctx->session_ids->len; i++) {
+               id = g_array_index(ctx->session_ids, uint64_t, i);
+               printf_verbose("Attaching to session %lu\n", id);
+               ret = lttng_live_attach_session(ctx, id);
+               printf_verbose("Attaching session returns %d\n", ret);
+               if (ret < 0) {
+                       if (ret == -LTTNG_VIEWER_ATTACH_UNK) {
+                               fprintf(stderr, "[error] Unknown session ID\n");
+                       }
+                       goto end_free;
+               }
+       }
+
        /*
-        * As long as the session is active, we try to reattach to it,
-        * even if all the streams get closed.
+        * As long as the session is active, we try to get new streams.
         */
-       do {
+       for (;;) {
                int flags;
 
-               do {
-                       ret = lttng_live_attach_session(ctx, session_id);
-                       printf_verbose("Attaching session returns %d\n", ret);
-                       if (ret < 0) {
-                               if (ret == -LTTNG_VIEWER_ATTACH_UNK) {
-                                       if (active_session)
-                                               goto end_free;
-                                       fprintf(stderr, "[error] Unknown "
-                                                       "session ID\n");
-                               }
+               while (!ctx->session->stream_count) {
+                       if (ctx->session_ids->len == 0)
                                goto end_free;
-                       } else {
-                               active_session = 1;
-                       }
-               } while (ctx->session->stream_count == 0);
+                       ret = ask_new_streams(ctx);
+                       if (ret < 0)
+                               goto end_free;
+               }
 
-               g_hash_table_foreach(ctx->session->ctf_traces, add_traces, bt_ctx);
+               g_hash_table_foreach(ctx->session->ctf_traces, add_traces,
+                               ctx->bt_ctx);
 
                begin_pos.type = BT_SEEK_BEGIN;
-               iter = bt_ctf_iter_create(bt_ctx, &begin_pos, NULL);
+               iter = bt_ctf_iter_create(ctx->bt_ctx, &begin_pos, NULL);
                if (!iter) {
                        fprintf(stderr, "[error] Iterator creation error\n");
                        goto end;
@@ -1009,11 +1426,13 @@ void lttng_live_read(struct lttng_live_ctx *ctx, uint64_t session_id)
                        }
                }
                bt_ctf_iter_destroy(iter);
-               g_hash_table_foreach_remove(ctx->session->ctf_traces, del_traces, bt_ctx);
-       } while (active_session);
+               g_hash_table_foreach_remove(ctx->session->ctf_traces,
+                               del_traces, ctx->bt_ctx);
+               ctx->session->stream_count = 0;
+       }
 
 end_free:
-       bt_context_put(bt_ctx);
+       bt_context_put(ctx->bt_ctx);
 end:
        return;
 }
This page took 0.03429 seconds and 4 git commands to generate.