get_new_streams and multi-session
authorJulien Desfossez <jdesfossez@efficios.com>
Wed, 5 Feb 2014 23:34:27 +0000 (18:34 -0500)
committerMathieu Desnoyers <mathieu.desnoyers@efficios.com>
Sat, 8 Feb 2014 15:39:01 +0000 (10:39 -0500)
We now support the get_new_stream lttng live command. This command
allows us to retrieve new streams added while tracing.

Signed-off-by: Julien Desfossez <jdesfossez@efficios.com>
Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
formats/lttng-live/lttng-live-functions.c
formats/lttng-live/lttng-live-functions.h
formats/lttng-live/lttng-live.c
formats/lttng-live/lttng-viewer-abi.h

index 0974b567a657c50314f10d98a477ebc96179a677..44c71cbc37304d025125d2d7de0c7b6f66ad9b24 100644 (file)
        ((type) (a) > (type) (b) ? (type) (a) : (type) (b))
 #endif
 
+void ctf_live_packet_seek(struct bt_stream_pos *stream_pos, size_t index,
+               int whence);
+static void add_traces(gpointer key, gpointer value, gpointer user_data);
+static int del_traces(gpointer key, gpointer value, gpointer user_data);
+
 int lttng_live_connect_viewer(struct lttng_live_ctx *ctx, char *hostname,
                int port)
 {
@@ -323,7 +328,7 @@ int lttng_live_attach_session(struct lttng_live_ctx *ctx, uint64_t id)
                goto end;
        }
 
-       ctx->session->stream_count = be32toh(rp.streams_count);
+       ctx->session->stream_count += be32toh(rp.streams_count);
        /*
         * When the session is created but not started, we do an active wait
         * until it starts. It allows the viewer to start processing the trace
@@ -404,6 +409,39 @@ error:
        return ret;
 }
 
+static
+int ask_new_streams(struct lttng_live_ctx *ctx)
+{
+       int i, ret = 0;
+       uint64_t id;
+
+restart:
+       for (i = 0; i < ctx->session_ids->len; i++) {
+               id = g_array_index(ctx->session_ids, uint64_t, i);
+               ret = lttng_live_get_new_streams(ctx, id);
+               printf_verbose("Asking for new streams returns %d\n",
+                               ret);
+               if (ret < 0) {
+                       if (ret == -LTTNG_VIEWER_NEW_STREAMS_HUP) {
+                               printf_verbose("Session %" PRIu64 " closed\n",
+                                               id);
+                               g_array_remove_index(ctx->session_ids, i);
+                               /*
+                                * We can't continue iterating on the g_array
+                                * after a remove, we have to start again.
+                                */
+                               goto restart;
+                       } else {
+                               ret = -1;
+                               goto end;
+                       }
+               }
+       }
+
+end:
+       return ret;
+}
+
 static
 int get_data_packet(struct lttng_live_ctx *ctx,
                struct ctf_stream_pos *pos,
@@ -480,6 +518,13 @@ int get_data_packet(struct lttng_live_ctx *ctx,
                        ret = 0;
                        goto end;
                }
+               if (rp.flags & LTTNG_VIEWER_FLAG_NEW_STREAM) {
+                       ret = ask_new_streams(ctx);
+                       if (ret < 0)
+                               goto error;
+                       g_hash_table_foreach(ctx->session->ctf_traces, add_traces,
+                                       ctx->bt_ctx);
+               }
                fprintf(stderr, "[error] get_data_packet: error\n");
                ret = -1;
                goto end;
@@ -742,6 +787,13 @@ retry:
                                goto error;
                        }
                }
+               if (rp.flags & LTTNG_VIEWER_FLAG_NEW_STREAM) {
+                       ret = ask_new_streams(ctx);
+                       if (ret < 0)
+                               goto error;
+                       g_hash_table_foreach(ctx->session->ctf_traces, add_traces,
+                                       ctx->bt_ctx);
+               }
                break;
        case LTTNG_VIEWER_INDEX_RETRY:
                printf_verbose("get_next_index: retry\n");
