get_new_streams and multi-session
[babeltrace.git] / formats / lttng-live / lttng-live-functions.c
index 375a05edeafd52b468a4f1b61709641eb7f99ec5..44c71cbc37304d025125d2d7de0c7b6f66ad9b24 100644 (file)
@@ -50,7 +50,7 @@
 #include <formats/ctf/events-private.h>
 
 #include "lttng-live-functions.h"
-#include "lttng-viewer.h"
+#include "lttng-viewer-abi.h"
 
 /*
  * Memory allocation zeroed
        ((type) (a) > (type) (b) ? (type) (a) : (type) (b))
 #endif
 
+void ctf_live_packet_seek(struct bt_stream_pos *stream_pos, size_t index,
+               int whence);
+static void add_traces(gpointer key, gpointer value, gpointer user_data);
+static int del_traces(gpointer key, gpointer value, gpointer user_data);
+
 int lttng_live_connect_viewer(struct lttng_live_ctx *ctx, char *hostname,
                int port)
 {
@@ -323,7 +328,7 @@ int lttng_live_attach_session(struct lttng_live_ctx *ctx, uint64_t id)
                goto end;
        }
 
-       ctx->session->stream_count = be32toh(rp.streams_count);
+       ctx->session->stream_count += be32toh(rp.streams_count);
        /*
         * When the session is created but not started, we do an active wait
         * until it starts. It allows the viewer to start processing the trace
@@ -404,6 +409,39 @@ error:
        return ret;
 }
 
+static
+int ask_new_streams(struct lttng_live_ctx *ctx)
+{
+       int i, ret = 0;
+       uint64_t id;
+
+restart:
+       for (i = 0; i < ctx->session_ids->len; i++) {
+               id = g_array_index(ctx->session_ids, uint64_t, i);
+               ret = lttng_live_get_new_streams(ctx, id);
+               printf_verbose("Asking for new streams returns %d\n",
+                               ret);
+               if (ret < 0) {
+                       if (ret == -LTTNG_VIEWER_NEW_STREAMS_HUP) {
+                               printf_verbose("Session %" PRIu64 " closed\n",
+                                               id);
+                               g_array_remove_index(ctx->session_ids, i);
+                               /*
+                                * We can't continue iterating on the g_array
+                                * after a remove, we have to start again.
+                                */
+                               goto restart;
+                       } else {
+                               ret = -1;
+                               goto end;
+                       }
+               }
+       }
+
+end:
+       return ret;
+}
+
 static
 int get_data_packet(struct lttng_live_ctx *ctx,
                struct ctf_stream_pos *pos,
@@ -480,6 +518,13 @@ int get_data_packet(struct lttng_live_ctx *ctx,
                        ret = 0;
                        goto end;
                }
+               if (rp.flags & LTTNG_VIEWER_FLAG_NEW_STREAM) {
+                       ret = ask_new_streams(ctx);
+                       if (ret < 0)
+                               goto error;
+                       g_hash_table_foreach(ctx->session->ctf_traces, add_traces,
+                                       ctx->bt_ctx);
+               }
                fprintf(stderr, "[error] get_data_packet: error\n");
                ret = -1;
                goto end;
@@ -742,6 +787,13 @@ retry:
                                goto error;
                        }
                }
+               if (rp.flags & LTTNG_VIEWER_FLAG_NEW_STREAM) {
+                       ret = ask_new_streams(ctx);
+                       if (ret < 0)
+                               goto error;
+                       g_hash_table_foreach(ctx->session->ctf_traces, add_traces,
+                                       ctx->bt_ctx);
+               }
                break;
        case LTTNG_VIEWER_INDEX_RETRY:
                printf_verbose("get_next_index: retry\n");
@@ -752,6 +804,7 @@ retry:
                viewer_stream->id = -1ULL;
                viewer_stream->fd = -1;
                index->offset = EOF;
+               ctx->session->stream_count--;
                break;
        case LTTNG_VIEWER_INDEX_ERR:
                fprintf(stderr, "[error] get_next_index: error\n");
