Attach and list by session name and hostname
authorJulien Desfossez <jdesfossez@efficios.com>
Wed, 5 Feb 2014 23:34:28 +0000 (18:34 -0500)
committerMathieu Desnoyers <mathieu.desnoyers@efficios.com>
Sat, 8 Feb 2014 15:39:01 +0000 (10:39 -0500)
From now on, the user attaches to a session by hostname and session name
(no ID anymore). With this new features, we automatically merge the
sessions with the same hostname and session name.

The new URL format to attach to a session is :
net://<relay-hostname>/host/<traced-hostname>/<session-name>

All the fields are mandatory.
No changes for the listing :
net://<relay-hostname>

Signed-off-by: Julien Desfossez <jdesfossez@efficios.com>
Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
formats/ctf/ctf.c
formats/lttng-live/lttng-live-functions.c
formats/lttng-live/lttng-live-functions.h
formats/lttng-live/lttng-live.c

index 9f807c9e2d9910dcfe0a089c2ffaf200da766f12..515bbe1a2c256aea19efefc3c1af9bfe35ac82d9 100644 (file)
@@ -2324,6 +2324,7 @@ struct bt_trace_descriptor *ctf_open_mmap_trace(
                goto error;
        }
        td = g_new0(struct ctf_trace, 1);
+       td->dirfd = -1;
        ret = ctf_open_mmap_trace_read(td, mmap_list, packet_seek, metadata_fp);
        if (ret)
                goto error_free;
index 44c71cbc37304d025125d2d7de0c7b6f66ad9b24..60f5fb51f346b6b8a64184a1e01a1ae2ca03ade1 100644 (file)
        ((type) (a) > (type) (b) ? (type) (a) : (type) (b))
 #endif
 
-void ctf_live_packet_seek(struct bt_stream_pos *stream_pos, size_t index,
-               int whence);
+static void ctf_live_packet_seek(struct bt_stream_pos *stream_pos,
+               size_t index, int whence);
 static void add_traces(gpointer key, gpointer value, gpointer user_data);
 static int del_traces(gpointer key, gpointer value, gpointer user_data);
 