@@ -752,6 +804,7 @@ retry:
                viewer_stream->id = -1ULL;
                viewer_stream->fd = -1;
                index->offset = EOF;
+               ctx->session->stream_count--;
                break;
        case LTTNG_VIEWER_INDEX_ERR:
                fprintf(stderr, "[error] get_next_index: error\n");
@@ -902,7 +955,50 @@ end:
        return;
 }
 
-static int del_traces(gpointer key, gpointer value, gpointer user_data)
+int lttng_live_create_viewer_session(struct lttng_live_ctx *ctx)
+{
+       struct lttng_viewer_cmd cmd;
+       struct lttng_viewer_create_session_response resp;
+       int ret;
+       ssize_t ret_len;
+
+       cmd.cmd = htobe32(LTTNG_VIEWER_CREATE_SESSION);
+       cmd.data_size = 0;
+       cmd.cmd_version = 0;
+
+       do {
+               ret_len = send(ctx->control_sock, &cmd, sizeof(cmd), 0);
+       } while (ret_len < 0 && errno == EINTR);
+       if (ret_len < 0) {
+               fprintf(stderr, "[error] Error sending cmd\n");
+               ret = ret_len;
+               goto error;
+       }
+       assert(ret_len == sizeof(cmd));
+
+       do {
+               ret_len = recv(ctx->control_sock, &resp, sizeof(resp), 0);
+       } while (ret_len < 0 && errno == EINTR);
+       if (ret_len < 0) {
+               fprintf(stderr, "[error] Error receiving create session reply\n");
+               ret = ret_len;
+               goto error;
+       }
+       assert(ret_len == sizeof(resp));
+
+       if (be32toh(resp.status) != LTTNG_VIEWER_CREATE_SESSION_OK) {
+               fprintf(stderr, "[error] Error creating viewer session\n");
+               ret = -1;
+               goto error;
+       }
+       ret = 0;
+
+error:
+       return ret;
+}
+
+static
+int del_traces(gpointer key, gpointer value, gpointer user_data)
 {
        struct bt_context *bt_ctx = user_data;
        struct lttng_live_ctf_trace *trace = value;
@@ -916,7 +1012,8 @@ static int del_traces(gpointer key, gpointer value, gpointer user_data)
        return 1;
 }
 
-static void add_traces(gpointer key, gpointer value, gpointer user_data)
+static
+void add_traces(gpointer key, gpointer value, gpointer user_data)
 {
        int i, ret, total_metadata = 0;
        uint64_t metadata_len;
@@ -927,6 +1024,9 @@ static void add_traces(gpointer key, gpointer value, gpointer user_data)
        struct bt_mmap_stream_list mmap_list;
        struct lttng_live_ctx *ctx = NULL;
 
+       if (trace->in_use)
+               return;
+
        BT_INIT_LIST_HEAD(&mmap_list.head);
 
        for (i = 0; i < trace->streams->len; i++) {
@@ -962,7 +1062,21 @@ static void add_traces(gpointer key, gpointer value, gpointer user_data)
                fprintf(stderr, "[error] Error adding trace\n");
                goto end_free;
        }
+
+       if (bt_ctx->current_iterator) {
+               struct bt_trace_descriptor *td;
+               struct bt_trace_handle *handle;
+
+               handle = (struct bt_trace_handle *) g_hash_table_lookup(
+                               bt_ctx->trace_handles,
+                               (gpointer) (unsigned long) ret);
+               td = handle->td;
+               fprintf(stderr, "Handle : %d, td : %p\n", ret, td);
+               bt_iter_add_trace(bt_ctx->current_iterator, td);
+       }
+
        trace->trace_id = ret;
+       trace->in_use = 1;
 
        goto end;
 
@@ -972,17 +1086,22 @@ end:
        return;
 }
 