@@ -774,7 +827,7 @@ void ctf_live_packet_seek(struct bt_stream_pos *stream_pos, size_t index,
 {
        struct ctf_stream_pos *pos;
        struct ctf_file_stream *file_stream;
-       struct packet_index packet_index;
+       struct packet_index *prev_index = NULL, *cur_index;
        struct lttng_live_viewer_stream *viewer_stream;
        struct lttng_live_session *session;
        int ret;
@@ -785,31 +838,72 @@ retry:
        viewer_stream = (struct lttng_live_viewer_stream *) pos->priv;
        session = viewer_stream->session;
 
+       switch (pos->packet_index->len) {
+       case 0:
+               g_array_set_size(pos->packet_index, 1);
+               cur_index = &g_array_index(pos->packet_index,
+                               struct packet_index, 0);
+               break;
+       case 1:
+               g_array_set_size(pos->packet_index, 2);
+               prev_index = &g_array_index(pos->packet_index,
+                               struct packet_index, 0);
+               cur_index = &g_array_index(pos->packet_index,
+                               struct packet_index, 1);
+               break;
+       case 2:
+               g_array_index(pos->packet_index,
+                       struct packet_index, 0) =
+                               g_array_index(pos->packet_index,
+                                       struct packet_index, 1);
+               prev_index = &g_array_index(pos->packet_index,
+                               struct packet_index, 0);
+               cur_index = &g_array_index(pos->packet_index,
+                               struct packet_index, 1);
+               break;
+       default:
+               abort();
+               break;
+       }
        printf_verbose("get_next_index for stream %" PRIu64 "\n", viewer_stream->id);
-       ret = get_next_index(session->ctx, viewer_stream, &packet_index);
+       ret = get_next_index(session->ctx, viewer_stream, cur_index);
        if (ret < 0) {
                pos->offset = EOF;
                fprintf(stderr, "[error] get_next_index failed\n");
                return;
        }
 
-       pos->packet_size = packet_index.packet_size;
-       pos->content_size = packet_index.content_size;
+       pos->packet_size = cur_index->packet_size;
+       pos->content_size = cur_index->content_size;
        pos->mmap_base_offset = 0;
-       if (packet_index.offset == EOF) {
+       if (cur_index->offset == EOF) {
                pos->offset = EOF;
        } else {
                pos->offset = 0;
        }
 
-       if (packet_index.content_size == 0) {
-               file_stream->parent.cycles_timestamp = packet_index.ts_cycles.timestamp_end;
+       if (cur_index->content_size == 0) {
+               file_stream->parent.cycles_timestamp =
+                               cur_index->ts_cycles.timestamp_end;
                file_stream->parent.real_timestamp = ctf_get_real_timestamp(
-                               &file_stream->parent, packet_index.ts_cycles.timestamp_end);
+                               &file_stream->parent,
+                               cur_index->ts_cycles.timestamp_end);
        } else {
-               file_stream->parent.cycles_timestamp = packet_index.ts_cycles.timestamp_begin;
-               file_stream->parent.real_timestamp = ctf_get_real_timestamp(
-                               &file_stream->parent, packet_index.ts_cycles.timestamp_begin);
+               /* Convert the timestamps and append to the real_index. */
+               cur_index->ts_real.timestamp_begin = ctf_get_real_timestamp(
+                               &file_stream->parent,
+                               cur_index->ts_cycles.timestamp_begin);
+               cur_index->ts_real.timestamp_end = ctf_get_real_timestamp(
+                               &file_stream->parent,
+                               cur_index->ts_cycles.timestamp_end);
+
+               ctf_update_current_packet_index(&file_stream->parent,
+                               prev_index, cur_index);
+
+               file_stream->parent.cycles_timestamp =
+                               cur_index->ts_cycles.timestamp_begin;
+               file_stream->parent.real_timestamp =
+                               cur_index->ts_real.timestamp_begin;
        }
 
        if (pos->packet_size == 0 || pos->offset == EOF) {
@@ -819,8 +913,8 @@ retry:
        printf_verbose("get_data_packet for stream %" PRIu64 "\n",
                        viewer_stream->id);
        ret = get_data_packet(session->ctx, pos, viewer_stream,
-                       be64toh(packet_index.offset),
-                       packet_index.packet_size / CHAR_BIT);
+                       be64toh(cur_index->offset),
+                       cur_index->packet_size / CHAR_BIT);
        if (ret == -2) {
                goto retry;
        } else if (ret < 0) {
@@ -832,9 +926,9 @@ retry:
        printf_verbose("Index received : packet_size : %" PRIu64
                        ", offset %" PRIu64 ", content_size %" PRIu64
                        ", timestamp_end : %" PRIu64 "\n",
-                       packet_index.packet_size, packet_index.offset,
-                       packet_index.content_size,
-                       packet_index.ts_cycles.timestamp_end);
+                       cur_index->packet_size, cur_index->offset,
+                       cur_index->content_size,
+                       cur_index->ts_cycles.timestamp_end);
 
        /* update trace_packet_header and stream_packet_context */
        if (pos->prot != PROT_WRITE && file_stream->parent.trace_packet_header) {
@@ -861,7 +955,50 @@ end:
        return;
 }
 
-static int del_traces(gpointer key, gpointer value, gpointer user_data)
+int lttng_live_create_viewer_session(struct lttng_live_ctx *ctx)
+{
+       struct lttng_viewer_cmd cmd;
+       struct lttng_viewer_create_session_response resp;
+       int ret;
+       ssize_t ret_len;
+
+       cmd.cmd = htobe32(LTTNG_VIEWER_CREATE_SESSION);
+       cmd.data_size = 0;
+       cmd.cmd_version = 0;
+
+       do {
+               ret_len = send(ctx->control_sock, &cmd, sizeof(cmd), 0);
+       } while (ret_len < 0 && errno == EINTR);
+       if (ret_len < 0) {
+               fprintf(stderr, "[error] Error sending cmd\n");
+               ret = ret_len;
+               goto error;
+       }
+       assert(ret_len == sizeof(cmd));
+
+       do {
+               ret_len = recv(ctx->control_sock, &resp, sizeof(resp), 0);
+       } while (ret_len < 0 && errno == EINTR);
+       if (ret_len < 0) {
+               fprintf(stderr, "[error] Error receiving create session reply\n");
+               ret = ret_len;
+               goto error;
+       }
+       assert(ret_len == sizeof(resp));
+
+       if (be32toh(resp.status) != LTTNG_VIEWER_CREATE_SESSION_OK) {
+               fprintf(stderr, "[error] Error creating viewer session\n");
+               ret = -1;
+               goto error;
+       }
+       ret = 0;
+
+error:
+       return ret;
+}
+
+static
+int del_traces(gpointer key, gpointer value, gpointer user_data)
 {
        struct bt_context *bt_ctx = user_data;
        struct lttng_live_ctf_trace *trace = value;
@@ -875,7 +1012,8 @@ static int del_traces(gpointer key, gpointer value, gpointer user_data)
        return 1;
 }
 
-static void add_traces(gpointer key, gpointer value, gpointer user_data)
+static
+void add_traces(gpointer key, gpointer value, gpointer user_data)
 {
        int i, ret, total_metadata = 0;
        uint64_t metadata_len;
@@ -886,6 +1024,9 @@ static void add_traces(gpointer key, gpointer value, gpointer user_data)
        struct bt_mmap_stream_list mmap_list;
        struct lttng_live_ctx *ctx = NULL;
 
+       if (trace->in_use)
+               return;
+
        BT_INIT_LIST_HEAD(&mmap_list.head);
 
        for (i = 0; i < trace->streams->len; i++) {
@@ -921,7 +1062,21 @@ static void add_traces(gpointer key, gpointer value, gpointer user_data)
                fprintf(stderr, "[error] Error adding trace\n");
                goto end_free;
        }
+
+       if (bt_ctx->current_iterator) {
+               struct bt_trace_descriptor *td;
+               struct bt_trace_handle *handle;
+
+               handle = (struct bt_trace_handle *) g_hash_table_lookup(
+                               bt_ctx->trace_handles,
+                               (gpointer) (unsigned long) ret);
+               td = handle->td;
+               fprintf(stderr, "Handle : %d, td : %p\n", ret, td);
+               bt_iter_add_trace(bt_ctx->current_iterator, td);
+       }
+
        trace->trace_id = ret;
+       trace->in_use = 1;
 
        goto end;
 
@@ -931,19 +1086,166 @@ end:
        return;
 }
 
-void lttng_live_read(struct lttng_live_ctx *ctx, uint64_t session_id)
+int lttng_live_get_new_streams(struct lttng_live_ctx *ctx, uint64_t id)
+{
+       struct lttng_viewer_cmd cmd;
+       struct lttng_viewer_new_streams_request rq;
+       struct lttng_viewer_new_streams_response rp;
+       struct lttng_viewer_stream stream;
+       int ret, i;
+       ssize_t ret_len;
+
+       cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEW_STREAMS);
+       cmd.data_size = sizeof(rq);
+       cmd.cmd_version = 0;
+
+       memset(&rq, 0, sizeof(rq));
+       rq.session_id = htobe64(id);
+
+       do {
+               ret_len = send(ctx->control_sock, &cmd, sizeof(cmd), 0);
+       } while (ret_len < 0 && errno == EINTR);
+       if (ret_len < 0) {
+               fprintf(stderr, "[error] Error sending cmd\n");
+               ret = ret_len;
+               goto error;
+       }
+       assert(ret_len == sizeof(cmd));
+
+       do {
+               ret_len = send(ctx->control_sock, &rq, sizeof(rq), 0);
+       } while (ret_len < 0 && errno == EINTR);
+       if (ret_len < 0) {
+               fprintf(stderr, "[error] Error sending attach request\n");
+               ret = ret_len;
+               goto error;
+       }
+       assert(ret_len == sizeof(rq));
+
+       do {
+               ret_len = recv(ctx->control_sock, &rp, sizeof(rp), 0);
+       } while (ret_len < 0 && errno == EINTR);
+       if (ret_len < 0) {
+               fprintf(stderr, "[error] Error receiving get_new_streams response\n");
+               ret = ret_len;
+               goto error;
+       }
+       assert(ret_len == sizeof(rp));
+
+       switch(be32toh(rp.status)) {
+       case LTTNG_VIEWER_NEW_STREAMS_OK:
+               break;
+       case LTTNG_VIEWER_NEW_STREAMS_NO_NEW:
+               ret = 0;
+               goto end;
+       case LTTNG_VIEWER_NEW_STREAMS_HUP:
+               ret = -LTTNG_VIEWER_NEW_STREAMS_HUP;
+               goto end;
+       case LTTNG_VIEWER_NEW_STREAMS_ERR:
+               fprintf(stderr, "[error] get_new_streams error\n");
+               ret = -1;
+               goto end;
+       default:
+               fprintf(stderr, "[error] Unknown return code %u\n",
+                               be32toh(rp.status));
+               ret = -1;
+               goto end;
+       }
+
+       ctx->session->stream_count += be32toh(rp.streams_count);
+       /*
+        * When the session is created but not started, we do an active wait
+        * until it starts. It allows the viewer to start processing the trace
+        * as soon as the session starts.
+        */
+       if (ctx->session->stream_count == 0) {
+               ret = 0;
+               goto end;
+       }
+       printf_verbose("Waiting for %" PRIu64 " streams:\n",
+               ctx->session->stream_count);
+       ctx->session->streams = g_new0(struct lttng_live_viewer_stream,
+                       ctx->session->stream_count);
+       for (i = 0; i < be32toh(rp.streams_count); i++) {
+               do {
+                       ret_len = recv(ctx->control_sock, &stream, sizeof(stream), 0);
+               } while (ret_len < 0 && errno == EINTR);
+               if (ret_len < 0) {
+                       fprintf(stderr, "[error] Error receiving stream\n");
+                       ret = ret_len;
+                       goto error;
+               }
+               assert(ret_len == sizeof(stream));
+               stream.path_name[LTTNG_VIEWER_PATH_MAX - 1] = '\0';
+               stream.channel_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0';
+
+               printf_verbose("    stream %" PRIu64 " : %s/%s\n",
+                               be64toh(stream.id), stream.path_name,
+                               stream.channel_name);
+               ctx->session->streams[i].id = be64toh(stream.id);
+               ctx->session->streams[i].session = ctx->session;
+
+               ctx->session->streams[i].first_read = 1;
+               ctx->session->streams[i].mmap_size = 0;
+
+               if (be32toh(stream.metadata_flag)) {
+                       char *path;
+
+                       path = strdup(LTTNG_METADATA_PATH_TEMPLATE);
+                       if (!path) {
+                               perror("strdup");
+                               ret = -1;
+                               goto error;
+                       }
+                       if (!mkdtemp(path)) {
+                               perror("mkdtemp");
+                               free(path);
+                               ret = -1;
+                               goto error;
+                       }
+                       ctx->session->streams[i].metadata_flag = 1;
+                       snprintf(ctx->session->streams[i].path,
+                                       sizeof(ctx->session->streams[i].path),
+                                       "%s/%s", path,
+                                       stream.channel_name);
+                       ret = open(ctx->session->streams[i].path,
+                                       O_WRONLY | O_CREAT | O_TRUNC,
+                                       S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP);
+                       if (ret < 0) {
+                               perror("open");
+                               free(path);
+                               goto error;
+                       }
+                       ctx->session->streams[i].fd = ret;
+                       free(path);
+               }
+               ret = lttng_live_ctf_trace_assign(&ctx->session->streams[i],
+                               be64toh(stream.ctf_trace_id));
+               if (ret < 0) {
+                       goto error;
+               }
+
+       }
+       ret = 0;
+
+end:
+error:
+       return ret;
+}
+
+void lttng_live_read(struct lttng_live_ctx *ctx)
 {
-       int ret, active_session = 0;
-       struct bt_context *bt_ctx;
+       int ret, i;
        struct bt_ctf_iter *iter;
        const struct bt_ctf_event *event;
        struct bt_iter_pos begin_pos;
        struct bt_trace_descriptor *td_write;
        struct bt_format *fmt_write;
        struct ctf_text_stream_pos *sout;
+       uint64_t id;
 
-       bt_ctx = bt_context_create();
-       if (!bt_ctx) {
+       ctx->bt_ctx = bt_context_create();
+       if (!ctx->bt_ctx) {
                fprintf(stderr, "[error] bt_context_create allocation\n");
                goto end;
        }
@@ -965,33 +1267,43 @@ void lttng_live_read(struct lttng_live_ctx *ctx, uint64_t session_id)
        if (!sout->parent.event_cb)
                goto end_free;
 
+       ret = lttng_live_create_viewer_session(ctx);
+       if (ret < 0) {
+               goto end_free;
+       }
+
+       for (i = 0; i < ctx->session_ids->len; i++) {
+               id = g_array_index(ctx->session_ids, uint64_t, i);
+               printf_verbose("Attaching to session %lu\n", id);
+               ret = lttng_live_attach_session(ctx, id);
+               printf_verbose("Attaching session returns %d\n", ret);
+               if (ret < 0) {
+                       if (ret == -LTTNG_VIEWER_ATTACH_UNK) {
+                               fprintf(stderr, "[error] Unknown session ID\n");
+                       }
+                       goto end_free;
+               }
+       }
+
        /*
-        * As long as the session is active, we try to reattach to it,
-        * even if all the streams get closed.
+        * As long as the session is active, we try to get new streams.
         */
-       do {
+       for (;;) {
                int flags;
 
-               do {
-                       ret = lttng_live_attach_session(ctx, session_id);
-                       printf_verbose("Attaching session returns %d\n", ret);
-                       if (ret < 0) {
-                               if (ret == -LTTNG_VIEWER_ATTACH_UNK) {
-                                       if (active_session)
-                                               goto end_free;
-                                       fprintf(stderr, "[error] Unknown "
-                                                       "session ID\n");
-                               }
+               while (!ctx->session->stream_count) {
+                       if (ctx->session_ids->len == 0)
                                goto end_free;
-                       } else {
-                               active_session = 1;
-                       }
-               } while (ctx->session->stream_count == 0);
+                       ret = ask_new_streams(ctx);
+                       if (ret < 0)
+                               goto end_free;
+               }
 
-               g_hash_table_foreach(ctx->session->ctf_traces, add_traces, bt_ctx);
+               g_hash_table_foreach(ctx->session->ctf_traces, add_traces,
+                               ctx->bt_ctx);
 
                begin_pos.type = BT_SEEK_BEGIN;
-               iter = bt_ctf_iter_create(bt_ctx, &begin_pos, NULL);
+               iter = bt_ctf_iter_create(ctx->bt_ctx, &begin_pos, NULL);
                if (!iter) {
                        fprintf(stderr, "[error] Iterator creation error\n");
                        goto end;
@@ -1017,11 +1329,13 @@ void lttng_live_read(struct lttng_live_ctx *ctx, uint64_t session_id)
                        }
                }
                bt_ctf_iter_destroy(iter);
-               g_hash_table_foreach_remove(ctx->session->ctf_traces, del_traces, bt_ctx);
-       } while (active_session);
+               g_hash_table_foreach_remove(ctx->session->ctf_traces,
+                               del_traces, ctx->bt_ctx);
+               ctx->session->stream_count = 0;
+       }
 
 end_free:
-       bt_context_put(bt_ctx);
+       bt_context_put(ctx->bt_ctx);
 end:
        return;
 }
This page took 0.030265 seconds and 4 git commands to generate.