Fix: check the lttng-relayd protocol version
[babeltrace.git] / formats / lttng-live / lttng-live-comm.c
index 960c2240129cba5115223816b2a09f1578c5dd95..070674aa67c13298032d6b0fc5c4411ef03085f5 100644 (file)
@@ -50,6 +50,7 @@
 #include <babeltrace/ctf/events-internal.h>
 #include <formats/ctf/events-private.h>
 
+#include <babeltrace/endian.h>
 #include <babeltrace/compat/memstream.h>
 
 #include "lttng-live.h"
@@ -201,6 +202,18 @@ int lttng_live_establish_connection(struct lttng_live_ctx *ctx)
                        be64toh(connect.viewer_session_id));
        printf_verbose("Relayd version : %u.%u\n", be32toh(connect.major),
                        be32toh(connect.minor));
+
+       if (LTTNG_LIVE_MAJOR != be32toh(connect.major)) {
+               fprintf(stderr, "[error] Incompatible lttng-relayd protocol\n");
+               goto error;
+       }
+       /* Use the smallest protocol version implemented. */
+       if (LTTNG_LIVE_MINOR > be32toh(connect.minor)) {
+               ctx->minor =  be32toh(connect.minor);
+       } else {
+               ctx->minor =  LTTNG_LIVE_MINOR;
+       }
+       ctx->major = LTTNG_LIVE_MAJOR;
        ret = 0;
 end:
        return ret;
@@ -531,19 +544,26 @@ error:
        return -1;
 }
 
+/*
+ * Ask the relay for new streams.
+ *
+ * Returns the number of new streams received or a negative value on error.
+ */
 static
 int ask_new_streams(struct lttng_live_ctx *ctx)
 {
-       int i, ret = 0;
+       int i, ret = 0, nb_streams = 0;
        uint64_t id;
 
 restart:
        for (i = 0; i < ctx->session_ids->len; i++) {
                id = g_array_index(ctx->session_ids, uint64_t, i);
                ret = lttng_live_get_new_streams(ctx, id);
-               printf_verbose("Asking for new streams returns %d\n",
-                               ret);
+               printf_verbose("Asking for new streams returns %d\n", ret);
                if (ret < 0) {
+                       if (lttng_live_should_quit()) {
+                               goto end;
+                       }
                        if (ret == -LTTNG_VIEWER_NEW_STREAMS_HUP) {
                                printf_verbose("Session %" PRIu64 " closed\n",
                                                id);
@@ -562,8 +582,11 @@ restart:
                                ret = -1;
                                goto end;
                        }
+               } else {
+                       nb_streams += ret;
                }
        }
+       ret = nb_streams;
 
 end:
        return ret;
@@ -688,8 +711,9 @@ retry:
                        ret = ask_new_streams(ctx);
                        if (ret < 0)
                                goto error;
-                       g_hash_table_foreach(ctx->session->ctf_traces,
-                                       add_traces, ctx->bt_ctx);
+                       else if (ret > 0)
+                               g_hash_table_foreach(ctx->session->ctf_traces,
+                                               add_traces, ctx->bt_ctx);
                }
                if (rp.flags & (LTTNG_VIEWER_FLAG_NEW_METADATA
                                | LTTNG_VIEWER_FLAG_NEW_STREAM)) {
@@ -871,7 +895,7 @@ int get_new_metadata(struct lttng_live_ctx *ctx,
 {
        int ret = 0;
        struct lttng_live_viewer_stream *metadata_stream;
-       size_t size, len_read = 0;;
+       size_t size, len_read = 0;
 
        metadata_stream = viewer_stream->ctf_trace->metadata_stream;
        if (!metadata_stream) {
@@ -985,11 +1009,13 @@ retry:
                                goto error;
                }
                if (rp.flags & LTTNG_VIEWER_FLAG_NEW_STREAM) {
+                       printf_verbose("get_next_index: need new streams\n");
                        ret = ask_new_streams(ctx);
                        if (ret < 0)
                                goto error;
-                       g_hash_table_foreach(ctx->session->ctf_traces,
-                                       add_traces, ctx->bt_ctx);
+                       else if (ret > 0)
+                               g_hash_table_foreach(ctx->session->ctf_traces,
+                                               add_traces, ctx->bt_ctx);
                }
                break;
        case LTTNG_VIEWER_INDEX_RETRY:
@@ -1065,7 +1091,9 @@ retry:
        ret = get_next_index(session->ctx, viewer_stream, cur_index);
        if (ret < 0) {
                pos->offset = EOF;
-               fprintf(stderr, "[error] get_next_index failed\n");
+               if (!lttng_live_should_quit()) {
+                       fprintf(stderr, "[error] get_next_index failed\n");
+               }
                return;
        }
 
@@ -1115,7 +1143,9 @@ retry:
                goto retry;
        } else if (ret < 0) {
                pos->offset = EOF;
-               fprintf(stderr, "[error] get_data_packet failed\n");
+               if (!lttng_live_should_quit()) {
+                       fprintf(stderr, "[error] get_data_packet failed\n");
+               }
                return;
        }
 
@@ -1308,13 +1338,17 @@ end:
        return;
 }
 
+/*
+ * Request new streams for a session.
+ * Returns the number of streams received or a negative value on error.
+ */
 int lttng_live_get_new_streams(struct lttng_live_ctx *ctx, uint64_t id)
 {
        struct lttng_viewer_cmd cmd;
        struct lttng_viewer_new_streams_request rq;
        struct lttng_viewer_new_streams_response rp;
        struct lttng_viewer_stream stream;
-       int ret, i;
+       int ret, i, nb_streams = 0;
        ssize_t ret_len;
        uint32_t stream_count;
 
@@ -1419,9 +1453,10 @@ int lttng_live_get_new_streams(struct lttng_live_ctx *ctx, uint64_t id)
                if (ret < 0) {
                        goto error;
                }
+               nb_streams++;
 
        }
-       ret = 0;
+       ret = nb_streams;
 end:
        return ret;
 
@@ -1515,6 +1550,10 @@ int lttng_live_read(struct lttng_live_ctx *ctx)
                begin_pos.type = BT_SEEK_BEGIN;
                iter = bt_ctf_iter_create(ctx->bt_ctx, &begin_pos, NULL);
                if (!iter) {
+                       if (lttng_live_should_quit()) {
+                               ret = 0;
+                               goto end;
+                       }
                        fprintf(stderr, "[error] Iterator creation error\n");
                        goto end;
                }
@@ -1551,5 +1590,8 @@ int lttng_live_read(struct lttng_live_ctx *ctx)
 end_free:
        bt_context_put(ctx->bt_ctx);
 end:
+       if (lttng_live_should_quit()) {
+               ret = 0;
+       }
        return ret;
 }
This page took 0.027319 seconds and 4 git commands to generate.