-int lttng_live_create_viewer_session(struct lttng_live_ctx *ctx)
+int lttng_live_get_new_streams(struct lttng_live_ctx *ctx, uint64_t id)
 {
        struct lttng_viewer_cmd cmd;
-       struct lttng_viewer_create_session_response resp;
-       int ret;
+       struct lttng_viewer_new_streams_request rq;
+       struct lttng_viewer_new_streams_response rp;
+       struct lttng_viewer_stream stream;
+       int ret, i;
        ssize_t ret_len;
 
-       cmd.cmd = htobe32(LTTNG_VIEWER_CREATE_SESSION);
-       cmd.data_size = 0;
+       cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEW_STREAMS);
+       cmd.data_size = sizeof(rq);
        cmd.cmd_version = 0;
 
+       memset(&rq, 0, sizeof(rq));
+       rq.session_id = htobe64(id);
+
        do {
                ret_len = send(ctx->control_sock, &cmd, sizeof(cmd), 0);
        } while (ret_len < 0 && errno == EINTR);
@@ -994,39 +1113,139 @@ int lttng_live_create_viewer_session(struct lttng_live_ctx *ctx)
        assert(ret_len == sizeof(cmd));
 
        do {
-               ret_len = recv(ctx->control_sock, &resp, sizeof(resp), 0);
+               ret_len = send(ctx->control_sock, &rq, sizeof(rq), 0);
        } while (ret_len < 0 && errno == EINTR);
        if (ret_len < 0) {
-               fprintf(stderr, "[error] Error receiving create session reply\n");
+               fprintf(stderr, "[error] Error sending attach request\n");
                ret = ret_len;
                goto error;
        }
-       assert(ret_len == sizeof(resp));
+       assert(ret_len == sizeof(rq));
 
-       if (be32toh(resp.status) != LTTNG_VIEWER_CREATE_SESSION_OK) {
-               fprintf(stderr, "[error] Error creating viewer session\n");
-               ret = -1;
+       do {
+               ret_len = recv(ctx->control_sock, &rp, sizeof(rp), 0);
+       } while (ret_len < 0 && errno == EINTR);
+       if (ret_len < 0) {
+               fprintf(stderr, "[error] Error receiving get_new_streams response\n");
+               ret = ret_len;
                goto error;
        }
+       assert(ret_len == sizeof(rp));
+
+       switch(be32toh(rp.status)) {
+       case LTTNG_VIEWER_NEW_STREAMS_OK:
+               break;
+       case LTTNG_VIEWER_NEW_STREAMS_NO_NEW:
+               ret = 0;
+               goto end;
+       case LTTNG_VIEWER_NEW_STREAMS_HUP:
+               ret = -LTTNG_VIEWER_NEW_STREAMS_HUP;
+               goto end;
+       case LTTNG_VIEWER_NEW_STREAMS_ERR:
+               fprintf(stderr, "[error] get_new_streams error\n");
+               ret = -1;
+               goto end;
+       default:
+               fprintf(stderr, "[error] Unknown return code %u\n",
+                               be32toh(rp.status));
+               ret = -1;
+               goto end;
+       }
+
+       ctx->session->stream_count += be32toh(rp.streams_count);
+       /*
+        * When the session is created but not started, we do an active wait
+        * until it starts. It allows the viewer to start processing the trace
+        * as soon as the session starts.
+        */
+       if (ctx->session->stream_count == 0) {
+               ret = 0;
+               goto end;
+       }
+       printf_verbose("Waiting for %" PRIu64 " streams:\n",
+               ctx->session->stream_count);
+       ctx->session->streams = g_new0(struct lttng_live_viewer_stream,
+                       ctx->session->stream_count);
+       for (i = 0; i < be32toh(rp.streams_count); i++) {
+               do {
+                       ret_len = recv(ctx->control_sock, &stream, sizeof(stream), 0);
+               } while (ret_len < 0 && errno == EINTR);
+               if (ret_len < 0) {
+                       fprintf(stderr, "[error] Error receiving stream\n");
+                       ret = ret_len;
+                       goto error;
+               }
+               assert(ret_len == sizeof(stream));
+               stream.path_name[LTTNG_VIEWER_PATH_MAX - 1] = '\0';
+               stream.channel_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0';
+
+               printf_verbose("    stream %" PRIu64 " : %s/%s\n",
+                               be64toh(stream.id), stream.path_name,
+                               stream.channel_name);
+               ctx->session->streams[i].id = be64toh(stream.id);
+               ctx->session->streams[i].session = ctx->session;
+
+               ctx->session->streams[i].first_read = 1;
+               ctx->session->streams[i].mmap_size = 0;
+
+               if (be32toh(stream.metadata_flag)) {
+                       char *path;
+
+                       path = strdup(LTTNG_METADATA_PATH_TEMPLATE);
+                       if (!path) {
+                               perror("strdup");
+                               ret = -1;
+                               goto error;
+                       }
+                       if (!mkdtemp(path)) {
+                               perror("mkdtemp");
+                               free(path);
+                               ret = -1;
+                               goto error;
+                       }
+                       ctx->session->streams[i].metadata_flag = 1;
+                       snprintf(ctx->session->streams[i].path,
+                                       sizeof(ctx->session->streams[i].path),
+                                       "%s/%s", path,
+                                       stream.channel_name);
+                       ret = open(ctx->session->streams[i].path,
+                                       O_WRONLY | O_CREAT | O_TRUNC,
+                                       S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP);
+                       if (ret < 0) {
+                               perror("open");
+                               free(path);
+                               goto error;
+                       }
+                       ctx->session->streams[i].fd = ret;
+                       free(path);
+               }
+               ret = lttng_live_ctf_trace_assign(&ctx->session->streams[i],
+                               be64toh(stream.ctf_trace_id));
+               if (ret < 0) {
+                       goto error;
+               }
+
+       }
        ret = 0;
 
+end:
 error:
        return ret;
 }
 
