X-Git-Url: http://git.efficios.com/?a=blobdiff_plain;f=formats%2Flttng-live%2Flttng-live-comm.c;h=070674aa67c13298032d6b0fc5c4411ef03085f5;hb=0dfc7b628feb554b7379584a417bc65b6a84af73;hp=960c2240129cba5115223816b2a09f1578c5dd95;hpb=23285bf35e2f0575ab79666616cec0a57584b00a;p=babeltrace.git diff --git a/formats/lttng-live/lttng-live-comm.c b/formats/lttng-live/lttng-live-comm.c index 960c2240..070674aa 100644 --- a/formats/lttng-live/lttng-live-comm.c +++ b/formats/lttng-live/lttng-live-comm.c @@ -50,6 +50,7 @@ #include #include +#include #include #include "lttng-live.h" @@ -201,6 +202,18 @@ int lttng_live_establish_connection(struct lttng_live_ctx *ctx) be64toh(connect.viewer_session_id)); printf_verbose("Relayd version : %u.%u\n", be32toh(connect.major), be32toh(connect.minor)); + + if (LTTNG_LIVE_MAJOR != be32toh(connect.major)) { + fprintf(stderr, "[error] Incompatible lttng-relayd protocol\n"); + goto error; + } + /* Use the smallest protocol version implemented. */ + if (LTTNG_LIVE_MINOR > be32toh(connect.minor)) { + ctx->minor = be32toh(connect.minor); + } else { + ctx->minor = LTTNG_LIVE_MINOR; + } + ctx->major = LTTNG_LIVE_MAJOR; ret = 0; end: return ret; @@ -531,19 +544,26 @@ error: return -1; } +/* + * Ask the relay for new streams. + * + * Returns the number of new streams received or a negative value on error. + */ static int ask_new_streams(struct lttng_live_ctx *ctx) { - int i, ret = 0; + int i, ret = 0, nb_streams = 0; uint64_t id; restart: for (i = 0; i < ctx->session_ids->len; i++) { id = g_array_index(ctx->session_ids, uint64_t, i); ret = lttng_live_get_new_streams(ctx, id); - printf_verbose("Asking for new streams returns %d\n", - ret); + printf_verbose("Asking for new streams returns %d\n", ret); if (ret < 0) { + if (lttng_live_should_quit()) { + goto end; + } if (ret == -LTTNG_VIEWER_NEW_STREAMS_HUP) { printf_verbose("Session %" PRIu64 " closed\n", id); @@ -562,8 +582,11 @@ restart: ret = -1; goto end; } + } else { + nb_streams += ret; } } + ret = nb_streams; end: return ret; @@ -688,8 +711,9 @@ retry: ret = ask_new_streams(ctx); if (ret < 0) goto error; - g_hash_table_foreach(ctx->session->ctf_traces, - add_traces, ctx->bt_ctx); + else if (ret > 0) + g_hash_table_foreach(ctx->session->ctf_traces, + add_traces, ctx->bt_ctx); } if (rp.flags & (LTTNG_VIEWER_FLAG_NEW_METADATA | LTTNG_VIEWER_FLAG_NEW_STREAM)) { @@ -871,7 +895,7 @@ int get_new_metadata(struct lttng_live_ctx *ctx, { int ret = 0; struct lttng_live_viewer_stream *metadata_stream; - size_t size, len_read = 0;; + size_t size, len_read = 0; metadata_stream = viewer_stream->ctf_trace->metadata_stream; if (!metadata_stream) { @@ -985,11 +1009,13 @@ retry: goto error; } if (rp.flags & LTTNG_VIEWER_FLAG_NEW_STREAM) { + printf_verbose("get_next_index: need new streams\n"); ret = ask_new_streams(ctx); if (ret < 0) goto error; - g_hash_table_foreach(ctx->session->ctf_traces, - add_traces, ctx->bt_ctx); + else if (ret > 0) + g_hash_table_foreach(ctx->session->ctf_traces, + add_traces, ctx->bt_ctx); } break; case LTTNG_VIEWER_INDEX_RETRY: @@ -1065,7 +1091,9 @@ retry: ret = get_next_index(session->ctx, viewer_stream, cur_index); if (ret < 0) { pos->offset = EOF; - fprintf(stderr, "[error] get_next_index failed\n"); + if (!lttng_live_should_quit()) { + fprintf(stderr, "[error] get_next_index failed\n"); + } return; } @@ -1115,7 +1143,9 @@ retry: goto retry; } else if (ret < 0) { pos->offset = EOF; - fprintf(stderr, "[error] get_data_packet failed\n"); + if (!lttng_live_should_quit()) { + fprintf(stderr, "[error] get_data_packet failed\n"); + } return; } @@ -1308,13 +1338,17 @@ end: return; } +/* + * Request new streams for a session. + * Returns the number of streams received or a negative value on error. + */ int lttng_live_get_new_streams(struct lttng_live_ctx *ctx, uint64_t id) { struct lttng_viewer_cmd cmd; struct lttng_viewer_new_streams_request rq; struct lttng_viewer_new_streams_response rp; struct lttng_viewer_stream stream; - int ret, i; + int ret, i, nb_streams = 0; ssize_t ret_len; uint32_t stream_count; @@ -1419,9 +1453,10 @@ int lttng_live_get_new_streams(struct lttng_live_ctx *ctx, uint64_t id) if (ret < 0) { goto error; } + nb_streams++; } - ret = 0; + ret = nb_streams; end: return ret; @@ -1515,6 +1550,10 @@ int lttng_live_read(struct lttng_live_ctx *ctx) begin_pos.type = BT_SEEK_BEGIN; iter = bt_ctf_iter_create(ctx->bt_ctx, &begin_pos, NULL); if (!iter) { + if (lttng_live_should_quit()) { + ret = 0; + goto end; + } fprintf(stderr, "[error] Iterator creation error\n"); goto end; } @@ -1551,5 +1590,8 @@ int lttng_live_read(struct lttng_live_ctx *ctx) end_free: bt_context_put(ctx->bt_ctx); end: + if (lttng_live_should_quit()) { + ret = 0; + } return ret; }