Attach and list by session name and hostname
[babeltrace.git] / formats / lttng-live / lttng-live-functions.c
index 44c71cbc37304d025125d2d7de0c7b6f66ad9b24..60f5fb51f346b6b8a64184a1e01a1ae2ca03ade1 100644 (file)
        ((type) (a) > (type) (b) ? (type) (a) : (type) (b))
 #endif
 
-void ctf_live_packet_seek(struct bt_stream_pos *stream_pos, size_t index,
-               int whence);
+static void ctf_live_packet_seek(struct bt_stream_pos *stream_pos,
+               size_t index, int whence);
 static void add_traces(gpointer key, gpointer value, gpointer user_data);
 static int del_traces(gpointer key, gpointer value, gpointer user_data);
 
-int lttng_live_connect_viewer(struct lttng_live_ctx *ctx, char *hostname,
-               int port)
+int lttng_live_connect_viewer(struct lttng_live_ctx *ctx)
 {
        struct hostent *host;
        struct sockaddr_in server_addr;
        int ret;
 
-       host = gethostbyname(hostname);
+       host = gethostbyname(ctx->relay_hostname);
        if (!host) {
                ret = -1;
                goto end;
@@ -87,7 +86,7 @@ int lttng_live_connect_viewer(struct lttng_live_ctx *ctx, char *hostname,
        }
 
        server_addr.sin_family = AF_INET;
-       server_addr.sin_port = htons(port);
+       server_addr.sin_port = htons(ctx->port);
        server_addr.sin_addr = *((struct in_addr *) host->h_addr);
        bzero(&(server_addr.sin_zero), 8);
 
@@ -161,14 +160,82 @@ error:
        return ret;
 }
 
+static
+void free_session_list(GPtrArray *session_list)
+{
+       int i;
+       struct lttng_live_relay_session *relay_session;
+
+       for (i = 0; i < session_list->len; i++) {
+               relay_session = g_ptr_array_index(session_list, i);
+               free(relay_session->name);
+               free(relay_session->hostname);
+       }
+       g_ptr_array_free(session_list, TRUE);
+}
+
+static
+void print_session_list(GPtrArray *session_list, const char *path)
+{
+       int i;
+       struct lttng_live_relay_session *relay_session;
+
+       for (i = 0; i < session_list->len; i++) {
+               relay_session = g_ptr_array_index(session_list, i);
+               fprintf(stdout, "%s/host/%s/%s (timer = %u, "
+                               "%u stream(s), %u client(s) connected)\n",
+                               path, relay_session->hostname,
+                               relay_session->name, relay_session->timer,
+                               relay_session->streams, relay_session->clients);
+       }
+}
+
+static
+void update_session_list(GPtrArray *session_list, char *hostname,
+               char *session_name, uint32_t streams, uint32_t clients,
+               uint32_t timer)
+{
+       int i, found = 0;
+       struct lttng_live_relay_session *relay_session;
+
+       for (i = 0; i < session_list->len; i++) {
+               relay_session = g_ptr_array_index(session_list, i);
+               if ((strncmp(relay_session->hostname, hostname, NAME_MAX) == 0) &&
+                               strncmp(relay_session->name,
+                                       session_name, NAME_MAX) == 0) {
+                       relay_session->streams += streams;
+                       if (relay_session->clients < clients)
+                               relay_session->clients = clients;
+                       found = 1;
+                       break;
+               }
+       }
+       if (found)
+               return;
+
+       relay_session = g_new0(struct lttng_live_relay_session, 1);
+       relay_session->hostname = strndup(hostname, NAME_MAX);
+       relay_session->name = strndup(session_name, NAME_MAX);
+       relay_session->clients = clients;
+       relay_session->streams = streams;
+       relay_session->timer = timer;
+       g_ptr_array_add(session_list, relay_session);
+}
+
 int lttng_live_list_sessions(struct lttng_live_ctx *ctx, const char *path)
 {
        struct lttng_viewer_cmd cmd;
        struct lttng_viewer_list_sessions list;
        struct lttng_viewer_session lsession;
-       int i, ret;
+       int i, ret, sessions_count, print_list = 0;
        ssize_t ret_len;
-       int sessions_count;
+       uint64_t session_id;
+       GPtrArray *session_list = NULL;
+
+       if (strlen(ctx->session_name) == 0) {
+               print_list = 1;
+               session_list = g_ptr_array_new();
+       }
 
        cmd.cmd = htobe32(LTTNG_VIEWER_LIST_SESSIONS);
        cmd.data_size = 0;
@@ -195,8 +262,6 @@ int lttng_live_list_sessions(struct lttng_live_ctx *ctx, const char *path)
        assert(ret_len == sizeof(list));
 
        sessions_count = be32toh(list.sessions_count);
-       fprintf(stdout, "%u active session(s)%c\n", sessions_count,
-                       sessions_count > 0 ? ':' : ' ');
        for (i = 0; i < sessions_count; i++) {
                do {
                        ret_len = recv(ctx->control_sock, &lsession, sizeof(lsession), 0);
@@ -209,14 +274,30 @@ int lttng_live_list_sessions(struct lttng_live_ctx *ctx, const char *path)
                assert(ret_len == sizeof(lsession));
                lsession.hostname[LTTNG_VIEWER_HOST_NAME_MAX - 1] = '\0';
                lsession.session_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0';
+               session_id = be64toh(lsession.id);
+
+               if (print_list) {
+                       update_session_list(session_list,
+                                       lsession.hostname,
+                                       lsession.session_name,
+                                       be32toh(lsession.streams),
+                                       be32toh(lsession.clients),
+                                       be32toh(lsession.live_timer));
+               } else {
+                       if ((strncmp(lsession.session_name, ctx->session_name,
+                               NAME_MAX) == 0) && (strncmp(lsession.hostname,
+                                       ctx->traced_hostname, NAME_MAX) == 0)) {
+                               printf_verbose("Reading from session %" PRIu64 "\n",
+                                               session_id);
+                               g_array_append_val(ctx->session_ids,
+                                               session_id);
+                       }
+               }
+       }
 
-               fprintf(stdout, "%s/%" PRIu64 " : %s on host %s (timer = %u, "
-                               "%u stream(s), %u client(s) connected)\n",
-                               path, be64toh(lsession.id),
-                               lsession.session_name, lsession.hostname,
-                               be32toh(lsession.live_timer),
-                               be32toh(lsession.streams),
-                               be32toh(lsession.clients));
+       if (print_list) {
+               print_session_list(session_list, path);
+               free_session_list(session_list);
        }
 
        ret = 0;
@@ -425,6 +506,11 @@ restart:
                        if (ret == -LTTNG_VIEWER_NEW_STREAMS_HUP) {
                                printf_verbose("Session %" PRIu64 " closed\n",
                                                id);
+                               /*
+                                * The streams have already been closed during
+                                * the reading, so we only need to get rid of
+                                * the trace in our internal table of sessions.
+                                */
                                g_array_remove_index(ctx->session_ids, i);
                                /*
                                 * We can't continue iterating on the g_array
@@ -822,6 +908,7 @@ error:
        return ret;
 }
 
+static
 void ctf_live_packet_seek(struct bt_stream_pos *stream_pos, size_t index,
                int whence)
 {
@@ -1024,6 +1111,15 @@ void add_traces(gpointer key, gpointer value, gpointer user_data)
        struct bt_mmap_stream_list mmap_list;
        struct lttng_live_ctx *ctx = NULL;
 
+       /*
+        * We don't know how many streams we will receive for a trace, so
+        * once we are done receiving the traces, we add all the traces
+        * received to the bt_context.
+        * We can receive streams during the attach command or the
+        * get_new_streams, so we have to make sure not to add multiple
+        * times the same traces.
+        * If a trace is already in the context, we just skip this function.
+        */
        if (trace->in_use)
                return;
 
@@ -1071,7 +1167,6 @@ void add_traces(gpointer key, gpointer value, gpointer user_data)
                                bt_ctx->trace_handles,
                                (gpointer) (unsigned long) ret);
                td = handle->td;
-               fprintf(stderr, "Handle : %d, td : %p\n", ret, td);
                bt_iter_add_trace(bt_ctx->current_iterator, td);
        }
 
@@ -1094,6 +1189,7 @@ int lttng_live_get_new_streams(struct lttng_live_ctx *ctx, uint64_t id)
        struct lttng_viewer_stream stream;
        int ret, i;
        ssize_t ret_len;
+       uint32_t stream_count;
 
        cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEW_STREAMS);
        cmd.data_size = sizeof(rq);
@@ -1116,7 +1212,7 @@ int lttng_live_get_new_streams(struct lttng_live_ctx *ctx, uint64_t id)
                ret_len = send(ctx->control_sock, &rq, sizeof(rq), 0);
        } while (ret_len < 0 && errno == EINTR);
        if (ret_len < 0) {
-               fprintf(stderr, "[error] Error sending attach request\n");
+               fprintf(stderr, "[error] Error sending get_new_streams request\n");
                ret = ret_len;
                goto error;
        }
@@ -1152,7 +1248,8 @@ int lttng_live_get_new_streams(struct lttng_live_ctx *ctx, uint64_t id)
                goto end;
        }
 
-       ctx->session->stream_count += be32toh(rp.streams_count);
+       stream_count = be32toh(rp.streams_count);
+       ctx->session->stream_count += stream_count;
        /*
         * When the session is created but not started, we do an active wait
         * until it starts. It allows the viewer to start processing the trace
@@ -1166,7 +1263,7 @@ int lttng_live_get_new_streams(struct lttng_live_ctx *ctx, uint64_t id)
                ctx->session->stream_count);
        ctx->session->streams = g_new0(struct lttng_live_viewer_stream,
                        ctx->session->stream_count);
-       for (i = 0; i < be32toh(rp.streams_count); i++) {
+       for (i = 0; i < stream_count; i++) {
                do {
                        ret_len = recv(ctx->control_sock, &stream, sizeof(stream), 0);
                } while (ret_len < 0 && errno == EINTR);
This page took 0.025811 seconds and 4 git commands to generate.