X-Git-Url: http://git.efficios.com/?p=babeltrace.git;a=blobdiff_plain;f=formats%2Flttng-live%2Flttng-live-functions.c;h=60f5fb51f346b6b8a64184a1e01a1ae2ca03ade1;hp=44c71cbc37304d025125d2d7de0c7b6f66ad9b24;hb=b5a1fa45f7cfaccf73b21712047775ab8f957bb0;hpb=2acdc547504407b8601937b9d89df8f56662e55d diff --git a/formats/lttng-live/lttng-live-functions.c b/formats/lttng-live/lttng-live-functions.c index 44c71cbc..60f5fb51 100644 --- a/formats/lttng-live/lttng-live-functions.c +++ b/formats/lttng-live/lttng-live-functions.c @@ -62,19 +62,18 @@ ((type) (a) > (type) (b) ? (type) (a) : (type) (b)) #endif -void ctf_live_packet_seek(struct bt_stream_pos *stream_pos, size_t index, - int whence); +static void ctf_live_packet_seek(struct bt_stream_pos *stream_pos, + size_t index, int whence); static void add_traces(gpointer key, gpointer value, gpointer user_data); static int del_traces(gpointer key, gpointer value, gpointer user_data); -int lttng_live_connect_viewer(struct lttng_live_ctx *ctx, char *hostname, - int port) +int lttng_live_connect_viewer(struct lttng_live_ctx *ctx) { struct hostent *host; struct sockaddr_in server_addr; int ret; - host = gethostbyname(hostname); + host = gethostbyname(ctx->relay_hostname); if (!host) { ret = -1; goto end; @@ -87,7 +86,7 @@ int lttng_live_connect_viewer(struct lttng_live_ctx *ctx, char *hostname, } server_addr.sin_family = AF_INET; - server_addr.sin_port = htons(port); + server_addr.sin_port = htons(ctx->port); server_addr.sin_addr = *((struct in_addr *) host->h_addr); bzero(&(server_addr.sin_zero), 8); @@ -161,14 +160,82 @@ error: return ret; } +static +void free_session_list(GPtrArray *session_list) +{ + int i; + struct lttng_live_relay_session *relay_session; + + for (i = 0; i < session_list->len; i++) { + relay_session = g_ptr_array_index(session_list, i); + free(relay_session->name); + free(relay_session->hostname); + } + g_ptr_array_free(session_list, TRUE); +} + +static +void print_session_list(GPtrArray *session_list, const char *path) +{ + int i; + struct lttng_live_relay_session *relay_session; + + for (i = 0; i < session_list->len; i++) { + relay_session = g_ptr_array_index(session_list, i); + fprintf(stdout, "%s/host/%s/%s (timer = %u, " + "%u stream(s), %u client(s) connected)\n", + path, relay_session->hostname, + relay_session->name, relay_session->timer, + relay_session->streams, relay_session->clients); + } +} + +static +void update_session_list(GPtrArray *session_list, char *hostname, + char *session_name, uint32_t streams, uint32_t clients, + uint32_t timer) +{ + int i, found = 0; + struct lttng_live_relay_session *relay_session; + + for (i = 0; i < session_list->len; i++) { + relay_session = g_ptr_array_index(session_list, i); + if ((strncmp(relay_session->hostname, hostname, NAME_MAX) == 0) && + strncmp(relay_session->name, + session_name, NAME_MAX) == 0) { + relay_session->streams += streams; + if (relay_session->clients < clients) + relay_session->clients = clients; + found = 1; + break; + } + } + if (found) + return; + + relay_session = g_new0(struct lttng_live_relay_session, 1); + relay_session->hostname = strndup(hostname, NAME_MAX); + relay_session->name = strndup(session_name, NAME_MAX); + relay_session->clients = clients; + relay_session->streams = streams; + relay_session->timer = timer; + g_ptr_array_add(session_list, relay_session); +} + int lttng_live_list_sessions(struct lttng_live_ctx *ctx, const char *path) { struct lttng_viewer_cmd cmd; struct lttng_viewer_list_sessions list; struct lttng_viewer_session lsession; - int i, ret; + int i, ret, sessions_count, print_list = 0; ssize_t ret_len; - int sessions_count; + uint64_t session_id; + GPtrArray *session_list = NULL; + + if (strlen(ctx->session_name) == 0) { + print_list = 1; + session_list = g_ptr_array_new(); + } cmd.cmd = htobe32(LTTNG_VIEWER_LIST_SESSIONS); cmd.data_size = 0; @@ -195,8 +262,6 @@ int lttng_live_list_sessions(struct lttng_live_ctx *ctx, const char *path) assert(ret_len == sizeof(list)); sessions_count = be32toh(list.sessions_count); - fprintf(stdout, "%u active session(s)%c\n", sessions_count, - sessions_count > 0 ? ':' : ' '); for (i = 0; i < sessions_count; i++) { do { ret_len = recv(ctx->control_sock, &lsession, sizeof(lsession), 0); @@ -209,14 +274,30 @@ int lttng_live_list_sessions(struct lttng_live_ctx *ctx, const char *path) assert(ret_len == sizeof(lsession)); lsession.hostname[LTTNG_VIEWER_HOST_NAME_MAX - 1] = '\0'; lsession.session_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0'; + session_id = be64toh(lsession.id); + + if (print_list) { + update_session_list(session_list, + lsession.hostname, + lsession.session_name, + be32toh(lsession.streams), + be32toh(lsession.clients), + be32toh(lsession.live_timer)); + } else { + if ((strncmp(lsession.session_name, ctx->session_name, + NAME_MAX) == 0) && (strncmp(lsession.hostname, + ctx->traced_hostname, NAME_MAX) == 0)) { + printf_verbose("Reading from session %" PRIu64 "\n", + session_id); + g_array_append_val(ctx->session_ids, + session_id); + } + } + } - fprintf(stdout, "%s/%" PRIu64 " : %s on host %s (timer = %u, " - "%u stream(s), %u client(s) connected)\n", - path, be64toh(lsession.id), - lsession.session_name, lsession.hostname, - be32toh(lsession.live_timer), - be32toh(lsession.streams), - be32toh(lsession.clients)); + if (print_list) { + print_session_list(session_list, path); + free_session_list(session_list); } ret = 0; @@ -425,6 +506,11 @@ restart: if (ret == -LTTNG_VIEWER_NEW_STREAMS_HUP) { printf_verbose("Session %" PRIu64 " closed\n", id); + /* + * The streams have already been closed during + * the reading, so we only need to get rid of + * the trace in our internal table of sessions. + */ g_array_remove_index(ctx->session_ids, i); /* * We can't continue iterating on the g_array @@ -822,6 +908,7 @@ error: return ret; } +static void ctf_live_packet_seek(struct bt_stream_pos *stream_pos, size_t index, int whence) { @@ -1024,6 +1111,15 @@ void add_traces(gpointer key, gpointer value, gpointer user_data) struct bt_mmap_stream_list mmap_list; struct lttng_live_ctx *ctx = NULL; + /* + * We don't know how many streams we will receive for a trace, so + * once we are done receiving the traces, we add all the traces + * received to the bt_context. + * We can receive streams during the attach command or the + * get_new_streams, so we have to make sure not to add multiple + * times the same traces. + * If a trace is already in the context, we just skip this function. + */ if (trace->in_use) return; @@ -1071,7 +1167,6 @@ void add_traces(gpointer key, gpointer value, gpointer user_data) bt_ctx->trace_handles, (gpointer) (unsigned long) ret); td = handle->td; - fprintf(stderr, "Handle : %d, td : %p\n", ret, td); bt_iter_add_trace(bt_ctx->current_iterator, td); } @@ -1094,6 +1189,7 @@ int lttng_live_get_new_streams(struct lttng_live_ctx *ctx, uint64_t id) struct lttng_viewer_stream stream; int ret, i; ssize_t ret_len; + uint32_t stream_count; cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEW_STREAMS); cmd.data_size = sizeof(rq); @@ -1116,7 +1212,7 @@ int lttng_live_get_new_streams(struct lttng_live_ctx *ctx, uint64_t id) ret_len = send(ctx->control_sock, &rq, sizeof(rq), 0); } while (ret_len < 0 && errno == EINTR); if (ret_len < 0) { - fprintf(stderr, "[error] Error sending attach request\n"); + fprintf(stderr, "[error] Error sending get_new_streams request\n"); ret = ret_len; goto error; } @@ -1152,7 +1248,8 @@ int lttng_live_get_new_streams(struct lttng_live_ctx *ctx, uint64_t id) goto end; } - ctx->session->stream_count += be32toh(rp.streams_count); + stream_count = be32toh(rp.streams_count); + ctx->session->stream_count += stream_count; /* * When the session is created but not started, we do an active wait * until it starts. It allows the viewer to start processing the trace @@ -1166,7 +1263,7 @@ int lttng_live_get_new_streams(struct lttng_live_ctx *ctx, uint64_t id) ctx->session->stream_count); ctx->session->streams = g_new0(struct lttng_live_viewer_stream, ctx->session->stream_count); - for (i = 0; i < be32toh(rp.streams_count); i++) { + for (i = 0; i < stream_count; i++) { do { ret_len = recv(ctx->control_sock, &stream, sizeof(stream), 0); } while (ret_len < 0 && errno == EINTR);