-void lttng_live_read(struct lttng_live_ctx *ctx, uint64_t session_id)
+void lttng_live_read(struct lttng_live_ctx *ctx)
 {
-       int ret, active_session = 0;
-       struct bt_context *bt_ctx;
+       int ret, i;
        struct bt_ctf_iter *iter;
        const struct bt_ctf_event *event;
        struct bt_iter_pos begin_pos;
        struct bt_trace_descriptor *td_write;
        struct bt_format *fmt_write;
        struct ctf_text_stream_pos *sout;
+       uint64_t id;
 
-       bt_ctx = bt_context_create();
-       if (!bt_ctx) {
+       ctx->bt_ctx = bt_context_create();
+       if (!ctx->bt_ctx) {
                fprintf(stderr, "[error] bt_context_create allocation\n");
                goto end;
        }
@@ -1053,33 +1272,38 @@ void lttng_live_read(struct lttng_live_ctx *ctx, uint64_t session_id)
                goto end_free;
        }
 
+       for (i = 0; i < ctx->session_ids->len; i++) {
+               id = g_array_index(ctx->session_ids, uint64_t, i);
+               printf_verbose("Attaching to session %lu\n", id);
+               ret = lttng_live_attach_session(ctx, id);
+               printf_verbose("Attaching session returns %d\n", ret);
+               if (ret < 0) {
+                       if (ret == -LTTNG_VIEWER_ATTACH_UNK) {
+                               fprintf(stderr, "[error] Unknown session ID\n");
+                       }
+                       goto end_free;
+               }
+       }
+
        /*
-        * As long as the session is active, we try to reattach to it,
-        * even if all the streams get closed.
+        * As long as the session is active, we try to get new streams.
         */
-       do {
+       for (;;) {
                int flags;
 
-               do {
-                       ret = lttng_live_attach_session(ctx, session_id);
-                       printf_verbose("Attaching session returns %d\n", ret);
-                       if (ret < 0) {
-                               if (ret == -LTTNG_VIEWER_ATTACH_UNK) {
-                                       if (active_session)
-                                               goto end_free;
-                                       fprintf(stderr, "[error] Unknown "
-                                                       "session ID\n");
-                               }
+               while (!ctx->session->stream_count) {
+                       if (ctx->session_ids->len == 0)
                                goto end_free;
-                       } else {
-                               active_session = 1;
-                       }
-               } while (ctx->session->stream_count == 0);
+                       ret = ask_new_streams(ctx);
+                       if (ret < 0)
+                               goto end_free;
+               }
 
-               g_hash_table_foreach(ctx->session->ctf_traces, add_traces, bt_ctx);
+               g_hash_table_foreach(ctx->session->ctf_traces, add_traces,
+                               ctx->bt_ctx);
 
                begin_pos.type = BT_SEEK_BEGIN;
-               iter = bt_ctf_iter_create(bt_ctx, &begin_pos, NULL);
+               iter = bt_ctf_iter_create(ctx->bt_ctx, &begin_pos, NULL);
                if (!iter) {
                        fprintf(stderr, "[error] Iterator creation error\n");
                        goto end;
@@ -1105,11 +1329,13 @@ void lttng_live_read(struct lttng_live_ctx *ctx, uint64_t session_id)
                        }
                }
                bt_ctf_iter_destroy(iter);
-               g_hash_table_foreach_remove(ctx->session->ctf_traces, del_traces, bt_ctx);
-       } while (active_session);
+               g_hash_table_foreach_remove(ctx->session->ctf_traces,
+                               del_traces, ctx->bt_ctx);
+               ctx->session->stream_count = 0;
+       }
 
 end_free:
-       bt_context_put(bt_ctx);
+       bt_context_put(ctx->bt_ctx);
 end:
        return;
 }
index 778540d619ec81c5a39bc76866130a40fc4f5768..ce311250ee07aa4840589c85d70da5f80c466637 100644 (file)
@@ -35,6 +35,8 @@
 struct lttng_live_ctx {
        int control_sock;
        struct lttng_live_session *session;
+       GArray *session_ids;
+       struct bt_context *bt_ctx;
 };
 
 struct lttng_live_viewer_stream {
@@ -62,6 +64,7 @@ struct lttng_live_ctf_trace {
        GPtrArray *streams;
        FILE *metadata_fp;
        int trace_id;
+       int in_use;
 };
 
 int lttng_live_connect_viewer(struct lttng_live_ctx *ctx, char *hostname,
@@ -69,6 +72,7 @@ int lttng_live_connect_viewer(struct lttng_live_ctx *ctx, char *hostname,
 int lttng_live_establish_connection(struct lttng_live_ctx *ctx);
 int lttng_live_list_sessions(struct lttng_live_ctx *ctx, const char *path);
 int lttng_live_attach_session(struct lttng_live_ctx *ctx, uint64_t id);
-void lttng_live_read(struct lttng_live_ctx *ctx, uint64_t session_id);
+void lttng_live_read(struct lttng_live_ctx *ctx);
+int lttng_live_get_new_streams(struct lttng_live_ctx *ctx, uint64_t id);
 
 #endif /* _LTTNG_LIVE_FUNCTIONS_H */
index b1ab7408415fb4265edc82b173da94af2f4bdb26..5932872903956c9208a720bb01cbee709428e886 100644 (file)
  * hostname parameter needs to hold NAME_MAX chars.
  */
 static int parse_url(const char *path, char *hostname, int *port,
-               uint64_t *session_id)
+               uint64_t *session_id, GArray *session_ids)
 {
-       char remain[2][NAME_MAX];
+       char remain[3][NAME_MAX];
        int ret = -1, proto, proto_offset = 0;
        size_t path_len = strlen(path);
+       char *str, *strctx;
 
        /*
         * Since sscanf API does not allow easily checking string length
@@ -79,8 +80,7 @@ static int parse_url(const char *path, char *hostname, int *port,
                        ret = sscanf(remain[0], ":%d%s", port, remain[1]);
                        /* Optional session ID with port number */
                        if (ret == 2) {
-                               ret = sscanf(remain[1], "/%" PRIu64,
-                                       session_id);
+                               ret = sscanf(remain[1], "/%s", remain[2]);
                                /* Accept 0 or 1 (optional) */
                                if (ret < 0) {
                                        goto end;
@@ -89,7 +89,7 @@ static int parse_url(const char *path, char *hostname, int *port,
                        break;
                case '/':
                        /* Optional session ID */
-                       ret = sscanf(remain[0], "/%" PRIu64, session_id);
+                       ret = sscanf(remain[0], "/%s", remain[2]);
                        /* Accept 0 or 1 (optional) */
                        if (ret < 0) {
                                goto end;
@@ -106,14 +106,31 @@ static int parse_url(const char *path, char *hostname, int *port,
        if (*port < 0)
                *port = LTTNG_DEFAULT_NETWORK_VIEWER_PORT;
 
-       if (*session_id == -1ULL)
+       if (strlen(remain[2]) == 0) {
                printf_verbose("Connecting to hostname : %s, port : %d, "
                                "proto : IPv%d\n",
                                hostname, *port, proto);
-       else
-               printf_verbose("Connecting to hostname : %s, port : %d, "
-                               "session id : %" PRIu64 ", proto : IPv%d\n",
-                               hostname, *port, *session_id, proto);
+               ret = 0;
+               goto end;
+       }
+
+       printf_verbose("Connecting to hostname : %s, port : %d, "
+                       "session id(s) : %s, proto : IPv%d\n",
+                       hostname, *port, remain[2], proto);
+       str = strtok_r(remain[2], ",", &strctx);
+       do {
+               char *endptr;
+               uint64_t id;
+
+               id = strtoull(str, &endptr, 0);
+               if (*endptr != '\0' || str == endptr || errno != 0) {
+                       fprintf(stderr, "[error] parsing session id\n");
+                       ret = -1;
+                       goto end;
+               }
+               g_array_append_val(session_ids, id);
+       } while ((str = strtok_r(NULL, ",", &strctx)));
+
        ret = 0;
 
 end:
@@ -137,7 +154,9 @@ static int lttng_live_open_trace_read(const char *path)
        ctx.session->ctf_traces = g_hash_table_new(g_direct_hash,
                        g_direct_equal);
 
-       ret = parse_url(path, hostname, &port, &session_id);
+       ctx.session_ids = g_array_new(FALSE, TRUE, sizeof(uint64_t));
+
+       ret = parse_url(path, hostname, &port, &session_id, ctx.session_ids);
        if (ret < 0) {
                goto end_free;
        }
@@ -154,7 +173,7 @@ static int lttng_live_open_trace_read(const char *path)
                goto end_free;
        }
 
-       if (session_id == -1ULL) {
+       if (ctx.session_ids->len == 0) {
                printf_verbose("Listing sessions\n");
                ret = lttng_live_list_sessions(&ctx, path);
                if (ret < 0) {
@@ -162,10 +181,11 @@ static int lttng_live_open_trace_read(const char *path)
                        goto end_free;
                }
        } else {
-               lttng_live_read(&ctx, session_id);
+               lttng_live_read(&ctx);
        }
 
 end_free:
+       g_array_free(ctx.session_ids, TRUE);
        g_hash_table_destroy(ctx.session->ctf_traces);
        g_free(ctx.session);
        g_free(ctx.session->streams);
index 3a669dd82c5d49a93dc9434db8e402c958f10d9c..f9bce98d0b8dd8b8d5d6e480c993020502bf5d28 100644 (file)
@@ -97,6 +97,7 @@ enum lttng_viewer_new_streams_return_code {
        LTTNG_VIEWER_NEW_STREAMS_OK           = 1, /* If new streams are being sent. */
        LTTNG_VIEWER_NEW_STREAMS_NO_NEW       = 2, /* If no new streams are available. */
        LTTNG_VIEWER_NEW_STREAMS_ERR          = 3, /* Error. */
+       LTTNG_VIEWER_NEW_STREAMS_HUP          = 4, /* Session closed. */
 };
 
 enum lttng_viewer_create_session_return_code {
This page took 0.032455 seconds and 4 git commands to generate.