-int lttng_live_connect_viewer(struct lttng_live_ctx *ctx, char *hostname,
-               int port)
+int lttng_live_connect_viewer(struct lttng_live_ctx *ctx)
 {
        struct hostent *host;
        struct sockaddr_in server_addr;
        int ret;
 
-       host = gethostbyname(hostname);
+       host = gethostbyname(ctx->relay_hostname);
        if (!host) {
                ret = -1;
                goto end;
@@ -87,7 +86,7 @@ int lttng_live_connect_viewer(struct lttng_live_ctx *ctx, char *hostname,
        }
 
        server_addr.sin_family = AF_INET;
-       server_addr.sin_port = htons(port);
+       server_addr.sin_port = htons(ctx->port);
        server_addr.sin_addr = *((struct in_addr *) host->h_addr);
        bzero(&(server_addr.sin_zero), 8);
 
@@ -161,14 +160,82 @@ error:
        return ret;
 }
 
+static
+void free_session_list(GPtrArray *session_list)
+{
+       int i;
+       struct lttng_live_relay_session *relay_session;
+
+       for (i = 0; i < session_list->len; i++) {
+               relay_session = g_ptr_array_index(session_list, i);
+               free(relay_session->name);
+               free(relay_session->hostname);
+       }
+       g_ptr_array_free(session_list, TRUE);
+}
+
+static
+void print_session_list(GPtrArray *session_list, const char *path)
+{
+       int i;
+       struct lttng_live_relay_session *relay_session;
+
+       for (i = 0; i < session_list->len; i++) {
+               relay_session = g_ptr_array_index(session_list, i);
+               fprintf(stdout, "%s/host/%s/%s (timer = %u, "
+                               "%u stream(s), %u client(s) connected)\n",
+                               path, relay_session->hostname,
+                               relay_session->name, relay_session->timer,
+                               relay_session->streams, relay_session->clients);
+       }
+}
+
+static
+void update_session_list(GPtrArray *session_list, char *hostname,
+               char *session_name, uint32_t streams, uint32_t clients,
+               uint32_t timer)
+{
+       int i, found = 0;
+       struct lttng_live_relay_session *relay_session;
+
+       for (i = 0; i < session_list->len; i++) {
+               relay_session = g_ptr_array_index(session_list, i);
+               if ((strncmp(relay_session->hostname, hostname, NAME_MAX) == 0) &&
+                               strncmp(relay_session->name,
+                                       session_name, NAME_MAX) == 0) {
+                       relay_session->streams += streams;
+                       if (relay_session->clients < clients)
+                               relay_session->clients = clients;
+                       found = 1;
+                       break;
+               }
+       }
+       if (found)
+               return;
+
+       relay_session = g_new0(struct lttng_live_relay_session, 1);
+       relay_session->hostname = strndup(hostname, NAME_MAX);
+       relay_session->name = strndup(session_name, NAME_MAX);
+       relay_session->clients = clients;
+       relay_session->streams = streams;
+       relay_session->timer = timer;
+       g_ptr_array_add(session_list, relay_session);
+}
+
 int lttng_live_list_sessions(struct lttng_live_ctx *ctx, const char *path)
 {
        struct lttng_viewer_cmd cmd;
        struct lttng_viewer_list_sessions list;
        struct lttng_viewer_session lsession;
-       int i, ret;
+       int i, ret, sessions_count, print_list = 0;
        ssize_t ret_len;
-       int sessions_count;
+       uint64_t session_id;
+       GPtrArray *session_list = NULL;
+
+       if (strlen(ctx->session_name) == 0) {
+               print_list = 1;
+               session_list = g_ptr_array_new();
+       }
 
        cmd.cmd = htobe32(LTTNG_VIEWER_LIST_SESSIONS);
        cmd.data_size = 0;
@@ -195,8 +262,6 @@ int lttng_live_list_sessions(struct lttng_live_ctx *ctx, const char *path)
        assert(ret_len == sizeof(list));
 
        sessions_count = be32toh(list.sessions_count);
-       fprintf(stdout, "%u active session(s)%c\n", sessions_count,
-                       sessions_count > 0 ? ':' : ' ');
        for (i = 0; i < sessions_count; i++) {
                do {
                        ret_len = recv(ctx->control_sock, &lsession, sizeof(lsession), 0);
@@ -209,14 +274,30 @@ int lttng_live_list_sessions(struct lttng_live_ctx *ctx, const char *path)
                assert(ret_len == sizeof(lsession));
                lsession.hostname[LTTNG_VIEWER_HOST_NAME_MAX - 1] = '\0';
                lsession.session_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0';
+               session_id = be64toh(lsession.id);
+
+               if (print_list) {
+                       update_session_list(session_list,
+                                       lsession.hostname,
+                                       lsession.session_name,
+                                       be32toh(lsession.streams),
+                                       be32toh(lsession.clients),
+                                       be32toh(lsession.live_timer));
+               } else {
+                       if ((strncmp(lsession.session_name, ctx->session_name,
+                               NAME_MAX) == 0) && (strncmp(lsession.hostname,
+                                       ctx->traced_hostname, NAME_MAX) == 0)) {
+                               printf_verbose("Reading from session %" PRIu64 "\n",
+                                               session_id);
+                               g_array_append_val(ctx->session_ids,
+                                               session_id);
+                       }
+               }
+       }
 
-               fprintf(stdout, "%s/%" PRIu64 " : %s on host %s (timer = %u, "
-                               "%u stream(s), %u client(s) connected)\n",
-                               path, be64toh(lsession.id),
-                               lsession.session_name, lsession.hostname,
-                               be32toh(lsession.live_timer),
-                               be32toh(lsession.streams),
-                               be32toh(lsession.clients));
+       if (print_list) {
+               print_session_list(session_list, path);
+               free_session_list(session_list);
        }
 
        ret = 0;
@@ -425,6 +506,11 @@ restart:
                        if (ret == -LTTNG_VIEWER_NEW_STREAMS_HUP) {
                                printf_verbose("Session %" PRIu64 " closed\n",
                                                id);
+                               /*
+                                * The streams have already been closed during
+                                * the reading, so we only need to get rid of
+                                * the trace in our internal table of sessions.
+                                */
                                g_array_remove_index(ctx->session_ids, i);
                                /*
                                 * We can't continue iterating on the g_array
@@ -822,6 +908,7 @@ error:
        return ret;
 }
 
+static
 void ctf_live_packet_seek(struct bt_stream_pos *stream_pos, size_t index,
                int whence)
 {
@@ -1024,6 +1111,15 @@ void add_traces(gpointer key, gpointer value, gpointer user_data)
        struct bt_mmap_stream_list mmap_list;
        struct lttng_live_ctx *ctx = NULL;
 
+       /*
+        * We don't know how many streams we will receive for a trace, so
+        * once we are done receiving the traces, we add all the traces
+        * received to the bt_context.
+        * We can receive streams during the attach command or the
+        * get_new_streams, so we have to make sure not to add multiple
+        * times the same traces.
+        * If a trace is already in the context, we just skip this function.
+        */
        if (trace->in_use)
                return;
 
@@ -1071,7 +1167,6 @@ void add_traces(gpointer key, gpointer value, gpointer user_data)
                                bt_ctx->trace_handles,
                                (gpointer) (unsigned long) ret);
                td = handle->td;
-               fprintf(stderr, "Handle : %d, td : %p\n", ret, td);
                bt_iter_add_trace(bt_ctx->current_iterator, td);
        }
 
@@ -1094,6 +1189,7 @@ int lttng_live_get_new_streams(struct lttng_live_ctx *ctx, uint64_t id)
        struct lttng_viewer_stream stream;
        int ret, i;
        ssize_t ret_len;
+       uint32_t stream_count;
 
        cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEW_STREAMS);
        cmd.data_size = sizeof(rq);
@@ -1116,7 +1212,7 @@ int lttng_live_get_new_streams(struct lttng_live_ctx *ctx, uint64_t id)
                ret_len = send(ctx->control_sock, &rq, sizeof(rq), 0);
        } while (ret_len < 0 && errno == EINTR);
        if (ret_len < 0) {
-               fprintf(stderr, "[error] Error sending attach request\n");
+               fprintf(stderr, "[error] Error sending get_new_streams request\n");
                ret = ret_len;
                goto error;
        }
