From 2acdc547504407b8601937b9d89df8f56662e55d Mon Sep 17 00:00:00 2001 From: Julien Desfossez Date: Wed, 5 Feb 2014 18:34:27 -0500 Subject: [PATCH] get_new_streams and multi-session We now support the get_new_stream lttng live command. This command allows us to retrieve new streams added while tracing. Signed-off-by: Julien Desfossez Signed-off-by: Mathieu Desnoyers --- formats/lttng-live/lttng-live-functions.c | 308 +++++++++++++++++++--- formats/lttng-live/lttng-live-functions.h | 6 +- formats/lttng-live/lttng-live.c | 46 +++- formats/lttng-live/lttng-viewer-abi.h | 1 + 4 files changed, 306 insertions(+), 55 deletions(-) diff --git a/formats/lttng-live/lttng-live-functions.c b/formats/lttng-live/lttng-live-functions.c index 0974b567..44c71cbc 100644 --- a/formats/lttng-live/lttng-live-functions.c +++ b/formats/lttng-live/lttng-live-functions.c @@ -62,6 +62,11 @@ ((type) (a) > (type) (b) ? (type) (a) : (type) (b)) #endif +void ctf_live_packet_seek(struct bt_stream_pos *stream_pos, size_t index, + int whence); +static void add_traces(gpointer key, gpointer value, gpointer user_data); +static int del_traces(gpointer key, gpointer value, gpointer user_data); + int lttng_live_connect_viewer(struct lttng_live_ctx *ctx, char *hostname, int port) { @@ -323,7 +328,7 @@ int lttng_live_attach_session(struct lttng_live_ctx *ctx, uint64_t id) goto end; } - ctx->session->stream_count = be32toh(rp.streams_count); + ctx->session->stream_count += be32toh(rp.streams_count); /* * When the session is created but not started, we do an active wait * until it starts. It allows the viewer to start processing the trace @@ -404,6 +409,39 @@ error: return ret; } +static +int ask_new_streams(struct lttng_live_ctx *ctx) +{ + int i, ret = 0; + uint64_t id; + +restart: + for (i = 0; i < ctx->session_ids->len; i++) { + id = g_array_index(ctx->session_ids, uint64_t, i); + ret = lttng_live_get_new_streams(ctx, id); + printf_verbose("Asking for new streams returns %d\n", + ret); + if (ret < 0) { + if (ret == -LTTNG_VIEWER_NEW_STREAMS_HUP) { + printf_verbose("Session %" PRIu64 " closed\n", + id); + g_array_remove_index(ctx->session_ids, i); + /* + * We can't continue iterating on the g_array + * after a remove, we have to start again. + */ + goto restart; + } else { + ret = -1; + goto end; + } + } + } + +end: + return ret; +} + static int get_data_packet(struct lttng_live_ctx *ctx, struct ctf_stream_pos *pos, @@ -480,6 +518,13 @@ int get_data_packet(struct lttng_live_ctx *ctx, ret = 0; goto end; } + if (rp.flags & LTTNG_VIEWER_FLAG_NEW_STREAM) { + ret = ask_new_streams(ctx); + if (ret < 0) + goto error; + g_hash_table_foreach(ctx->session->ctf_traces, add_traces, + ctx->bt_ctx); + } fprintf(stderr, "[error] get_data_packet: error\n"); ret = -1; goto end; @@ -742,6 +787,13 @@ retry: goto error; } } + if (rp.flags & LTTNG_VIEWER_FLAG_NEW_STREAM) { + ret = ask_new_streams(ctx); + if (ret < 0) + goto error; + g_hash_table_foreach(ctx->session->ctf_traces, add_traces, + ctx->bt_ctx); + } break; case LTTNG_VIEWER_INDEX_RETRY: printf_verbose("get_next_index: retry\n"); @@ -752,6 +804,7 @@ retry: viewer_stream->id = -1ULL; viewer_stream->fd = -1; index->offset = EOF; + ctx->session->stream_count--; break; case LTTNG_VIEWER_INDEX_ERR: fprintf(stderr, "[error] get_next_index: error\n"); @@ -902,7 +955,50 @@ end: return; } -static int del_traces(gpointer key, gpointer value, gpointer user_data) +int lttng_live_create_viewer_session(struct lttng_live_ctx *ctx) +{ + struct lttng_viewer_cmd cmd; + struct lttng_viewer_create_session_response resp; + int ret; + ssize_t ret_len; + + cmd.cmd = htobe32(LTTNG_VIEWER_CREATE_SESSION); + cmd.data_size = 0; + cmd.cmd_version = 0; + + do { + ret_len = send(ctx->control_sock, &cmd, sizeof(cmd), 0); + } while (ret_len < 0 && errno == EINTR); + if (ret_len < 0) { + fprintf(stderr, "[error] Error sending cmd\n"); + ret = ret_len; + goto error; + } + assert(ret_len == sizeof(cmd)); + + do { + ret_len = recv(ctx->control_sock, &resp, sizeof(resp), 0); + } while (ret_len < 0 && errno == EINTR); + if (ret_len < 0) { + fprintf(stderr, "[error] Error receiving create session reply\n"); + ret = ret_len; + goto error; + } + assert(ret_len == sizeof(resp)); + + if (be32toh(resp.status) != LTTNG_VIEWER_CREATE_SESSION_OK) { + fprintf(stderr, "[error] Error creating viewer session\n"); + ret = -1; + goto error; + } + ret = 0; + +error: + return ret; +} + +static +int del_traces(gpointer key, gpointer value, gpointer user_data) { struct bt_context *bt_ctx = user_data; struct lttng_live_ctf_trace *trace = value; @@ -916,7 +1012,8 @@ static int del_traces(gpointer key, gpointer value, gpointer user_data) return 1; } -static void add_traces(gpointer key, gpointer value, gpointer user_data) +static +void add_traces(gpointer key, gpointer value, gpointer user_data) { int i, ret, total_metadata = 0; uint64_t metadata_len; @@ -927,6 +1024,9 @@ static void add_traces(gpointer key, gpointer value, gpointer user_data) struct bt_mmap_stream_list mmap_list; struct lttng_live_ctx *ctx = NULL; + if (trace->in_use) + return; + BT_INIT_LIST_HEAD(&mmap_list.head); for (i = 0; i < trace->streams->len; i++) { @@ -962,7 +1062,21 @@ static void add_traces(gpointer key, gpointer value, gpointer user_data) fprintf(stderr, "[error] Error adding trace\n"); goto end_free; } + + if (bt_ctx->current_iterator) { + struct bt_trace_descriptor *td; + struct bt_trace_handle *handle; + + handle = (struct bt_trace_handle *) g_hash_table_lookup( + bt_ctx->trace_handles, + (gpointer) (unsigned long) ret); + td = handle->td; + fprintf(stderr, "Handle : %d, td : %p\n", ret, td); + bt_iter_add_trace(bt_ctx->current_iterator, td); + } + trace->trace_id = ret; + trace->in_use = 1; goto end; @@ -972,17 +1086,22 @@ end: return; } -int lttng_live_create_viewer_session(struct lttng_live_ctx *ctx) +int lttng_live_get_new_streams(struct lttng_live_ctx *ctx, uint64_t id) { struct lttng_viewer_cmd cmd; - struct lttng_viewer_create_session_response resp; - int ret; + struct lttng_viewer_new_streams_request rq; + struct lttng_viewer_new_streams_response rp; + struct lttng_viewer_stream stream; + int ret, i; ssize_t ret_len; - cmd.cmd = htobe32(LTTNG_VIEWER_CREATE_SESSION); - cmd.data_size = 0; + cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEW_STREAMS); + cmd.data_size = sizeof(rq); cmd.cmd_version = 0; + memset(&rq, 0, sizeof(rq)); + rq.session_id = htobe64(id); + do { ret_len = send(ctx->control_sock, &cmd, sizeof(cmd), 0); } while (ret_len < 0 && errno == EINTR); @@ -994,39 +1113,139 @@ int lttng_live_create_viewer_session(struct lttng_live_ctx *ctx) assert(ret_len == sizeof(cmd)); do { - ret_len = recv(ctx->control_sock, &resp, sizeof(resp), 0); + ret_len = send(ctx->control_sock, &rq, sizeof(rq), 0); } while (ret_len < 0 && errno == EINTR); if (ret_len < 0) { - fprintf(stderr, "[error] Error receiving create session reply\n"); + fprintf(stderr, "[error] Error sending attach request\n"); ret = ret_len; goto error; } - assert(ret_len == sizeof(resp)); + assert(ret_len == sizeof(rq)); - if (be32toh(resp.status) != LTTNG_VIEWER_CREATE_SESSION_OK) { - fprintf(stderr, "[error] Error creating viewer session\n"); - ret = -1; + do { + ret_len = recv(ctx->control_sock, &rp, sizeof(rp), 0); + } while (ret_len < 0 && errno == EINTR); + if (ret_len < 0) { + fprintf(stderr, "[error] Error receiving get_new_streams response\n"); + ret = ret_len; goto error; } + assert(ret_len == sizeof(rp)); + + switch(be32toh(rp.status)) { + case LTTNG_VIEWER_NEW_STREAMS_OK: + break; + case LTTNG_VIEWER_NEW_STREAMS_NO_NEW: + ret = 0; + goto end; + case LTTNG_VIEWER_NEW_STREAMS_HUP: + ret = -LTTNG_VIEWER_NEW_STREAMS_HUP; + goto end; + case LTTNG_VIEWER_NEW_STREAMS_ERR: + fprintf(stderr, "[error] get_new_streams error\n"); + ret = -1; + goto end; + default: + fprintf(stderr, "[error] Unknown return code %u\n", + be32toh(rp.status)); + ret = -1; + goto end; + } + + ctx->session->stream_count += be32toh(rp.streams_count); + /* + * When the session is created but not started, we do an active wait + * until it starts. It allows the viewer to start processing the trace + * as soon as the session starts. + */ + if (ctx->session->stream_count == 0) { + ret = 0; + goto end; + } + printf_verbose("Waiting for %" PRIu64 " streams:\n", + ctx->session->stream_count); + ctx->session->streams = g_new0(struct lttng_live_viewer_stream, + ctx->session->stream_count); + for (i = 0; i < be32toh(rp.streams_count); i++) { + do { + ret_len = recv(ctx->control_sock, &stream, sizeof(stream), 0); + } while (ret_len < 0 && errno == EINTR); + if (ret_len < 0) { + fprintf(stderr, "[error] Error receiving stream\n"); + ret = ret_len; + goto error; + } + assert(ret_len == sizeof(stream)); + stream.path_name[LTTNG_VIEWER_PATH_MAX - 1] = '\0'; + stream.channel_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0'; + + printf_verbose(" stream %" PRIu64 " : %s/%s\n", + be64toh(stream.id), stream.path_name, + stream.channel_name); + ctx->session->streams[i].id = be64toh(stream.id); + ctx->session->streams[i].session = ctx->session; + + ctx->session->streams[i].first_read = 1; + ctx->session->streams[i].mmap_size = 0; + + if (be32toh(stream.metadata_flag)) { + char *path; + + path = strdup(LTTNG_METADATA_PATH_TEMPLATE); + if (!path) { + perror("strdup"); + ret = -1; + goto error; + } + if (!mkdtemp(path)) { + perror("mkdtemp"); + free(path); + ret = -1; + goto error; + } + ctx->session->streams[i].metadata_flag = 1; + snprintf(ctx->session->streams[i].path, + sizeof(ctx->session->streams[i].path), + "%s/%s", path, + stream.channel_name); + ret = open(ctx->session->streams[i].path, + O_WRONLY | O_CREAT | O_TRUNC, + S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP); + if (ret < 0) { + perror("open"); + free(path); + goto error; + } + ctx->session->streams[i].fd = ret; + free(path); + } + ret = lttng_live_ctf_trace_assign(&ctx->session->streams[i], + be64toh(stream.ctf_trace_id)); + if (ret < 0) { + goto error; + } + + } ret = 0; +end: error: return ret; } -void lttng_live_read(struct lttng_live_ctx *ctx, uint64_t session_id) +void lttng_live_read(struct lttng_live_ctx *ctx) { - int ret, active_session = 0; - struct bt_context *bt_ctx; + int ret, i; struct bt_ctf_iter *iter; const struct bt_ctf_event *event; struct bt_iter_pos begin_pos; struct bt_trace_descriptor *td_write; struct bt_format *fmt_write; struct ctf_text_stream_pos *sout; + uint64_t id; - bt_ctx = bt_context_create(); - if (!bt_ctx) { + ctx->bt_ctx = bt_context_create(); + if (!ctx->bt_ctx) { fprintf(stderr, "[error] bt_context_create allocation\n"); goto end; } @@ -1053,33 +1272,38 @@ void lttng_live_read(struct lttng_live_ctx *ctx, uint64_t session_id) goto end_free; } + for (i = 0; i < ctx->session_ids->len; i++) { + id = g_array_index(ctx->session_ids, uint64_t, i); + printf_verbose("Attaching to session %lu\n", id); + ret = lttng_live_attach_session(ctx, id); + printf_verbose("Attaching session returns %d\n", ret); + if (ret < 0) { + if (ret == -LTTNG_VIEWER_ATTACH_UNK) { + fprintf(stderr, "[error] Unknown session ID\n"); + } + goto end_free; + } + } + /* - * As long as the session is active, we try to reattach to it, - * even if all the streams get closed. + * As long as the session is active, we try to get new streams. */ - do { + for (;;) { int flags; - do { - ret = lttng_live_attach_session(ctx, session_id); - printf_verbose("Attaching session returns %d\n", ret); - if (ret < 0) { - if (ret == -LTTNG_VIEWER_ATTACH_UNK) { - if (active_session) - goto end_free; - fprintf(stderr, "[error] Unknown " - "session ID\n"); - } + while (!ctx->session->stream_count) { + if (ctx->session_ids->len == 0) goto end_free; - } else { - active_session = 1; - } - } while (ctx->session->stream_count == 0); + ret = ask_new_streams(ctx); + if (ret < 0) + goto end_free; + } - g_hash_table_foreach(ctx->session->ctf_traces, add_traces, bt_ctx); + g_hash_table_foreach(ctx->session->ctf_traces, add_traces, + ctx->bt_ctx); begin_pos.type = BT_SEEK_BEGIN; - iter = bt_ctf_iter_create(bt_ctx, &begin_pos, NULL); + iter = bt_ctf_iter_create(ctx->bt_ctx, &begin_pos, NULL); if (!iter) { fprintf(stderr, "[error] Iterator creation error\n"); goto end; @@ -1105,11 +1329,13 @@ void lttng_live_read(struct lttng_live_ctx *ctx, uint64_t session_id) } } bt_ctf_iter_destroy(iter); - g_hash_table_foreach_remove(ctx->session->ctf_traces, del_traces, bt_ctx); - } while (active_session); + g_hash_table_foreach_remove(ctx->session->ctf_traces, + del_traces, ctx->bt_ctx); + ctx->session->stream_count = 0; + } end_free: - bt_context_put(bt_ctx); + bt_context_put(ctx->bt_ctx); end: return; } diff --git a/formats/lttng-live/lttng-live-functions.h b/formats/lttng-live/lttng-live-functions.h index 778540d6..ce311250 100644 --- a/formats/lttng-live/lttng-live-functions.h +++ b/formats/lttng-live/lttng-live-functions.h @@ -35,6 +35,8 @@ struct lttng_live_ctx { int control_sock; struct lttng_live_session *session; + GArray *session_ids; + struct bt_context *bt_ctx; }; struct lttng_live_viewer_stream { @@ -62,6 +64,7 @@ struct lttng_live_ctf_trace { GPtrArray *streams; FILE *metadata_fp; int trace_id; + int in_use; }; int lttng_live_connect_viewer(struct lttng_live_ctx *ctx, char *hostname, @@ -69,6 +72,7 @@ int lttng_live_connect_viewer(struct lttng_live_ctx *ctx, char *hostname, int lttng_live_establish_connection(struct lttng_live_ctx *ctx); int lttng_live_list_sessions(struct lttng_live_ctx *ctx, const char *path); int lttng_live_attach_session(struct lttng_live_ctx *ctx, uint64_t id); -void lttng_live_read(struct lttng_live_ctx *ctx, uint64_t session_id); +void lttng_live_read(struct lttng_live_ctx *ctx); +int lttng_live_get_new_streams(struct lttng_live_ctx *ctx, uint64_t id); #endif /* _LTTNG_LIVE_FUNCTIONS_H */ diff --git a/formats/lttng-live/lttng-live.c b/formats/lttng-live/lttng-live.c index b1ab7408..59328729 100644 --- a/formats/lttng-live/lttng-live.c +++ b/formats/lttng-live/lttng-live.c @@ -42,11 +42,12 @@ * hostname parameter needs to hold NAME_MAX chars. */ static int parse_url(const char *path, char *hostname, int *port, - uint64_t *session_id) + uint64_t *session_id, GArray *session_ids) { - char remain[2][NAME_MAX]; + char remain[3][NAME_MAX]; int ret = -1, proto, proto_offset = 0; size_t path_len = strlen(path); + char *str, *strctx; /* * Since sscanf API does not allow easily checking string length @@ -79,8 +80,7 @@ static int parse_url(const char *path, char *hostname, int *port, ret = sscanf(remain[0], ":%d%s", port, remain[1]); /* Optional session ID with port number */ if (ret == 2) { - ret = sscanf(remain[1], "/%" PRIu64, - session_id); + ret = sscanf(remain[1], "/%s", remain[2]); /* Accept 0 or 1 (optional) */ if (ret < 0) { goto end; @@ -89,7 +89,7 @@ static int parse_url(const char *path, char *hostname, int *port, break; case '/': /* Optional session ID */ - ret = sscanf(remain[0], "/%" PRIu64, session_id); + ret = sscanf(remain[0], "/%s", remain[2]); /* Accept 0 or 1 (optional) */ if (ret < 0) { goto end; @@ -106,14 +106,31 @@ static int parse_url(const char *path, char *hostname, int *port, if (*port < 0) *port = LTTNG_DEFAULT_NETWORK_VIEWER_PORT; - if (*session_id == -1ULL) + if (strlen(remain[2]) == 0) { printf_verbose("Connecting to hostname : %s, port : %d, " "proto : IPv%d\n", hostname, *port, proto); - else - printf_verbose("Connecting to hostname : %s, port : %d, " - "session id : %" PRIu64 ", proto : IPv%d\n", - hostname, *port, *session_id, proto); + ret = 0; + goto end; + } + + printf_verbose("Connecting to hostname : %s, port : %d, " + "session id(s) : %s, proto : IPv%d\n", + hostname, *port, remain[2], proto); + str = strtok_r(remain[2], ",", &strctx); + do { + char *endptr; + uint64_t id; + + id = strtoull(str, &endptr, 0); + if (*endptr != '\0' || str == endptr || errno != 0) { + fprintf(stderr, "[error] parsing session id\n"); + ret = -1; + goto end; + } + g_array_append_val(session_ids, id); + } while ((str = strtok_r(NULL, ",", &strctx))); + ret = 0; end: @@ -137,7 +154,9 @@ static int lttng_live_open_trace_read(const char *path) ctx.session->ctf_traces = g_hash_table_new(g_direct_hash, g_direct_equal); - ret = parse_url(path, hostname, &port, &session_id); + ctx.session_ids = g_array_new(FALSE, TRUE, sizeof(uint64_t)); + + ret = parse_url(path, hostname, &port, &session_id, ctx.session_ids); if (ret < 0) { goto end_free; } @@ -154,7 +173,7 @@ static int lttng_live_open_trace_read(const char *path) goto end_free; } - if (session_id == -1ULL) { + if (ctx.session_ids->len == 0) { printf_verbose("Listing sessions\n"); ret = lttng_live_list_sessions(&ctx, path); if (ret < 0) { @@ -162,10 +181,11 @@ static int lttng_live_open_trace_read(const char *path) goto end_free; } } else { - lttng_live_read(&ctx, session_id); + lttng_live_read(&ctx); } end_free: + g_array_free(ctx.session_ids, TRUE); g_hash_table_destroy(ctx.session->ctf_traces); g_free(ctx.session); g_free(ctx.session->streams); diff --git a/formats/lttng-live/lttng-viewer-abi.h b/formats/lttng-live/lttng-viewer-abi.h index 3a669dd8..f9bce98d 100644 --- a/formats/lttng-live/lttng-viewer-abi.h +++ b/formats/lttng-live/lttng-viewer-abi.h @@ -97,6 +97,7 @@ enum lttng_viewer_new_streams_return_code { LTTNG_VIEWER_NEW_STREAMS_OK = 1, /* If new streams are being sent. */ LTTNG_VIEWER_NEW_STREAMS_NO_NEW = 2, /* If no new streams are available. */ LTTNG_VIEWER_NEW_STREAMS_ERR = 3, /* Error. */ + LTTNG_VIEWER_NEW_STREAMS_HUP = 4, /* Session closed. */ }; enum lttng_viewer_create_session_return_code { -- 2.34.1