X-Git-Url: http://git.efficios.com/?a=blobdiff_plain;f=formats%2Flttng-live%2Flttng-live-comm.c;h=70a656e72ea941c0e4faf45544aca98bcfb7908c;hb=42423b0a691fb4b2efbb5a45eb7eefa5f350ec30;hp=f9c87221b9373b7911851d43adf80da7c1364fca;hpb=f8612f315230f2352982f4391e37f5b840af8b94;p=babeltrace.git diff --git a/formats/lttng-live/lttng-live-comm.c b/formats/lttng-live/lttng-live-comm.c index f9c87221..70a656e7 100644 --- a/formats/lttng-live/lttng-live-comm.c +++ b/formats/lttng-live/lttng-live-comm.c @@ -66,6 +66,8 @@ static void ctf_live_packet_seek(struct bt_stream_pos *stream_pos, size_t index, int whence); static void add_traces(gpointer key, gpointer value, gpointer user_data); static int del_traces(gpointer key, gpointer value, gpointer user_data); +static int get_new_metadata(struct lttng_live_ctx *ctx, + struct lttng_live_viewer_stream *viewer_stream); int lttng_live_connect_viewer(struct lttng_live_ctx *ctx) { @@ -123,7 +125,7 @@ int lttng_live_establish_connection(struct lttng_live_ctx *ctx) ret_len = send(ctx->control_sock, &cmd, sizeof(cmd), 0); } while (ret_len < 0 && errno == EINTR); if (ret_len < 0) { - fprintf(stderr, "[error] Error sending cmd\n"); + perror("[error] Error sending cmd"); ret = ret_len; goto error; } @@ -133,7 +135,7 @@ int lttng_live_establish_connection(struct lttng_live_ctx *ctx) ret_len = send(ctx->control_sock, &connect, sizeof(connect), 0); } while (ret_len < 0 && errno == EINTR); if (ret_len < 0) { - fprintf(stderr, "[error] Error sending version\n"); + perror("[error] Error sending version"); ret = ret_len; goto error; } @@ -148,7 +150,7 @@ int lttng_live_establish_connection(struct lttng_live_ctx *ctx) goto error; } if (ret_len < 0) { - fprintf(stderr, "[error] Error receiving version\n"); + perror("[error] Error receiving version"); ret = ret_len; goto error; } @@ -250,7 +252,7 @@ int lttng_live_list_sessions(struct lttng_live_ctx *ctx, const char *path) ret_len = send(ctx->control_sock, &cmd, sizeof(cmd), 0); } while (ret_len < 0 && errno == EINTR); if (ret_len < 0) { - fprintf(stderr, "[error] Error sending cmd\n"); + perror("[error] Error sending cmd"); ret = ret_len; goto error; } @@ -265,7 +267,7 @@ int lttng_live_list_sessions(struct lttng_live_ctx *ctx, const char *path) goto error; } if (ret_len < 0) { - fprintf(stderr, "[error] Error receiving session list\n"); + perror("[error] Error receiving session list"); ret = ret_len; goto error; } @@ -282,7 +284,7 @@ int lttng_live_list_sessions(struct lttng_live_ctx *ctx, const char *path) goto error; } if (ret_len < 0) { - fprintf(stderr, "[error] Error receiving session\n"); + perror("[error] Error receiving session"); ret = ret_len; goto error; } @@ -369,7 +371,7 @@ int lttng_live_attach_session(struct lttng_live_ctx *ctx, uint64_t id) ret_len = send(ctx->control_sock, &cmd, sizeof(cmd), 0); } while (ret_len < 0 && errno == EINTR); if (ret_len < 0) { - fprintf(stderr, "[error] Error sending cmd\n"); + perror("[error] Error sending cmd"); ret = ret_len; goto error; } @@ -379,7 +381,7 @@ int lttng_live_attach_session(struct lttng_live_ctx *ctx, uint64_t id) ret_len = send(ctx->control_sock, &rq, sizeof(rq), 0); } while (ret_len < 0 && errno == EINTR); if (ret_len < 0) { - fprintf(stderr, "[error] Error sending attach request\n"); + perror("[error] Error sending attach request"); ret = ret_len; goto error; } @@ -394,7 +396,7 @@ int lttng_live_attach_session(struct lttng_live_ctx *ctx, uint64_t id) goto error; } if (ret_len < 0) { - fprintf(stderr, "[error] Error receiving attach response\n"); + perror("[error] Error receiving attach response"); ret = ret_len; goto error; } @@ -407,7 +409,7 @@ int lttng_live_attach_session(struct lttng_live_ctx *ctx, uint64_t id) ret = -LTTNG_VIEWER_ATTACH_UNK; goto end; case LTTNG_VIEWER_ATTACH_ALREADY: - fprintf(stderr, "[error] Already a viewer attached\n"); + fprintf(stderr, "[error] There is already a viewer attached to this session\n"); ret = -1; goto end; case LTTNG_VIEWER_ATTACH_NOT_LIVE: @@ -453,7 +455,7 @@ int lttng_live_attach_session(struct lttng_live_ctx *ctx, uint64_t id) goto error; } if (ret_len < 0) { - fprintf(stderr, "[error] Error receiving stream\n"); + perror("[error] Error receiving stream"); ret = ret_len; goto error; } @@ -579,7 +581,7 @@ int get_data_packet(struct lttng_live_ctx *ctx, ret_len = send(ctx->control_sock, &cmd, sizeof(cmd), 0); } while (ret_len < 0 && errno == EINTR); if (ret_len < 0) { - fprintf(stderr, "[error] Error sending cmd\n"); + perror("[error] Error sending cmd"); ret = ret_len; goto error; } @@ -589,7 +591,7 @@ int get_data_packet(struct lttng_live_ctx *ctx, ret_len = send(ctx->control_sock, &rq, sizeof(rq), 0); } while (ret_len < 0 && errno == EINTR); if (ret_len < 0) { - fprintf(stderr, "[error] Error sending get_data_packet request\n"); + perror("[error] Error sending get_data_packet request"); ret = ret_len; goto error; } @@ -604,7 +606,7 @@ int get_data_packet(struct lttng_live_ctx *ctx, goto error; } if (ret_len < 0) { - fprintf(stderr, "[error] Error receiving data response\n"); + perror("[error] Error receiving data response"); ret = ret_len; goto error; } @@ -631,6 +633,10 @@ int get_data_packet(struct lttng_live_ctx *ctx, case LTTNG_VIEWER_GET_PACKET_ERR: if (rp.flags & LTTNG_VIEWER_FLAG_NEW_METADATA) { printf_verbose("get_data_packet: new metadata needed\n"); + ret = get_new_metadata(ctx, stream); + if (ret < 0) { + goto error; + } ret = 0; goto end; } @@ -667,8 +673,7 @@ int get_data_packet(struct lttng_live_ctx *ctx, /* unmap old base */ ret = munmap_align(pos->base_mma); if (ret) { - fprintf(stderr, "[error] Unable to unmap old base: %s.\n", - strerror(errno)); + perror("[error] Unable to unmap old base"); ret = -1; goto error; } @@ -678,8 +683,7 @@ int get_data_packet(struct lttng_live_ctx *ctx, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0); if (pos->base_mma == MAP_FAILED) { - fprintf(stderr, "[error] mmap error %s.\n", - strerror(errno)); + perror("[error] mmap error"); pos->base_mma = NULL; ret = -1; goto error; @@ -701,7 +705,7 @@ int get_data_packet(struct lttng_live_ctx *ctx, goto error; } if (ret_len < 0) { - fprintf(stderr, "[error] Error receiving trace packet\n"); + perror("[error] Error receiving trace packet"); ret = ret_len; goto error; } @@ -712,35 +716,28 @@ error: return ret; } -/* - * Return number of metadata bytes written or a negative value on error. - */ static -int get_new_metadata(struct lttng_live_ctx *ctx, - struct lttng_live_viewer_stream *viewer_stream, - uint64_t *metadata_len) +int get_one_metadata_packet(struct lttng_live_ctx *ctx, + struct lttng_live_viewer_stream *metadata_stream) { uint64_t len = 0; int ret; struct lttng_viewer_cmd cmd; struct lttng_viewer_get_metadata rq; struct lttng_viewer_metadata_packet rp; - struct lttng_live_viewer_stream *metadata_stream; char *data = NULL; ssize_t ret_len; + rq.stream_id = htobe64(metadata_stream->id); cmd.cmd = htobe32(LTTNG_VIEWER_GET_METADATA); cmd.data_size = sizeof(rq); cmd.cmd_version = 0; - metadata_stream = viewer_stream->ctf_trace->metadata_stream; - rq.stream_id = htobe64(metadata_stream->id); - do { ret_len = send(ctx->control_sock, &cmd, sizeof(cmd), 0); } while (ret_len < 0 && errno == EINTR); if (ret_len < 0) { - fprintf(stderr, "[error] Error sending cmd\n"); + perror("[error] Error sending cmd"); ret = ret_len; goto error; } @@ -750,7 +747,7 @@ int get_new_metadata(struct lttng_live_ctx *ctx, ret_len = send(ctx->control_sock, &rq, sizeof(rq), 0); } while (ret_len < 0 && errno == EINTR); if (ret_len < 0) { - fprintf(stderr, "[error] Error sending get_metadata request\n"); + perror("[error] Error sending get_metadata request"); ret = ret_len; goto error; } @@ -765,7 +762,7 @@ int get_new_metadata(struct lttng_live_ctx *ctx, goto error; } if (ret_len < 0) { - fprintf(stderr, "[error] Error receiving metadata response\n"); + perror("[error] Error receiving metadata response"); ret = ret_len; goto error; } @@ -777,7 +774,7 @@ int get_new_metadata(struct lttng_live_ctx *ctx, break; case LTTNG_VIEWER_NO_NEW_METADATA: printf_verbose("get_metadata : NO NEW\n"); - ret = -1; + ret = 0; goto end; case LTTNG_VIEWER_METADATA_ERR: printf_verbose("get_metadata : ERR\n"); @@ -812,7 +809,7 @@ int get_new_metadata(struct lttng_live_ctx *ctx, goto error; } if (ret_len < 0) { - fprintf(stderr, "[error] Error receiving trace packet\n"); + perror("[error] Error receiving trace packet"); ret = ret_len; free(data); goto error; @@ -828,16 +825,43 @@ int get_new_metadata(struct lttng_live_ctx *ctx, goto error; } assert(ret_len == len); + ret = len; free(data); - *metadata_len = len; - ret = 0; end: error: return ret; } +/* + * Return 0 on success, a negative value on error. + */ +static +int get_new_metadata(struct lttng_live_ctx *ctx, + struct lttng_live_viewer_stream *viewer_stream) +{ + int ret = 0; + struct lttng_live_viewer_stream *metadata_stream; + + metadata_stream = viewer_stream->ctf_trace->metadata_stream; + + do { + /* + * get_one_metadata_packet returns the number of bytes + * received, 0 when we have received everything, a + * negative value on error. + */ + ret = get_one_metadata_packet(ctx, metadata_stream); + } while (ret > 0); + + fclose(metadata_stream->metadata_fp_write); + metadata_stream->metadata_fp_write = NULL; + +error: + return ret; +} + /* * Get one index for a stream. * @@ -852,7 +876,6 @@ int get_next_index(struct lttng_live_ctx *ctx, struct lttng_viewer_get_next_index rq; struct lttng_viewer_index rp; int ret; - uint64_t metadata_len; ssize_t ret_len; cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEXT_INDEX); @@ -867,7 +890,7 @@ retry: ret_len = send(ctx->control_sock, &cmd, sizeof(cmd), 0); } while (ret_len < 0 && errno == EINTR); if (ret_len < 0) { - fprintf(stderr, "[error] Error sending cmd\n"); + perror("[error] Error sending cmd"); ret = ret_len; goto error; } @@ -877,7 +900,7 @@ retry: ret_len = send(ctx->control_sock, &rq, sizeof(rq), 0); } while (ret_len < 0 && errno == EINTR); if (ret_len < 0) { - fprintf(stderr, "[error] Error sending get_next_index request\n"); + perror("[error] Error sending get_next_index request"); ret = ret_len; goto error; } @@ -892,7 +915,7 @@ retry: goto error; } if (ret_len < 0) { - fprintf(stderr, "[error] Error receiving index response\n"); + perror("[error] Error receiving index response"); ret = ret_len; goto error; } @@ -918,8 +941,7 @@ retry: if (rp.flags & LTTNG_VIEWER_FLAG_NEW_METADATA) { printf_verbose("get_next_index: new metadata needed\n"); - ret = get_new_metadata(ctx, viewer_stream, - &metadata_len); + ret = get_new_metadata(ctx, viewer_stream); if (ret < 0) { goto error; } @@ -1108,7 +1130,7 @@ int lttng_live_create_viewer_session(struct lttng_live_ctx *ctx) ret_len = send(ctx->control_sock, &cmd, sizeof(cmd), 0); } while (ret_len < 0 && errno == EINTR); if (ret_len < 0) { - fprintf(stderr, "[error] Error sending cmd\n"); + perror("[error] Error sending cmd"); ret = ret_len; goto error; } @@ -1123,7 +1145,7 @@ int lttng_live_create_viewer_session(struct lttng_live_ctx *ctx) goto error; } if (ret_len < 0) { - fprintf(stderr, "[error] Error receiving create session reply\n"); + perror("[error] Error receiving create session reply"); ret = ret_len; goto error; } @@ -1158,14 +1180,15 @@ int del_traces(gpointer key, gpointer value, gpointer user_data) static void add_traces(gpointer key, gpointer value, gpointer user_data) { - int i, ret, total_metadata = 0; - uint64_t metadata_len; + int i, ret; struct bt_context *bt_ctx = user_data; struct lttng_live_ctf_trace *trace = value; struct lttng_live_viewer_stream *stream; struct bt_mmap_stream *new_mmap_stream; struct bt_mmap_stream_list mmap_list; struct lttng_live_ctx *ctx = NULL; + struct bt_trace_descriptor *td; + struct bt_trace_handle *handle; /* * We don't know how many streams we will receive for a trace, so @@ -1192,13 +1215,9 @@ void add_traces(gpointer key, gpointer value, gpointer user_data) bt_list_add(&new_mmap_stream->list, &mmap_list.head); } else { /* Get all possible metadata before starting */ - do { - ret = get_new_metadata(ctx, stream, - &metadata_len); - if (ret == 0) { - total_metadata += metadata_len; - } - } while (ret == 0 || total_metadata == 0); + ret = get_new_metadata(ctx, stream); + if (ret) + goto end_free; trace->metadata_fp = fopen(stream->path, "r"); } } @@ -1214,15 +1233,12 @@ void add_traces(gpointer key, gpointer value, gpointer user_data) fprintf(stderr, "[error] Error adding trace\n"); goto end_free; } - + handle = (struct bt_trace_handle *) g_hash_table_lookup( + bt_ctx->trace_handles, + (gpointer) (unsigned long) ret); + td = handle->td; + trace->handle = handle; if (bt_ctx->current_iterator) { - struct bt_trace_descriptor *td; - struct bt_trace_handle *handle; - - handle = (struct bt_trace_handle *) g_hash_table_lookup( - bt_ctx->trace_handles, - (gpointer) (unsigned long) ret); - td = handle->td; bt_iter_add_trace(bt_ctx->current_iterator, td); } @@ -1258,7 +1274,7 @@ int lttng_live_get_new_streams(struct lttng_live_ctx *ctx, uint64_t id) ret_len = send(ctx->control_sock, &cmd, sizeof(cmd), 0); } while (ret_len < 0 && errno == EINTR); if (ret_len < 0) { - fprintf(stderr, "[error] Error sending cmd\n"); + perror("[error] Error sending cmd"); ret = ret_len; goto error; } @@ -1268,7 +1284,7 @@ int lttng_live_get_new_streams(struct lttng_live_ctx *ctx, uint64_t id) ret_len = send(ctx->control_sock, &rq, sizeof(rq), 0); } while (ret_len < 0 && errno == EINTR); if (ret_len < 0) { - fprintf(stderr, "[error] Error sending get_new_streams request\n"); + perror("[error] Error sending get_new_streams request"); ret = ret_len; goto error; } @@ -1283,7 +1299,7 @@ int lttng_live_get_new_streams(struct lttng_live_ctx *ctx, uint64_t id) goto error; } if (ret_len < 0) { - fprintf(stderr, "[error] Error receiving get_new_streams response\n"); + perror("[error] Error receiving get_new_streams response"); ret = ret_len; goto error; } @@ -1334,7 +1350,7 @@ int lttng_live_get_new_streams(struct lttng_live_ctx *ctx, uint64_t id) goto error; } if (ret_len < 0) { - fprintf(stderr, "[error] Error receiving stream\n"); + perror("[error] Error receiving stream"); ret = ret_len; goto error; }