@@ -1152,7 +1248,8 @@ int lttng_live_get_new_streams(struct lttng_live_ctx *ctx, uint64_t id)
                goto end;
        }
 
-       ctx->session->stream_count += be32toh(rp.streams_count);
+       stream_count = be32toh(rp.streams_count);
+       ctx->session->stream_count += stream_count;
        /*
         * When the session is created but not started, we do an active wait
         * until it starts. It allows the viewer to start processing the trace
@@ -1166,7 +1263,7 @@ int lttng_live_get_new_streams(struct lttng_live_ctx *ctx, uint64_t id)
                ctx->session->stream_count);
        ctx->session->streams = g_new0(struct lttng_live_viewer_stream,
                        ctx->session->stream_count);
-       for (i = 0; i < be32toh(rp.streams_count); i++) {
+       for (i = 0; i < stream_count; i++) {
                do {
                        ret_len = recv(ctx->control_sock, &stream, sizeof(stream), 0);
                } while (ret_len < 0 && errno == EINTR);
index ce311250ee07aa4840589c85d70da5f80c466637..2638da19a9c84ae13fce96c21f43ccc72f8e672b 100644 (file)
 #define LTTNG_LIVE_MINOR                       4
 
 struct lttng_live_ctx {
+       char traced_hostname[NAME_MAX];
+       char session_name[NAME_MAX];
+       char relay_hostname[NAME_MAX];
        int control_sock;
+       int port;
        struct lttng_live_session *session;
-       GArray *session_ids;
        struct bt_context *bt_ctx;
+       GArray *session_ids;
 };
 
 struct lttng_live_viewer_stream {
@@ -67,8 +71,16 @@ struct lttng_live_ctf_trace {
        int in_use;
 };
 
-int lttng_live_connect_viewer(struct lttng_live_ctx *ctx, char *hostname,
-               int port);
+/* Just used in listing. */
+struct lttng_live_relay_session {
+       uint32_t streams;
+       uint32_t clients;
+       uint32_t timer;
+       char *name;
+       char *hostname;
+};
+
+int lttng_live_connect_viewer(struct lttng_live_ctx *ctx);
 int lttng_live_establish_connection(struct lttng_live_ctx *ctx);
 int lttng_live_list_sessions(struct lttng_live_ctx *ctx, const char *path);
 int lttng_live_attach_session(struct lttng_live_ctx *ctx, uint64_t id);
index 5932872903956c9208a720bb01cbee709428e886..7b0673cb5e37cd8c7ccfd89e3242bff86b786d40 100644 (file)
 /*
  * hostname parameter needs to hold NAME_MAX chars.
  */
