Create the live viewer session before attaching
[babeltrace.git] / formats / lttng-live / lttng-live-functions.c
index 2d923618d2326fa707a32096aad1b80f26f2f744..0974b567a657c50314f10d98a477ebc96179a677 100644 (file)
@@ -50,7 +50,7 @@
 #include <formats/ctf/events-private.h>
 
 #include "lttng-live-functions.h"
-#include "lttng-viewer.h"
+#include "lttng-viewer-abi.h"
 
 /*
  * Memory allocation zeroed
@@ -230,11 +230,6 @@ int lttng_live_ctf_trace_assign(struct lttng_live_viewer_stream *stream,
                        (gpointer) ctf_trace_id);
        if (!trace) {
                trace = g_new0(struct lttng_live_ctf_trace, 1);
-               if (!trace) {
-                       ret = -1;
-                       fprintf(stderr, "[error] ctf_trace allocation\n");
-                       goto error;
-               }
                trace->ctf_trace_id = ctf_trace_id;
                trace->streams = g_ptr_array_new();
                g_hash_table_insert(stream->session->ctf_traces,
@@ -247,7 +242,6 @@ int lttng_live_ctf_trace_assign(struct lttng_live_viewer_stream *stream,
        stream->ctf_trace = trace;
        g_ptr_array_add(trace->streams, stream);
 
-error:
        return ret;
 }
 
@@ -343,11 +337,6 @@ int lttng_live_attach_session(struct lttng_live_ctx *ctx, uint64_t id)
                ctx->session->stream_count);
        ctx->session->streams = g_new0(struct lttng_live_viewer_stream,
                        ctx->session->stream_count);
-       if (!ctx->session->streams) {
-               ret = -1;
-               goto error;
-       }
-
        for (i = 0; i < be32toh(rp.streams_count); i++) {
                do {
                        ret_len = recv(ctx->control_sock, &stream, sizeof(stream), 0);
@@ -374,7 +363,17 @@ int lttng_live_attach_session(struct lttng_live_ctx *ctx, uint64_t id)
                        char *path;
 
                        path = strdup(LTTNG_METADATA_PATH_TEMPLATE);
-                       path = mkdtemp(path);
+                       if (!path) {
+                               perror("strdup");
+                               ret = -1;
+                               goto error;
+                       }
+                       if (!mkdtemp(path)) {
+                               perror("mkdtemp");
+                               free(path);
+                               ret = -1;
+                               goto error;
+                       }
                        ctx->session->streams[i].metadata_flag = 1;
                        snprintf(ctx->session->streams[i].path,
                                        sizeof(ctx->session->streams[i].path),
@@ -385,9 +384,11 @@ int lttng_live_attach_session(struct lttng_live_ctx *ctx, uint64_t id)
                                        S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP);
                        if (ret < 0) {
                                perror("open");
+                               free(path);
                                goto error;
                        }
                        ctx->session->streams[i].fd = ret;
+                       free(path);
                }
                ret = lttng_live_ctf_trace_assign(&ctx->session->streams[i],
                                be64toh(stream.ctf_trace_id));
@@ -453,7 +454,13 @@ int get_data_packet(struct lttng_live_ctx *ctx,
                ret = ret_len;
                goto error;
        }
-       assert(ret_len == sizeof(rp));
+       if (ret_len != sizeof(rp)) {
+               fprintf(stderr, "[error] get_data_packet: expected %" PRId64
+                               ", received %" PRId64 "\n", ret_len,
+                               sizeof(rp));
+               ret = -1;
+               goto error;
+       }
 
        rp.flags = be32toh(rp.flags);
 
@@ -715,7 +722,7 @@ retry:
        case LTTNG_VIEWER_INDEX_INACTIVE:
                printf_verbose("get_next_index: inactive\n");
                memset(index, 0, sizeof(struct packet_index));
-               index->timestamp_end = be64toh(rp.timestamp_end);
+               index->ts_cycles.timestamp_end = be64toh(rp.timestamp_end);
                break;
        case LTTNG_VIEWER_INDEX_OK:
                printf_verbose("get_next_index: Ok, need metadata update : %u\n",
@@ -723,8 +730,8 @@ retry:
                index->offset = be64toh(rp.offset);
                index->packet_size = be64toh(rp.packet_size);
                index->content_size = be64toh(rp.content_size);
-               index->timestamp_begin = be64toh(rp.timestamp_begin);
-               index->timestamp_end = be64toh(rp.timestamp_end);
+               index->ts_cycles.timestamp_begin = be64toh(rp.timestamp_begin);
+               index->ts_cycles.timestamp_end = be64toh(rp.timestamp_end);
                index->events_discarded = be64toh(rp.events_discarded);
 
                if (rp.flags & LTTNG_VIEWER_FLAG_NEW_METADATA) {
@@ -767,7 +774,7 @@ void ctf_live_packet_seek(struct bt_stream_pos *stream_pos, size_t index,
 {
        struct ctf_stream_pos *pos;
        struct ctf_file_stream *file_stream;
-       struct packet_index packet_index;
+       struct packet_index *prev_index = NULL, *cur_index;
        struct lttng_live_viewer_stream *viewer_stream;
        struct lttng_live_session *session;
        int ret;
@@ -778,31 +785,72 @@ retry:
        viewer_stream = (struct lttng_live_viewer_stream *) pos->priv;
        session = viewer_stream->session;
 
+       switch (pos->packet_index->len) {
+       case 0:
+               g_array_set_size(pos->packet_index, 1);
+               cur_index = &g_array_index(pos->packet_index,
+                               struct packet_index, 0);
+               break;
+       case 1:
+               g_array_set_size(pos->packet_index, 2);
+               prev_index = &g_array_index(pos->packet_index,
+                               struct packet_index, 0);
+               cur_index = &g_array_index(pos->packet_index,
+                               struct packet_index, 1);
+               break;
+       case 2:
+               g_array_index(pos->packet_index,
+                       struct packet_index, 0) =
+                               g_array_index(pos->packet_index,
+                                       struct packet_index, 1);
+               prev_index = &g_array_index(pos->packet_index,
+                               struct packet_index, 0);
+               cur_index = &g_array_index(pos->packet_index,
+                               struct packet_index, 1);
+               break;
+       default:
+               abort();
+               break;
+       }
        printf_verbose("get_next_index for stream %" PRIu64 "\n", viewer_stream->id);
-       ret = get_next_index(session->ctx, viewer_stream, &packet_index);
+       ret = get_next_index(session->ctx, viewer_stream, cur_index);
        if (ret < 0) {
                pos->offset = EOF;
                fprintf(stderr, "[error] get_next_index failed\n");
                return;
        }
 
-       pos->packet_size = packet_index.packet_size;
-       pos->content_size = packet_index.content_size;
+       pos->packet_size = cur_index->packet_size;
+       pos->content_size = cur_index->content_size;
        pos->mmap_base_offset = 0;
-       if (packet_index.offset == EOF) {
+       if (cur_index->offset == EOF) {
                pos->offset = EOF;
        } else {
                pos->offset = 0;
        }
 
-       if (packet_index.content_size == 0) {
-               file_stream->parent.cycles_timestamp = packet_index.timestamp_end;
+       if (cur_index->content_size == 0) {
+               file_stream->parent.cycles_timestamp =
+                               cur_index->ts_cycles.timestamp_end;
                file_stream->parent.real_timestamp = ctf_get_real_timestamp(
-                               &file_stream->parent, packet_index.timestamp_end);
+                               &file_stream->parent,
+                               cur_index->ts_cycles.timestamp_end);
        } else {
-               file_stream->parent.cycles_timestamp = packet_index.timestamp_begin;
-               file_stream->parent.real_timestamp = ctf_get_real_timestamp(
-                               &file_stream->parent, packet_index.timestamp_begin);
+               /* Convert the timestamps and append to the real_index. */
+               cur_index->ts_real.timestamp_begin = ctf_get_real_timestamp(
+                               &file_stream->parent,
+                               cur_index->ts_cycles.timestamp_begin);
+               cur_index->ts_real.timestamp_end = ctf_get_real_timestamp(
+                               &file_stream->parent,
+                               cur_index->ts_cycles.timestamp_end);
+
+               ctf_update_current_packet_index(&file_stream->parent,
+                               prev_index, cur_index);
+
+               file_stream->parent.cycles_timestamp =
+                               cur_index->ts_cycles.timestamp_begin;
+               file_stream->parent.real_timestamp =
+                               cur_index->ts_real.timestamp_begin;
        }
 
        if (pos->packet_size == 0 || pos->offset == EOF) {
@@ -812,8 +860,8 @@ retry:
        printf_verbose("get_data_packet for stream %" PRIu64 "\n",
                        viewer_stream->id);
        ret = get_data_packet(session->ctx, pos, viewer_stream,
-                       be64toh(packet_index.offset),
-                       packet_index.packet_size / CHAR_BIT);
+                       be64toh(cur_index->offset),
+                       cur_index->packet_size / CHAR_BIT);
        if (ret == -2) {
                goto retry;
        } else if (ret < 0) {
@@ -825,8 +873,9 @@ retry:
        printf_verbose("Index received : packet_size : %" PRIu64
                        ", offset %" PRIu64 ", content_size %" PRIu64
                        ", timestamp_end : %" PRIu64 "\n",
-                       packet_index.packet_size, packet_index.offset,
-                       packet_index.content_size, packet_index.timestamp_end);
+                       cur_index->packet_size, cur_index->offset,
+                       cur_index->content_size,
+                       cur_index->ts_cycles.timestamp_end);
 
        /* update trace_packet_header and stream_packet_context */
        if (pos->prot != PROT_WRITE && file_stream->parent.trace_packet_header) {
@@ -923,6 +972,48 @@ end:
        return;
 }
 
