((type) (a) > (type) (b) ? (type) (a) : (type) (b))
#endif
-void ctf_live_packet_seek(struct bt_stream_pos *stream_pos, size_t index,
- int whence);
+static void ctf_live_packet_seek(struct bt_stream_pos *stream_pos,
+ size_t index, int whence);
static void add_traces(gpointer key, gpointer value, gpointer user_data);
static int del_traces(gpointer key, gpointer value, gpointer user_data);
-int lttng_live_connect_viewer(struct lttng_live_ctx *ctx, char *hostname,
- int port)
+int lttng_live_connect_viewer(struct lttng_live_ctx *ctx)
{
struct hostent *host;
struct sockaddr_in server_addr;
int ret;
- host = gethostbyname(hostname);
+ host = gethostbyname(ctx->relay_hostname);
if (!host) {
ret = -1;
goto end;
}
server_addr.sin_family = AF_INET;
- server_addr.sin_port = htons(port);
+ server_addr.sin_port = htons(ctx->port);
server_addr.sin_addr = *((struct in_addr *) host->h_addr);
bzero(&(server_addr.sin_zero), 8);
return ret;
}
+static
+void free_session_list(GPtrArray *session_list)
+{
+ int i;
+ struct lttng_live_relay_session *relay_session;
+
+ for (i = 0; i < session_list->len; i++) {
+ relay_session = g_ptr_array_index(session_list, i);
+ free(relay_session->name);
+ free(relay_session->hostname);
+ }
+ g_ptr_array_free(session_list, TRUE);
+}
+
+static
+void print_session_list(GPtrArray *session_list, const char *path)
+{
+ int i;
+ struct lttng_live_relay_session *relay_session;
+
+ for (i = 0; i < session_list->len; i++) {
+ relay_session = g_ptr_array_index(session_list, i);
+ fprintf(stdout, "%s/host/%s/%s (timer = %u, "
+ "%u stream(s), %u client(s) connected)\n",
+ path, relay_session->hostname,
+ relay_session->name, relay_session->timer,
+ relay_session->streams, relay_session->clients);
+ }
+}
+
+static
+void update_session_list(GPtrArray *session_list, char *hostname,
+ char *session_name, uint32_t streams, uint32_t clients,
+ uint32_t timer)
+{
+ int i, found = 0;
+ struct lttng_live_relay_session *relay_session;
+
+ for (i = 0; i < session_list->len; i++) {
+ relay_session = g_ptr_array_index(session_list, i);
+ if ((strncmp(relay_session->hostname, hostname, NAME_MAX) == 0) &&
+ strncmp(relay_session->name,
+ session_name, NAME_MAX) == 0) {
+ relay_session->streams += streams;
+ if (relay_session->clients < clients)
+ relay_session->clients = clients;
+ found = 1;
+ break;
+ }
+ }
+ if (found)
+ return;
+
+ relay_session = g_new0(struct lttng_live_relay_session, 1);
+ relay_session->hostname = strndup(hostname, NAME_MAX);
+ relay_session->name = strndup(session_name, NAME_MAX);
+ relay_session->clients = clients;
+ relay_session->streams = streams;
+ relay_session->timer = timer;
+ g_ptr_array_add(session_list, relay_session);
+}
+
int lttng_live_list_sessions(struct lttng_live_ctx *ctx, const char *path)
{
struct lttng_viewer_cmd cmd;
struct lttng_viewer_list_sessions list;
struct lttng_viewer_session lsession;
- int i, ret;
+ int i, ret, sessions_count, print_list = 0;
ssize_t ret_len;
- int sessions_count;
+ uint64_t session_id;
+ GPtrArray *session_list = NULL;
+
+ if (strlen(ctx->session_name) == 0) {
+ print_list = 1;
+ session_list = g_ptr_array_new();
+ }
cmd.cmd = htobe32(LTTNG_VIEWER_LIST_SESSIONS);
cmd.data_size = 0;
assert(ret_len == sizeof(list));
sessions_count = be32toh(list.sessions_count);
- fprintf(stdout, "%u active session(s)%c\n", sessions_count,
- sessions_count > 0 ? ':' : ' ');
for (i = 0; i < sessions_count; i++) {
do {
ret_len = recv(ctx->control_sock, &lsession, sizeof(lsession), 0);
assert(ret_len == sizeof(lsession));
lsession.hostname[LTTNG_VIEWER_HOST_NAME_MAX - 1] = '\0';
lsession.session_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0';
+ session_id = be64toh(lsession.id);
+
+ if (print_list) {
+ update_session_list(session_list,
+ lsession.hostname,
+ lsession.session_name,
+ be32toh(lsession.streams),
+ be32toh(lsession.clients),
+ be32toh(lsession.live_timer));
+ } else {
+ if ((strncmp(lsession.session_name, ctx->session_name,
+ NAME_MAX) == 0) && (strncmp(lsession.hostname,
+ ctx->traced_hostname, NAME_MAX) == 0)) {
+ printf_verbose("Reading from session %" PRIu64 "\n",
+ session_id);
+ g_array_append_val(ctx->session_ids,
+ session_id);
+ }
+ }
+ }
- fprintf(stdout, "%s/%" PRIu64 " : %s on host %s (timer = %u, "
- "%u stream(s), %u client(s) connected)\n",
- path, be64toh(lsession.id),
- lsession.session_name, lsession.hostname,
- be32toh(lsession.live_timer),
- be32toh(lsession.streams),
- be32toh(lsession.clients));
+ if (print_list) {
+ print_session_list(session_list, path);
+ free_session_list(session_list);
}
ret = 0;
if (ret == -LTTNG_VIEWER_NEW_STREAMS_HUP) {
printf_verbose("Session %" PRIu64 " closed\n",
id);
+ /*
+ * The streams have already been closed during
+ * the reading, so we only need to get rid of
+ * the trace in our internal table of sessions.
+ */
g_array_remove_index(ctx->session_ids, i);
/*
* We can't continue iterating on the g_array
return ret;
}
+static
void ctf_live_packet_seek(struct bt_stream_pos *stream_pos, size_t index,
int whence)
{
struct bt_mmap_stream_list mmap_list;
struct lttng_live_ctx *ctx = NULL;
+ /*
+ * We don't know how many streams we will receive for a trace, so
+ * once we are done receiving the traces, we add all the traces
+ * received to the bt_context.
+ * We can receive streams during the attach command or the
+ * get_new_streams, so we have to make sure not to add multiple
+ * times the same traces.
+ * If a trace is already in the context, we just skip this function.
+ */
if (trace->in_use)
return;
bt_ctx->trace_handles,
(gpointer) (unsigned long) ret);
td = handle->td;
- fprintf(stderr, "Handle : %d, td : %p\n", ret, td);
bt_iter_add_trace(bt_ctx->current_iterator, td);
}
struct lttng_viewer_stream stream;
int ret, i;
ssize_t ret_len;
+ uint32_t stream_count;
cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEW_STREAMS);
cmd.data_size = sizeof(rq);
ret_len = send(ctx->control_sock, &rq, sizeof(rq), 0);
} while (ret_len < 0 && errno == EINTR);
if (ret_len < 0) {
- fprintf(stderr, "[error] Error sending attach request\n");
+ fprintf(stderr, "[error] Error sending get_new_streams request\n");
ret = ret_len;
goto error;
}
goto end;
}
- ctx->session->stream_count += be32toh(rp.streams_count);
+ stream_count = be32toh(rp.streams_count);
+ ctx->session->stream_count += stream_count;
/*
* When the session is created but not started, we do an active wait
* until it starts. It allows the viewer to start processing the trace
ctx->session->stream_count);
ctx->session->streams = g_new0(struct lttng_live_viewer_stream,
ctx->session->stream_count);
- for (i = 0; i < be32toh(rp.streams_count); i++) {
+ for (i = 0; i < stream_count; i++) {
do {
ret_len = recv(ctx->control_sock, &stream, sizeof(stream), 0);
} while (ret_len < 0 && errno == EINTR);