-static int parse_url(const char *path, char *hostname, int *port,
-               uint64_t *session_id, GArray *session_ids)
+static
+int parse_url(const char *path, struct lttng_live_ctx *ctx)
 {
        char remain[3][NAME_MAX];
        int ret = -1, proto, proto_offset = 0;
        size_t path_len = strlen(path);
-       char *str, *strctx;
 
        /*
         * Since sscanf API does not allow easily checking string length
@@ -72,12 +71,12 @@ static int parse_url(const char *path, char *hostname, int *port,
        /* TODO : parse for IPv6 as well */
        /* Parse the hostname or IP */
        ret = sscanf(&path[proto_offset], "%[a-zA-Z.0-9%-]%s",
-               hostname, remain[0]);
+               ctx->relay_hostname, remain[0]);
        if (ret == 2) {
                /* Optional port number */
                switch (remain[0][0]) {
                case ':':
-                       ret = sscanf(remain[0], ":%d%s", port, remain[1]);
+                       ret = sscanf(remain[0], ":%d%s", &ctx->port, remain[1]);
                        /* Optional session ID with port number */
                        if (ret == 2) {
                                ret = sscanf(remain[1], "/%s", remain[2]);
@@ -103,34 +102,29 @@ static int parse_url(const char *path, char *hostname, int *port,
                }
        }
 
-       if (*port < 0)
-               *port = LTTNG_DEFAULT_NETWORK_VIEWER_PORT;
+       if (ctx->port < 0)
+               ctx->port = LTTNG_DEFAULT_NETWORK_VIEWER_PORT;
 
        if (strlen(remain[2]) == 0) {
                printf_verbose("Connecting to hostname : %s, port : %d, "
                                "proto : IPv%d\n",
-                               hostname, *port, proto);
+                               ctx->relay_hostname, ctx->port, proto);
                ret = 0;
                goto end;
        }
+       ret = sscanf(remain[2], "host/%[a-zA-Z.0-9%-]/%s",
+                       ctx->traced_hostname, ctx->session_name);
+       if (ret != 2) {
+               fprintf(stderr, "[error] Format : "
+                       "net://<hostname>/host/<traced_hostname>/<session_name>\n");
+               goto end;
+       }
 
        printf_verbose("Connecting to hostname : %s, port : %d, "
-                       "session id(s) : %s, proto : IPv%d\n",
-                       hostname, *port, remain[2], proto);
-       str = strtok_r(remain[2], ",", &strctx);
-       do {
-               char *endptr;
-               uint64_t id;
-
-               id = strtoull(str, &endptr, 0);
-               if (*endptr != '\0' || str == endptr || errno != 0) {
-                       fprintf(stderr, "[error] parsing session id\n");
-                       ret = -1;
-                       goto end;
-               }
-               g_array_append_val(session_ids, id);
-       } while ((str = strtok_r(NULL, ",", &strctx)));
-
+                       "traced hostname : %s, session name : %s, "
+                       "proto : IPv%d\n",
+                       ctx->relay_hostname, ctx->port, ctx->traced_hostname,
+                       ctx->session_name, proto);
        ret = 0;
 
 end:
@@ -139,56 +133,53 @@ end:
 
 static int lttng_live_open_trace_read(const char *path)
 {
-       char hostname[NAME_MAX];
-       int port = -1;
-       uint64_t session_id = -1ULL;
        int ret = 0;
-       struct lttng_live_ctx ctx;
+       struct lttng_live_ctx *ctx;
 
-       ctx.session = g_new0(struct lttng_live_session, 1);
+       ctx = g_new0(struct lttng_live_ctx, 1);
+       ctx->session = g_new0(struct lttng_live_session, 1);
 
        /* We need a pointer to the context from the packet_seek function. */
-       ctx.session->ctx = &ctx;
+       ctx->session->ctx = ctx;
 
        /* HT to store the CTF traces. */
-       ctx.session->ctf_traces = g_hash_table_new(g_direct_hash,
+       ctx->session->ctf_traces = g_hash_table_new(g_direct_hash,
                        g_direct_equal);
+       ctx->port = -1;
+       ctx->session_ids = g_array_new(FALSE, TRUE, sizeof(uint64_t));
 
-       ctx.session_ids = g_array_new(FALSE, TRUE, sizeof(uint64_t));
-
-       ret = parse_url(path, hostname, &port, &session_id, ctx.session_ids);
+       ret = parse_url(path, ctx);
        if (ret < 0) {
                goto end_free;
        }
 
-       ret = lttng_live_connect_viewer(&ctx, hostname, port);
+       ret = lttng_live_connect_viewer(ctx);
        if (ret < 0) {
                fprintf(stderr, "[error] Connection failed\n");
                goto end_free;
        }
        printf_verbose("LTTng-live connected to relayd\n");
 
-       ret = lttng_live_establish_connection(&ctx);
+       ret = lttng_live_establish_connection(ctx);
        if (ret < 0) {
                goto end_free;
        }
 
-       if (ctx.session_ids->len == 0) {
-               printf_verbose("Listing sessions\n");
-               ret = lttng_live_list_sessions(&ctx, path);
-               if (ret < 0) {
-                       fprintf(stderr, "[error] List error\n");
-                       goto end_free;
-               }
-       } else {
-               lttng_live_read(&ctx);
+       printf_verbose("Listing sessions\n");
+       ret = lttng_live_list_sessions(ctx, path);
+       if (ret < 0) {
+               fprintf(stderr, "[error] List error\n");
+               goto end_free;
        }
 
+       if (ctx->session_ids->len > 0)
+               lttng_live_read(ctx);
+
 end_free:
-       g_array_free(ctx.session_ids, TRUE);
-       g_hash_table_destroy(ctx.session->ctf_traces);
-       g_free(ctx.session);
-       g_free(ctx.session->streams);
+       g_hash_table_destroy(ctx->session->ctf_traces);
+       g_free(ctx->session);
+       g_free(ctx->session->streams);
+       g_free(ctx);
        return ret;
 }
 
This page took 0.032164 seconds and 4 git commands to generate.