+int lttng_live_create_viewer_session(struct lttng_live_ctx *ctx)
+{
+       struct lttng_viewer_cmd cmd;
+       struct lttng_viewer_create_session_response resp;
+       int ret;
+       ssize_t ret_len;
+
+       cmd.cmd = htobe32(LTTNG_VIEWER_CREATE_SESSION);
+       cmd.data_size = 0;
+       cmd.cmd_version = 0;
+
+       do {
+               ret_len = send(ctx->control_sock, &cmd, sizeof(cmd), 0);
+       } while (ret_len < 0 && errno == EINTR);
+       if (ret_len < 0) {
+               fprintf(stderr, "[error] Error sending cmd\n");
+               ret = ret_len;
+               goto error;
+       }
+       assert(ret_len == sizeof(cmd));
+
+       do {
+               ret_len = recv(ctx->control_sock, &resp, sizeof(resp), 0);
+       } while (ret_len < 0 && errno == EINTR);
+       if (ret_len < 0) {
+               fprintf(stderr, "[error] Error receiving create session reply\n");
+               ret = ret_len;
+               goto error;
+       }
+       assert(ret_len == sizeof(resp));
+
+       if (be32toh(resp.status) != LTTNG_VIEWER_CREATE_SESSION_OK) {
+               fprintf(stderr, "[error] Error creating viewer session\n");
+               ret = -1;
+               goto error;
+       }
+       ret = 0;
+
+error:
+       return ret;
+}
+
 void lttng_live_read(struct lttng_live_ctx *ctx, uint64_t session_id)
 {
        int ret, active_session = 0;
@@ -957,6 +1048,11 @@ void lttng_live_read(struct lttng_live_ctx *ctx, uint64_t session_id)
        if (!sout->parent.event_cb)
                goto end_free;
 
+       ret = lttng_live_create_viewer_session(ctx);
+       if (ret < 0) {
+               goto end_free;
+       }
+
        /*
         * As long as the session is active, we try to reattach to it,
         * even if all the streams get closed.
This page took 0.040376 seconds and 4 git commands to generate.