Fix: lttng-live await metadata
[babeltrace.git] / formats / lttng-live / lttng-live-comm.c
1 /*
2 * Copyright (C) 2013 - Julien Desfossez <jdesfossez@efficios.com>
3 * Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
4 *
5 * Permission is hereby granted, free of charge, to any person obtaining a copy
6 * of this software and associated documentation files (the "Software"), to deal
7 * in the Software without restriction, including without limitation the rights
8 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9 * copies of the Software, and to permit persons to whom the Software is
10 * furnished to do so, subject to the following conditions:
11 *
12 * The above copyright notice and this permission notice shall be included in
13 * all copies or substantial portions of the Software.
14 *
15 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21 * SOFTWARE.
22 */
23
24 #include <sys/socket.h>
25 #include <sys/types.h>
26 #include <netinet/in.h>
27 #include <netdb.h>
28 #include <stdio.h>
29 #include <string.h>
30 #include <stdlib.h>
31 #include <unistd.h>
32 #include <errno.h>
33 #include <inttypes.h>
34 #include <fcntl.h>
35 #include <sys/mman.h>
36
37 #include <babeltrace/ctf/ctf-index.h>
38
39 #include <babeltrace/babeltrace.h>
40 #include <babeltrace/ctf/events.h>
41 #include <babeltrace/ctf/callbacks.h>
42 #include <babeltrace/ctf/iterator.h>
43
44 /* for packet_index */
45 #include <babeltrace/ctf/types.h>
46
47 #include <babeltrace/ctf/metadata.h>
48 #include <babeltrace/ctf-text/types.h>
49 #include <babeltrace/ctf/events-internal.h>
50 #include <formats/ctf/events-private.h>
51
52 #include <babeltrace/compat/memstream.h>
53
54 #include "lttng-live.h"
55 #include "lttng-viewer-abi.h"
56
57 /*
58 * Memory allocation zeroed
59 */
60 #define zmalloc(x) calloc(1, x)
61
62 #ifndef max_t
63 #define max_t(type, a, b) \
64 ((type) (a) > (type) (b) ? (type) (a) : (type) (b))
65 #endif
66
67 static void ctf_live_packet_seek(struct bt_stream_pos *stream_pos,
68 size_t index, int whence);
69 static void add_traces(gpointer key, gpointer value, gpointer user_data);
70 static int del_traces(gpointer key, gpointer value, gpointer user_data);
71 static int get_new_metadata(struct lttng_live_ctx *ctx,
72 struct lttng_live_viewer_stream *viewer_stream,
73 char **metadata_buf);
74
75 static
76 ssize_t lttng_live_recv(int fd, void *buf, size_t len)
77 {
78 ssize_t ret;
79 size_t copied = 0, to_copy = len;
80
81 do {
82 ret = recv(fd, buf + copied, to_copy, 0);
83 if (ret > 0) {
84 assert(ret <= to_copy);
85 copied += ret;
86 to_copy -= ret;
87 }
88 } while ((ret > 0 && to_copy > 0)
89 || (ret < 0 && errno == EINTR));
90 if (ret > 0)
91 ret = copied;
92 /* ret = 0 means orderly shutdown, ret < 0 is error. */
93 return ret;
94 }
95
96 static
97 ssize_t lttng_live_send(int fd, const void *buf, size_t len)
98 {
99 ssize_t ret;
100
101 do {
102 ret = send(fd, buf, len, MSG_NOSIGNAL);
103 } while (ret < 0 && errno == EINTR);
104 return ret;
105 }
106
107 int lttng_live_connect_viewer(struct lttng_live_ctx *ctx)
108 {
109 struct hostent *host;
110 struct sockaddr_in server_addr;
111 int ret;
112
113 host = gethostbyname(ctx->relay_hostname);
114 if (!host) {
115 ret = -1;
116 goto end;
117 }
118
119 if ((ctx->control_sock = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
120 perror("Socket");
121 ret = -1;
122 goto end;
123 }
124
125 server_addr.sin_family = AF_INET;
126 server_addr.sin_port = htons(ctx->port);
127 server_addr.sin_addr = *((struct in_addr *) host->h_addr);
128 bzero(&(server_addr.sin_zero), 8);
129
130 if (connect(ctx->control_sock, (struct sockaddr *) &server_addr,
131 sizeof(struct sockaddr)) == -1) {
132 perror("Connect");
133 ret = -1;
134 goto end;
135 }
136
137 ret = 0;
138
139 end:
140 return ret;
141 }
142
143 int lttng_live_establish_connection(struct lttng_live_ctx *ctx)
144 {
145 struct lttng_viewer_cmd cmd;
146 struct lttng_viewer_connect connect;
147 int ret;
148 ssize_t ret_len;
149
150 cmd.cmd = htobe32(LTTNG_VIEWER_CONNECT);
151 cmd.data_size = sizeof(connect);
152 cmd.cmd_version = 0;
153
154 connect.viewer_session_id = -1ULL; /* will be set on recv */
155 connect.major = htobe32(LTTNG_LIVE_MAJOR);
156 connect.minor = htobe32(LTTNG_LIVE_MINOR);
157 connect.type = htobe32(LTTNG_VIEWER_CLIENT_COMMAND);
158
159 ret_len = lttng_live_send(ctx->control_sock, &cmd, sizeof(cmd));
160 if (ret_len < 0) {
161 perror("[error] Error sending cmd");
162 ret = ret_len;
163 goto error;
164 }
165 assert(ret_len == sizeof(cmd));
166
167 ret_len = lttng_live_send(ctx->control_sock, &connect, sizeof(connect));
168 if (ret_len < 0) {
169 perror("[error] Error sending version");
170 ret = ret_len;
171 goto error;
172 }
173 assert(ret_len == sizeof(connect));
174
175 ret_len = lttng_live_recv(ctx->control_sock, &connect, sizeof(connect));
176 if (ret_len == 0) {
177 fprintf(stderr, "[error] Remote side has closed connection\n");
178 ret = -1;
179 goto error;
180 }
181 if (ret_len < 0) {
182 perror("[error] Error receiving version");
183 ret = ret_len;
184 goto error;
185 }
186 assert(ret_len == sizeof(connect));
187
188 printf_verbose("Received viewer session ID : %" PRIu64 "\n",
189 be64toh(connect.viewer_session_id));
190 printf_verbose("Relayd version : %u.%u\n", be32toh(connect.major),
191 be32toh(connect.minor));
192
193 ret = 0;
194
195 error:
196 return ret;
197 }
198
199 static
200 void free_session_list(GPtrArray *session_list)
201 {
202 int i;
203 struct lttng_live_relay_session *relay_session;
204
205 for (i = 0; i < session_list->len; i++) {
206 relay_session = g_ptr_array_index(session_list, i);
207 free(relay_session->name);
208 free(relay_session->hostname);
209 }
210 g_ptr_array_free(session_list, TRUE);
211 }
212
213 static
214 void print_session_list(GPtrArray *session_list, const char *path)
215 {
216 int i;
217 struct lttng_live_relay_session *relay_session;
218
219 for (i = 0; i < session_list->len; i++) {
220 relay_session = g_ptr_array_index(session_list, i);
221 fprintf(stdout, "%s/host/%s/%s (timer = %u, "
222 "%u stream(s), %u client(s) connected)\n",
223 path, relay_session->hostname,
224 relay_session->name, relay_session->timer,
225 relay_session->streams, relay_session->clients);
226 }
227 }
228
229 static
230 void update_session_list(GPtrArray *session_list, char *hostname,
231 char *session_name, uint32_t streams, uint32_t clients,
232 uint32_t timer)
233 {
234 int i, found = 0;
235 struct lttng_live_relay_session *relay_session;
236
237 for (i = 0; i < session_list->len; i++) {
238 relay_session = g_ptr_array_index(session_list, i);
239 if ((strncmp(relay_session->hostname, hostname, NAME_MAX) == 0) &&
240 strncmp(relay_session->name,
241 session_name, NAME_MAX) == 0) {
242 relay_session->streams += streams;
243 if (relay_session->clients < clients)
244 relay_session->clients = clients;
245 found = 1;
246 break;
247 }
248 }
249 if (found)
250 return;
251
252 relay_session = g_new0(struct lttng_live_relay_session, 1);
253 relay_session->hostname = strndup(hostname, NAME_MAX);
254 relay_session->name = strndup(session_name, NAME_MAX);
255 relay_session->clients = clients;
256 relay_session->streams = streams;
257 relay_session->timer = timer;
258 g_ptr_array_add(session_list, relay_session);
259 }
260
261 int lttng_live_list_sessions(struct lttng_live_ctx *ctx, const char *path)
262 {
263 struct lttng_viewer_cmd cmd;
264 struct lttng_viewer_list_sessions list;
265 struct lttng_viewer_session lsession;
266 int i, ret, sessions_count, print_list = 0;
267 ssize_t ret_len;
268 uint64_t session_id;
269 GPtrArray *session_list = NULL;
270
271 if (strlen(ctx->session_name) == 0) {
272 print_list = 1;
273 session_list = g_ptr_array_new();
274 }
275
276 cmd.cmd = htobe32(LTTNG_VIEWER_LIST_SESSIONS);
277 cmd.data_size = 0;
278 cmd.cmd_version = 0;
279
280 ret_len = lttng_live_send(ctx->control_sock, &cmd, sizeof(cmd));
281 if (ret_len < 0) {
282 perror("[error] Error sending cmd");
283 ret = ret_len;
284 goto error;
285 }
286 assert(ret_len == sizeof(cmd));
287
288 ret_len = lttng_live_recv(ctx->control_sock, &list, sizeof(list));
289 if (ret_len == 0) {
290 fprintf(stderr, "[error] Remote side has closed connection\n");
291 ret = -1;
292 goto error;
293 }
294 if (ret_len < 0) {
295 perror("[error] Error receiving session list");
296 ret = ret_len;
297 goto error;
298 }
299 assert(ret_len == sizeof(list));
300
301 sessions_count = be32toh(list.sessions_count);
302 for (i = 0; i < sessions_count; i++) {
303 ret_len = lttng_live_recv(ctx->control_sock, &lsession, sizeof(lsession));
304 if (ret_len == 0) {
305 fprintf(stderr, "[error] Remote side has closed connection\n");
306 ret = -1;
307 goto error;
308 }
309 if (ret_len < 0) {
310 perror("[error] Error receiving session");
311 ret = ret_len;
312 goto error;
313 }
314 assert(ret_len == sizeof(lsession));
315 lsession.hostname[LTTNG_VIEWER_HOST_NAME_MAX - 1] = '\0';
316 lsession.session_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0';
317 session_id = be64toh(lsession.id);
318
319 if (print_list) {
320 update_session_list(session_list,
321 lsession.hostname,
322 lsession.session_name,
323 be32toh(lsession.streams),
324 be32toh(lsession.clients),
325 be32toh(lsession.live_timer));
326 } else {
327 if ((strncmp(lsession.session_name, ctx->session_name,
328 NAME_MAX) == 0) && (strncmp(lsession.hostname,
329 ctx->traced_hostname, NAME_MAX) == 0)) {
330 printf_verbose("Reading from session %" PRIu64 "\n",
331 session_id);
332 g_array_append_val(ctx->session_ids,
333 session_id);
334 }
335 }
336 }
337
338 if (print_list) {
339 print_session_list(session_list, path);
340 free_session_list(session_list);
341 }
342
343 ret = 0;
344
345 error:
346 return ret;
347 }
348
349 int lttng_live_ctf_trace_assign(struct lttng_live_viewer_stream *stream,
350 uint64_t ctf_trace_id)
351 {
352 struct lttng_live_ctf_trace *trace;
353 int ret = 0;
354
355 trace = g_hash_table_lookup(stream->session->ctf_traces,
356 (gpointer) ctf_trace_id);
357 if (!trace) {
358 trace = g_new0(struct lttng_live_ctf_trace, 1);
359 trace->ctf_trace_id = ctf_trace_id;
360 trace->streams = g_ptr_array_new();
361 g_hash_table_insert(stream->session->ctf_traces,
362 (gpointer) ctf_trace_id,
363 trace);
364 }
365 if (stream->metadata_flag)
366 trace->metadata_stream = stream;
367
368 stream->ctf_trace = trace;
369 g_ptr_array_add(trace->streams, stream);
370
371 return ret;
372 }
373
374 static
375 int open_metadata_fp_write(struct lttng_live_viewer_stream *stream,
376 char **metadata_buf, size_t *size)
377 {
378 int ret = 0;
379
380 stream->metadata_fp_write =
381 babeltrace_open_memstream(metadata_buf, size);
382 if (!stream->metadata_fp_write) {
383 perror("Metadata open_memstream");
384 ret = -1;
385 }
386
387 return ret;
388 }
389
390 int lttng_live_attach_session(struct lttng_live_ctx *ctx, uint64_t id)
391 {
392 struct lttng_viewer_cmd cmd;
393 struct lttng_viewer_attach_session_request rq;
394 struct lttng_viewer_attach_session_response rp;
395 struct lttng_viewer_stream stream;
396 int ret, i;
397 ssize_t ret_len;
398
399 cmd.cmd = htobe32(LTTNG_VIEWER_ATTACH_SESSION);
400 cmd.data_size = sizeof(rq);
401 cmd.cmd_version = 0;
402
403 memset(&rq, 0, sizeof(rq));
404 rq.session_id = htobe64(id);
405 // TODO: add cmd line parameter to select seek beginning
406 // rq.seek = htobe32(LTTNG_VIEWER_SEEK_BEGINNING);
407 rq.seek = htobe32(LTTNG_VIEWER_SEEK_LAST);
408
409 ret_len = lttng_live_send(ctx->control_sock, &cmd, sizeof(cmd));
410 if (ret_len < 0) {
411 perror("[error] Error sending cmd");
412 ret = ret_len;
413 goto error;
414 }
415 assert(ret_len == sizeof(cmd));
416
417 ret_len = lttng_live_send(ctx->control_sock, &rq, sizeof(rq));
418 if (ret_len < 0) {
419 perror("[error] Error sending attach request");
420 ret = ret_len;
421 goto error;
422 }
423 assert(ret_len == sizeof(rq));
424
425 ret_len = lttng_live_recv(ctx->control_sock, &rp, sizeof(rp));
426 if (ret_len == 0) {
427 fprintf(stderr, "[error] Remote side has closed connection\n");
428 ret = -1;
429 goto error;
430 }
431 if (ret_len < 0) {
432 perror("[error] Error receiving attach response");
433 ret = ret_len;
434 goto error;
435 }
436 assert(ret_len == sizeof(rp));
437
438 switch(be32toh(rp.status)) {
439 case LTTNG_VIEWER_ATTACH_OK:
440 break;
441 case LTTNG_VIEWER_ATTACH_UNK:
442 ret = -LTTNG_VIEWER_ATTACH_UNK;
443 goto end;
444 case LTTNG_VIEWER_ATTACH_ALREADY:
445 fprintf(stderr, "[error] There is already a viewer attached to this session\n");
446 ret = -1;
447 goto end;
448 case LTTNG_VIEWER_ATTACH_NOT_LIVE:
449 fprintf(stderr, "[error] Not a live session\n");
450 ret = -1;
451 goto end;
452 case LTTNG_VIEWER_ATTACH_SEEK_ERR:
453 fprintf(stderr, "[error] Wrong seek parameter\n");
454 ret = -1;
455 goto end;
456 default:
457 fprintf(stderr, "[error] Unknown attach return code %u\n",
458 be32toh(rp.status));
459 ret = -1;
460 goto end;
461 }
462 if (be32toh(rp.status) != LTTNG_VIEWER_ATTACH_OK) {
463 ret = -1;
464 goto end;
465 }
466
467 ctx->session->stream_count += be32toh(rp.streams_count);
468 /*
469 * When the session is created but not started, we do an active wait
470 * until it starts. It allows the viewer to start processing the trace
471 * as soon as the session starts.
472 */
473 if (ctx->session->stream_count == 0) {
474 ret = 0;
475 goto end;
476 }
477 printf_verbose("Waiting for %" PRIu64 " streams:\n",
478 ctx->session->stream_count);
479 ctx->session->streams = g_new0(struct lttng_live_viewer_stream,
480 ctx->session->stream_count);
481 for (i = 0; i < be32toh(rp.streams_count); i++) {
482 ret_len = lttng_live_recv(ctx->control_sock, &stream, sizeof(stream));
483 if (ret_len == 0) {
484 fprintf(stderr, "[error] Remote side has closed connection\n");
485 ret = -1;
486 goto error;
487 }
488 if (ret_len < 0) {
489 perror("[error] Error receiving stream");
490 ret = ret_len;
491 goto error;
492 }
493 assert(ret_len == sizeof(stream));
494 stream.path_name[LTTNG_VIEWER_PATH_MAX - 1] = '\0';
495 stream.channel_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0';
496
497 printf_verbose(" stream %" PRIu64 " : %s/%s\n",
498 be64toh(stream.id), stream.path_name,
499 stream.channel_name);
500 ctx->session->streams[i].id = be64toh(stream.id);
501 ctx->session->streams[i].session = ctx->session;
502
503 ctx->session->streams[i].first_read = 1;
504 ctx->session->streams[i].mmap_size = 0;
505
506 if (be32toh(stream.metadata_flag)) {
507 ctx->session->streams[i].metadata_flag = 1;
508 }
509 ret = lttng_live_ctf_trace_assign(&ctx->session->streams[i],
510 be64toh(stream.ctf_trace_id));
511 if (ret < 0) {
512 goto error;
513 }
514
515 }
516 ret = 0;
517
518 end:
519 error:
520 return ret;
521 }
522
523 static
524 int ask_new_streams(struct lttng_live_ctx *ctx)
525 {
526 int i, ret = 0;
527 uint64_t id;
528
529 restart:
530 for (i = 0; i < ctx->session_ids->len; i++) {
531 id = g_array_index(ctx->session_ids, uint64_t, i);
532 ret = lttng_live_get_new_streams(ctx, id);
533 printf_verbose("Asking for new streams returns %d\n",
534 ret);
535 if (ret < 0) {
536 if (ret == -LTTNG_VIEWER_NEW_STREAMS_HUP) {
537 printf_verbose("Session %" PRIu64 " closed\n",
538 id);
539 /*
540 * The streams have already been closed during
541 * the reading, so we only need to get rid of
542 * the trace in our internal table of sessions.
543 */
544 g_array_remove_index(ctx->session_ids, i);
545 /*
546 * We can't continue iterating on the g_array
547 * after a remove, we have to start again.
548 */
549 goto restart;
550 } else {
551 ret = -1;
552 goto end;
553 }
554 }
555 }
556
557 end:
558 return ret;
559 }
560
561 static
562 int append_metadata(struct lttng_live_ctx *ctx,
563 struct lttng_live_viewer_stream *viewer_stream)
564 {
565 int ret;
566 struct lttng_live_viewer_stream *metadata;
567 char *metadata_buf = NULL;
568
569 printf_verbose("get_next_index: new metadata needed\n");
570 ret = get_new_metadata(ctx, viewer_stream, &metadata_buf);
571 if (ret < 0) {
572 free(metadata_buf);
573 goto error;
574 }
575
576 metadata = viewer_stream->ctf_trace->metadata_stream;
577 metadata->ctf_trace->metadata_fp =
578 babeltrace_fmemopen(metadata_buf,
579 metadata->metadata_len, "rb");
580 if (!metadata->ctf_trace->metadata_fp) {
581 perror("Metadata fmemopen");
582 free(metadata_buf);
583 ret = -1;
584 goto error;
585 }
586 ret = ctf_append_trace_metadata(
587 viewer_stream->ctf_trace->handle->td,
588 metadata->ctf_trace->metadata_fp);
589 if (ret != 0) {
590 fprintf(stderr, "[error] Appending metadata\n");
591 goto error;
592 }
593
594 error:
595 return ret;
596 }
597
598 static
599 int get_data_packet(struct lttng_live_ctx *ctx,
600 struct ctf_stream_pos *pos,
601 struct lttng_live_viewer_stream *stream, uint64_t offset,
602 uint64_t len)
603 {
604 struct lttng_viewer_cmd cmd;
605 struct lttng_viewer_get_packet rq;
606 struct lttng_viewer_trace_packet rp;
607 ssize_t ret_len;
608 int ret;
609
610 retry:
611 cmd.cmd = htobe32(LTTNG_VIEWER_GET_PACKET);
612 cmd.data_size = sizeof(rq);
613 cmd.cmd_version = 0;
614
615 memset(&rq, 0, sizeof(rq));
616 rq.stream_id = htobe64(stream->id);
617 /* Already in big endian. */
618 rq.offset = offset;
619 rq.len = htobe32(len);
620
621 ret_len = lttng_live_send(ctx->control_sock, &cmd, sizeof(cmd));
622 if (ret_len < 0) {
623 perror("[error] Error sending cmd");
624 ret = ret_len;
625 goto error;
626 }
627 assert(ret_len == sizeof(cmd));
628
629 ret_len = lttng_live_send(ctx->control_sock, &rq, sizeof(rq));
630 if (ret_len < 0) {
631 perror("[error] Error sending get_data_packet request");
632 ret = ret_len;
633 goto error;
634 }
635 assert(ret_len == sizeof(rq));
636
637 ret_len = lttng_live_recv(ctx->control_sock, &rp, sizeof(rp));
638 if (ret_len == 0) {
639 fprintf(stderr, "[error] Remote side has closed connection\n");
640 ret = -1;
641 goto error;
642 }
643 if (ret_len < 0) {
644 perror("[error] Error receiving data response");
645 ret = ret_len;
646 goto error;
647 }
648 if (ret_len != sizeof(rp)) {
649 fprintf(stderr, "[error] get_data_packet: expected %" PRId64
650 ", received %" PRId64 "\n", ret_len,
651 sizeof(rp));
652 ret = -1;
653 goto error;
654 }
655
656 rp.flags = be32toh(rp.flags);
657
658 switch (be32toh(rp.status)) {
659 case LTTNG_VIEWER_GET_PACKET_OK:
660 len = be32toh(rp.len);
661 printf_verbose("get_data_packet: Ok, packet size : %" PRIu64
662 "\n", len);
663 break;
664 case LTTNG_VIEWER_GET_PACKET_RETRY:
665 printf_verbose("get_data_packet: retry\n");
666 ret = -1;
667 goto end;
668 case LTTNG_VIEWER_GET_PACKET_ERR:
669 if (rp.flags & LTTNG_VIEWER_FLAG_NEW_METADATA) {
670 printf_verbose("get_data_packet: new metadata needed\n");
671 ret = append_metadata(ctx, stream);
672 if (ret)
673 goto error;
674 goto retry;
675 }
676 if (rp.flags & LTTNG_VIEWER_FLAG_NEW_STREAM) {
677 ret = ask_new_streams(ctx);
678 if (ret < 0)
679 goto error;
680 g_hash_table_foreach(ctx->session->ctf_traces, add_traces,
681 ctx->bt_ctx);
682 }
683 fprintf(stderr, "[error] get_data_packet: error\n");
684 ret = -1;
685 goto end;
686 case LTTNG_VIEWER_GET_PACKET_EOF:
687 ret = -2;
688 goto error;
689 default:
690 printf_verbose("get_data_packet: unknown\n");
691 assert(0);
692 ret = -1;
693 goto end;
694 }
695
696 if (len <= 0) {
697 ret = -1;
698 goto end;
699 }
700
701 if (len > stream->mmap_size) {
702 uint64_t new_size;
703
704 new_size = max_t(uint64_t, len, stream->mmap_size << 1);
705 if (pos->base_mma) {
706 /* unmap old base */
707 ret = munmap_align(pos->base_mma);
708 if (ret) {
709 perror("[error] Unable to unmap old base");
710 ret = -1;
711 goto error;
712 }
713 pos->base_mma = NULL;
714 }
715 pos->base_mma = mmap_align(new_size,
716 PROT_READ | PROT_WRITE,
717 MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
718 if (pos->base_mma == MAP_FAILED) {
719 perror("[error] mmap error");
720 pos->base_mma = NULL;
721 ret = -1;
722 goto error;
723 }
724
725 stream->mmap_size = new_size;
726 printf_verbose("Expanding stream mmap size to %" PRIu64 " bytes\n",
727 stream->mmap_size);
728 }
729
730 ret_len = lttng_live_recv(ctx->control_sock,
731 mmap_align_addr(pos->base_mma), len);
732 if (ret_len == 0) {
733 fprintf(stderr, "[error] Remote side has closed connection\n");
734 ret = -1;
735 goto error;
736 }
737 if (ret_len < 0) {
738 perror("[error] Error receiving trace packet");
739 ret = ret_len;
740 goto error;
741 }
742 assert(ret_len == len);
743
744 end:
745 error:
746 return ret;
747 }
748
749 static
750 int get_one_metadata_packet(struct lttng_live_ctx *ctx,
751 struct lttng_live_viewer_stream *metadata_stream)
752 {
753 uint64_t len = 0;
754 int ret;
755 struct lttng_viewer_cmd cmd;
756 struct lttng_viewer_get_metadata rq;
757 struct lttng_viewer_metadata_packet rp;
758 char *data = NULL;
759 ssize_t ret_len;
760
761 rq.stream_id = htobe64(metadata_stream->id);
762 cmd.cmd = htobe32(LTTNG_VIEWER_GET_METADATA);
763 cmd.data_size = sizeof(rq);
764 cmd.cmd_version = 0;
765
766 ret_len = lttng_live_send(ctx->control_sock, &cmd, sizeof(cmd));
767 if (ret_len < 0) {
768 perror("[error] Error sending cmd");
769 ret = ret_len;
770 goto error;
771 }
772 assert(ret_len == sizeof(cmd));
773
774 ret_len = lttng_live_send(ctx->control_sock, &rq, sizeof(rq));
775 if (ret_len < 0) {
776 perror("[error] Error sending get_metadata request");
777 ret = ret_len;
778 goto error;
779 }
780 assert(ret_len == sizeof(rq));
781
782 ret_len = lttng_live_recv(ctx->control_sock, &rp, sizeof(rp));
783 if (ret_len == 0) {
784 fprintf(stderr, "[error] Remote side has closed connection\n");
785 ret = -1;
786 goto error;
787 }
788 if (ret_len < 0) {
789 perror("[error] Error receiving metadata response");
790 ret = ret_len;
791 goto error;
792 }
793 assert(ret_len == sizeof(rp));
794
795 switch (be32toh(rp.status)) {
796 case LTTNG_VIEWER_METADATA_OK:
797 printf_verbose("get_metadata : OK\n");
798 break;
799 case LTTNG_VIEWER_NO_NEW_METADATA:
800 printf_verbose("get_metadata : NO NEW\n");
801 ret = 0;
802 goto end;
803 case LTTNG_VIEWER_METADATA_ERR:
804 printf_verbose("get_metadata : ERR\n");
805 ret = -1;
806 goto end;
807 default:
808 printf_verbose("get_metadata : UNKNOWN\n");
809 ret = -1;
810 goto end;
811 }
812
813 len = be64toh(rp.len);
814 printf_verbose("Writing %" PRIu64" bytes to metadata\n", len);
815 if (len <= 0) {
816 ret = -1;
817 goto end;
818 }
819
820 data = zmalloc(len);
821 if (!data) {
822 perror("relay data zmalloc");
823 ret = -1;
824 goto error;
825 }
826 ret_len = lttng_live_recv(ctx->control_sock, data, len);
827 if (ret_len == 0) {
828 fprintf(stderr, "[error] Remote side has closed connection\n");
829 ret = -1;
830 free(data);
831 goto error;
832 }
833 if (ret_len < 0) {
834 perror("[error] Error receiving trace packet");
835 ret = ret_len;
836 free(data);
837 goto error;
838 }
839 assert(ret_len == len);
840
841 do {
842 ret_len = fwrite(data, 1, len,
843 metadata_stream->metadata_fp_write);
844 } while (ret_len < 0 && errno == EINTR);
845 if (ret_len < 0) {
846 fprintf(stderr, "[error] Writing in the metadata fp\n");
847 free(data);
848 ret = ret_len;
849 goto error;
850 }
851 assert(ret_len == len);
852 metadata_stream->metadata_len += len;
853 ret = len;
854
855 free(data);
856
857 end:
858 error:
859 return ret;
860 }
861
862 /*
863 * Return 0 on success, a negative value on error.
864 */
865 static
866 int get_new_metadata(struct lttng_live_ctx *ctx,
867 struct lttng_live_viewer_stream *viewer_stream,
868 char **metadata_buf)
869 {
870 int ret = 0;
871 struct lttng_live_viewer_stream *metadata_stream;
872 size_t size, len_read = 0;;
873
874 metadata_stream = viewer_stream->ctf_trace->metadata_stream;
875 if (!metadata_stream) {
876 fprintf(stderr, "[error] No metadata stream\n");
877 ret = -1;
878 goto error;
879 }
880 metadata_stream->metadata_len = 0;
881 ret = open_metadata_fp_write(metadata_stream, metadata_buf, &size);
882 if (ret < 0) {
883 goto error;
884 }
885
886 do {
887 /*
888 * get_one_metadata_packet returns the number of bytes
889 * received, 0 when we have received everything, a
890 * negative value on error.
891 */
892 ret = get_one_metadata_packet(ctx, metadata_stream);
893 if (ret > 0) {
894 len_read += ret;
895 }
896 } while (ret > 0 || !len_read);
897
898 if (fclose(metadata_stream->metadata_fp_write))
899 perror("fclose");
900 metadata_stream->metadata_fp_write = NULL;
901
902 error:
903 return ret;
904 }
905
906 /*
907 * Get one index for a stream.
908 *
909 * Returns 0 on success or a negative value on error.
910 */
911 static
912 int get_next_index(struct lttng_live_ctx *ctx,
913 struct lttng_live_viewer_stream *viewer_stream,
914 struct packet_index *index)
915 {
916 struct lttng_viewer_cmd cmd;
917 struct lttng_viewer_get_next_index rq;
918 struct lttng_viewer_index rp;
919 int ret;
920 ssize_t ret_len;
921
922 cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEXT_INDEX);
923 cmd.data_size = sizeof(rq);
924 cmd.cmd_version = 0;
925
926 memset(&rq, 0, sizeof(rq));
927 rq.stream_id = htobe64(viewer_stream->id);
928
929 retry:
930 ret_len = lttng_live_send(ctx->control_sock, &cmd, sizeof(cmd));
931 if (ret_len < 0) {
932 perror("[error] Error sending cmd");
933 ret = ret_len;
934 goto error;
935 }
936 assert(ret_len == sizeof(cmd));
937
938 ret_len = lttng_live_send(ctx->control_sock, &rq, sizeof(rq));
939 if (ret_len < 0) {
940 perror("[error] Error sending get_next_index request");
941 ret = ret_len;
942 goto error;
943 }
944 assert(ret_len == sizeof(rq));
945
946 ret_len = lttng_live_recv(ctx->control_sock, &rp, sizeof(rp));
947 if (ret_len == 0) {
948 fprintf(stderr, "[error] Remote side has closed connection\n");
949 ret = -1;
950 goto error;
951 }
952 if (ret_len < 0) {
953 perror("[error] Error receiving index response");
954 ret = ret_len;
955 goto error;
956 }
957 assert(ret_len == sizeof(rp));
958
959 rp.flags = be32toh(rp.flags);
960
961 switch (be32toh(rp.status)) {
962 case LTTNG_VIEWER_INDEX_INACTIVE:
963 printf_verbose("get_next_index: inactive\n");
964 memset(index, 0, sizeof(struct packet_index));
965 index->ts_cycles.timestamp_end = be64toh(rp.timestamp_end);
966 break;
967 case LTTNG_VIEWER_INDEX_OK:
968 printf_verbose("get_next_index: Ok, need metadata update : %u\n",
969 rp.flags & LTTNG_VIEWER_FLAG_NEW_METADATA);
970 index->offset = be64toh(rp.offset);
971 index->packet_size = be64toh(rp.packet_size);
972 index->content_size = be64toh(rp.content_size);
973 index->ts_cycles.timestamp_begin = be64toh(rp.timestamp_begin);
974 index->ts_cycles.timestamp_end = be64toh(rp.timestamp_end);
975 index->events_discarded = be64toh(rp.events_discarded);
976
977 if (rp.flags & LTTNG_VIEWER_FLAG_NEW_METADATA) {
978 ret = append_metadata(ctx, viewer_stream);
979 if (ret)
980 goto error;
981 }
982 if (rp.flags & LTTNG_VIEWER_FLAG_NEW_STREAM) {
983 ret = ask_new_streams(ctx);
984 if (ret < 0)
985 goto error;
986 g_hash_table_foreach(ctx->session->ctf_traces, add_traces,
987 ctx->bt_ctx);
988 }
989 break;
990 case LTTNG_VIEWER_INDEX_RETRY:
991 printf_verbose("get_next_index: retry\n");
992 sleep(1);
993 goto retry;
994 case LTTNG_VIEWER_INDEX_HUP:
995 printf_verbose("get_next_index: stream hung up\n");
996 viewer_stream->id = -1ULL;
997 index->offset = EOF;
998 ctx->session->stream_count--;
999 break;
1000 case LTTNG_VIEWER_INDEX_ERR:
1001 fprintf(stderr, "[error] get_next_index: error\n");
1002 ret = -1;
1003 goto error;
1004 default:
1005 fprintf(stderr, "[error] get_next_index: unkwown value\n");
1006 ret = -1;
1007 goto error;
1008 }
1009
1010 ret = 0;
1011
1012 error:
1013 return ret;
1014 }
1015
1016 static
1017 void ctf_live_packet_seek(struct bt_stream_pos *stream_pos, size_t index,
1018 int whence)
1019 {
1020 struct ctf_stream_pos *pos;
1021 struct ctf_file_stream *file_stream;
1022 struct packet_index *prev_index = NULL, *cur_index;
1023 struct lttng_live_viewer_stream *viewer_stream;
1024 struct lttng_live_session *session;
1025 int ret;
1026
1027 retry:
1028 pos = ctf_pos(stream_pos);
1029 file_stream = container_of(pos, struct ctf_file_stream, pos);
1030 viewer_stream = (struct lttng_live_viewer_stream *) pos->priv;
1031 session = viewer_stream->session;
1032
1033 switch (pos->packet_index->len) {
1034 case 0:
1035 g_array_set_size(pos->packet_index, 1);
1036 cur_index = &g_array_index(pos->packet_index,
1037 struct packet_index, 0);
1038 break;
1039 case 1:
1040 g_array_set_size(pos->packet_index, 2);
1041 prev_index = &g_array_index(pos->packet_index,
1042 struct packet_index, 0);
1043 cur_index = &g_array_index(pos->packet_index,
1044 struct packet_index, 1);
1045 break;
1046 case 2:
1047 g_array_index(pos->packet_index,
1048 struct packet_index, 0) =
1049 g_array_index(pos->packet_index,
1050 struct packet_index, 1);
1051 prev_index = &g_array_index(pos->packet_index,
1052 struct packet_index, 0);
1053 cur_index = &g_array_index(pos->packet_index,
1054 struct packet_index, 1);
1055 break;
1056 default:
1057 abort();
1058 break;
1059 }
1060 printf_verbose("get_next_index for stream %" PRIu64 "\n", viewer_stream->id);
1061 ret = get_next_index(session->ctx, viewer_stream, cur_index);
1062 if (ret < 0) {
1063 pos->offset = EOF;
1064 fprintf(stderr, "[error] get_next_index failed\n");
1065 return;
1066 }
1067
1068 pos->packet_size = cur_index->packet_size;
1069 pos->content_size = cur_index->content_size;
1070 pos->mmap_base_offset = 0;
1071 if (cur_index->offset == EOF) {
1072 pos->offset = EOF;
1073 } else {
1074 pos->offset = 0;
1075 }
1076
1077 if (cur_index->content_size == 0) {
1078 file_stream->parent.cycles_timestamp =
1079 cur_index->ts_cycles.timestamp_end;
1080 file_stream->parent.real_timestamp = ctf_get_real_timestamp(
1081 &file_stream->parent,
1082 cur_index->ts_cycles.timestamp_end);
1083 } else {
1084 /* Convert the timestamps and append to the real_index. */
1085 cur_index->ts_real.timestamp_begin = ctf_get_real_timestamp(
1086 &file_stream->parent,
1087 cur_index->ts_cycles.timestamp_begin);
1088 cur_index->ts_real.timestamp_end = ctf_get_real_timestamp(
1089 &file_stream->parent,
1090 cur_index->ts_cycles.timestamp_end);
1091
1092 ctf_update_current_packet_index(&file_stream->parent,
1093 prev_index, cur_index);
1094
1095 file_stream->parent.cycles_timestamp =
1096 cur_index->ts_cycles.timestamp_begin;
1097 file_stream->parent.real_timestamp =
1098 cur_index->ts_real.timestamp_begin;
1099 }
1100
1101 if (pos->packet_size == 0 || pos->offset == EOF) {
1102 goto end;
1103 }
1104
1105 printf_verbose("get_data_packet for stream %" PRIu64 "\n",
1106 viewer_stream->id);
1107 ret = get_data_packet(session->ctx, pos, viewer_stream,
1108 be64toh(cur_index->offset),
1109 cur_index->packet_size / CHAR_BIT);
1110 if (ret == -2) {
1111 goto retry;
1112 } else if (ret < 0) {
1113 pos->offset = EOF;
1114 fprintf(stderr, "[error] get_data_packet failed\n");
1115 return;
1116 }
1117
1118 printf_verbose("Index received : packet_size : %" PRIu64
1119 ", offset %" PRIu64 ", content_size %" PRIu64
1120 ", timestamp_end : %" PRIu64 "\n",
1121 cur_index->packet_size, cur_index->offset,
1122 cur_index->content_size,
1123 cur_index->ts_cycles.timestamp_end);
1124
1125 /* update trace_packet_header and stream_packet_context */
1126 if (pos->prot != PROT_WRITE && file_stream->parent.trace_packet_header) {
1127 /* Read packet header */
1128 ret = generic_rw(&pos->parent, &file_stream->parent.trace_packet_header->p);
1129 if (ret) {
1130 pos->offset = EOF;
1131 fprintf(stderr, "[error] trace packet header read failed\n");
1132 goto end;
1133 }
1134 }
1135 if (pos->prot != PROT_WRITE && file_stream->parent.stream_packet_context) {
1136 /* Read packet context */
1137 ret = generic_rw(&pos->parent, &file_stream->parent.stream_packet_context->p);
1138 if (ret) {
1139 pos->offset = EOF;
1140 fprintf(stderr, "[error] stream packet context read failed\n");
1141 goto end;
1142 }
1143 }
1144 pos->data_offset = pos->offset;
1145
1146 end:
1147 return;
1148 }
1149
1150 int lttng_live_create_viewer_session(struct lttng_live_ctx *ctx)
1151 {
1152 struct lttng_viewer_cmd cmd;
1153 struct lttng_viewer_create_session_response resp;
1154 int ret;
1155 ssize_t ret_len;
1156
1157 cmd.cmd = htobe32(LTTNG_VIEWER_CREATE_SESSION);
1158 cmd.data_size = 0;
1159 cmd.cmd_version = 0;
1160
1161 ret_len = lttng_live_send(ctx->control_sock, &cmd, sizeof(cmd));
1162 if (ret_len < 0) {
1163 perror("[error] Error sending cmd");
1164 ret = ret_len;
1165 goto error;
1166 }
1167 assert(ret_len == sizeof(cmd));
1168
1169 ret_len = lttng_live_recv(ctx->control_sock, &resp, sizeof(resp));
1170 if (ret_len == 0) {
1171 fprintf(stderr, "[error] Remote side has closed connection\n");
1172 ret = -1;
1173 goto error;
1174 }
1175 if (ret_len < 0) {
1176 perror("[error] Error receiving create session reply");
1177 ret = ret_len;
1178 goto error;
1179 }
1180 assert(ret_len == sizeof(resp));
1181
1182 if (be32toh(resp.status) != LTTNG_VIEWER_CREATE_SESSION_OK) {
1183 fprintf(stderr, "[error] Error creating viewer session\n");
1184 ret = -1;
1185 goto error;
1186 }
1187 ret = 0;
1188
1189 error:
1190 return ret;
1191 }
1192
1193 static
1194 int del_traces(gpointer key, gpointer value, gpointer user_data)
1195 {
1196 struct bt_context *bt_ctx = user_data;
1197 struct lttng_live_ctf_trace *trace = value;
1198 int ret;
1199
1200 ret = bt_context_remove_trace(bt_ctx, trace->trace_id);
1201 if (ret < 0)
1202 fprintf(stderr, "[error] removing trace from context\n");
1203
1204 /* remove the key/value pair from the HT. */
1205 return 1;
1206 }
1207
1208 static
1209 void add_traces(gpointer key, gpointer value, gpointer user_data)
1210 {
1211 int i, ret;
1212 struct bt_context *bt_ctx = user_data;
1213 struct lttng_live_ctf_trace *trace = value;
1214 struct lttng_live_viewer_stream *stream;
1215 struct bt_mmap_stream *new_mmap_stream;
1216 struct bt_mmap_stream_list mmap_list;
1217 struct lttng_live_ctx *ctx = NULL;
1218 struct bt_trace_descriptor *td;
1219 struct bt_trace_handle *handle;
1220
1221 /*
1222 * We don't know how many streams we will receive for a trace, so
1223 * once we are done receiving the traces, we add all the traces
1224 * received to the bt_context.
1225 * We can receive streams during the attach command or the
1226 * get_new_streams, so we have to make sure not to add multiple
1227 * times the same traces.
1228 * If a trace is already in the context, we just skip this function.
1229 */
1230 if (trace->in_use)
1231 return;
1232
1233 BT_INIT_LIST_HEAD(&mmap_list.head);
1234
1235 for (i = 0; i < trace->streams->len; i++) {
1236 stream = g_ptr_array_index(trace->streams, i);
1237 ctx = stream->session->ctx;
1238
1239 if (!stream->metadata_flag) {
1240 new_mmap_stream = zmalloc(sizeof(struct bt_mmap_stream));
1241 new_mmap_stream->priv = (void *) stream;
1242 new_mmap_stream->fd = -1;
1243 bt_list_add(&new_mmap_stream->list, &mmap_list.head);
1244 } else {
1245 char *metadata_buf = NULL;
1246
1247 /* Get all possible metadata before starting */
1248 ret = get_new_metadata(ctx, stream, &metadata_buf);
1249 if (ret) {
1250 free(metadata_buf);
1251 goto end_free;
1252 }
1253 if (!stream->metadata_len) {
1254 fprintf(stderr, "[error] empty metadata\n");
1255 ret = -1;
1256 free(metadata_buf);
1257 goto end_free;
1258 }
1259
1260 trace->metadata_fp = babeltrace_fmemopen(metadata_buf,
1261 stream->metadata_len, "rb");
1262 if (!trace->metadata_fp) {
1263 perror("Metadata fmemopen");
1264 ret = -1;
1265 free(metadata_buf);
1266 goto end_free;
1267 }
1268 }
1269 }
1270
1271 if (!trace->metadata_fp) {
1272 fprintf(stderr, "[error] No metadata stream opened\n");
1273 goto end_free;
1274 }
1275
1276 ret = bt_context_add_trace(bt_ctx, NULL, "ctf",
1277 ctf_live_packet_seek, &mmap_list, trace->metadata_fp);
1278 if (ret < 0) {
1279 fprintf(stderr, "[error] Error adding trace\n");
1280 goto end_free;
1281 }
1282 trace->metadata_stream->metadata_len = 0;
1283
1284 handle = (struct bt_trace_handle *) g_hash_table_lookup(
1285 bt_ctx->trace_handles,
1286 (gpointer) (unsigned long) ret);
1287 td = handle->td;
1288 trace->handle = handle;
1289 if (bt_ctx->current_iterator) {
1290 bt_iter_add_trace(bt_ctx->current_iterator, td);
1291 }
1292
1293 trace->trace_id = ret;
1294 trace->in_use = 1;
1295
1296 goto end;
1297
1298 end_free:
1299 bt_context_put(bt_ctx);
1300 end:
1301 return;
1302 }
1303
1304 int lttng_live_get_new_streams(struct lttng_live_ctx *ctx, uint64_t id)
1305 {
1306 struct lttng_viewer_cmd cmd;
1307 struct lttng_viewer_new_streams_request rq;
1308 struct lttng_viewer_new_streams_response rp;
1309 struct lttng_viewer_stream stream;
1310 int ret, i;
1311 ssize_t ret_len;
1312 uint32_t stream_count;
1313
1314 cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEW_STREAMS);
1315 cmd.data_size = sizeof(rq);
1316 cmd.cmd_version = 0;
1317
1318 memset(&rq, 0, sizeof(rq));
1319 rq.session_id = htobe64(id);
1320
1321 ret_len = lttng_live_send(ctx->control_sock, &cmd, sizeof(cmd));
1322 if (ret_len < 0) {
1323 perror("[error] Error sending cmd");
1324 ret = ret_len;
1325 goto error;
1326 }
1327 assert(ret_len == sizeof(cmd));
1328
1329 ret_len = lttng_live_send(ctx->control_sock, &rq, sizeof(rq));
1330 if (ret_len < 0) {
1331 perror("[error] Error sending get_new_streams request");
1332 ret = ret_len;
1333 goto error;
1334 }
1335 assert(ret_len == sizeof(rq));
1336
1337 ret_len = lttng_live_recv(ctx->control_sock, &rp, sizeof(rp));
1338 if (ret_len == 0) {
1339 fprintf(stderr, "[error] Remote side has closed connection\n");
1340 ret = -1;
1341 goto error;
1342 }
1343 if (ret_len < 0) {
1344 perror("[error] Error receiving get_new_streams response");
1345 ret = ret_len;
1346 goto error;
1347 }
1348 assert(ret_len == sizeof(rp));
1349
1350 switch(be32toh(rp.status)) {
1351 case LTTNG_VIEWER_NEW_STREAMS_OK:
1352 break;
1353 case LTTNG_VIEWER_NEW_STREAMS_NO_NEW:
1354 ret = 0;
1355 goto end;
1356 case LTTNG_VIEWER_NEW_STREAMS_HUP:
1357 ret = -LTTNG_VIEWER_NEW_STREAMS_HUP;
1358 goto end;
1359 case LTTNG_VIEWER_NEW_STREAMS_ERR:
1360 fprintf(stderr, "[error] get_new_streams error\n");
1361 ret = -1;
1362 goto end;
1363 default:
1364 fprintf(stderr, "[error] Unknown return code %u\n",
1365 be32toh(rp.status));
1366 ret = -1;
1367 goto end;
1368 }
1369
1370 stream_count = be32toh(rp.streams_count);
1371 ctx->session->stream_count += stream_count;
1372 /*
1373 * When the session is created but not started, we do an active wait
1374 * until it starts. It allows the viewer to start processing the trace
1375 * as soon as the session starts.
1376 */
1377 if (ctx->session->stream_count == 0) {
1378 ret = 0;
1379 goto end;
1380 }
1381 printf_verbose("Waiting for %" PRIu64 " streams:\n",
1382 ctx->session->stream_count);
1383 ctx->session->streams = g_new0(struct lttng_live_viewer_stream,
1384 ctx->session->stream_count);
1385 for (i = 0; i < stream_count; i++) {
1386 ret_len = lttng_live_recv(ctx->control_sock, &stream, sizeof(stream));
1387 if (ret_len == 0) {
1388 fprintf(stderr, "[error] Remote side has closed connection\n");
1389 ret = -1;
1390 goto error;
1391 }
1392 if (ret_len < 0) {
1393 perror("[error] Error receiving stream");
1394 ret = ret_len;
1395 goto error;
1396 }
1397 assert(ret_len == sizeof(stream));
1398 stream.path_name[LTTNG_VIEWER_PATH_MAX - 1] = '\0';
1399 stream.channel_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0';
1400
1401 printf_verbose(" stream %" PRIu64 " : %s/%s\n",
1402 be64toh(stream.id), stream.path_name,
1403 stream.channel_name);
1404 ctx->session->streams[i].id = be64toh(stream.id);
1405 ctx->session->streams[i].session = ctx->session;
1406
1407 ctx->session->streams[i].first_read = 1;
1408 ctx->session->streams[i].mmap_size = 0;
1409
1410 if (be32toh(stream.metadata_flag)) {
1411 ctx->session->streams[i].metadata_flag = 1;
1412 }
1413 ret = lttng_live_ctf_trace_assign(&ctx->session->streams[i],
1414 be64toh(stream.ctf_trace_id));
1415 if (ret < 0) {
1416 goto error;
1417 }
1418
1419 }
1420 ret = 0;
1421
1422 end:
1423 error:
1424 return ret;
1425 }
1426
1427 void lttng_live_read(struct lttng_live_ctx *ctx)
1428 {
1429 int ret, i;
1430 struct bt_ctf_iter *iter;
1431 const struct bt_ctf_event *event;
1432 struct bt_iter_pos begin_pos;
1433 struct bt_trace_descriptor *td_write;
1434 struct bt_format *fmt_write;
1435 struct ctf_text_stream_pos *sout;
1436 uint64_t id;
1437
1438 ctx->bt_ctx = bt_context_create();
1439 if (!ctx->bt_ctx) {
1440 fprintf(stderr, "[error] bt_context_create allocation\n");
1441 goto end;
1442 }
1443
1444 fmt_write = bt_lookup_format(g_quark_from_static_string("text"));
1445 if (!fmt_write) {
1446 fprintf(stderr, "[error] ctf-text error\n");
1447 goto end;
1448 }
1449
1450 td_write = fmt_write->open_trace(NULL, O_RDWR, NULL, NULL);
1451 if (!td_write) {
1452 fprintf(stderr, "[error] Error opening output trace\n");
1453 goto end_free;
1454 }
1455
1456 sout = container_of(td_write, struct ctf_text_stream_pos,
1457 trace_descriptor);
1458 if (!sout->parent.event_cb)
1459 goto end_free;
1460
1461 ret = lttng_live_create_viewer_session(ctx);
1462 if (ret < 0) {
1463 goto end_free;
1464 }
1465
1466 for (i = 0; i < ctx->session_ids->len; i++) {
1467 id = g_array_index(ctx->session_ids, uint64_t, i);
1468 printf_verbose("Attaching to session %lu\n", id);
1469 ret = lttng_live_attach_session(ctx, id);
1470 printf_verbose("Attaching session returns %d\n", ret);
1471 if (ret < 0) {
1472 if (ret == -LTTNG_VIEWER_ATTACH_UNK) {
1473 fprintf(stderr, "[error] Unknown session ID\n");
1474 }
1475 goto end_free;
1476 }
1477 }
1478
1479 /*
1480 * As long as the session is active, we try to get new streams.
1481 */
1482 for (;;) {
1483 int flags;
1484
1485 while (!ctx->session->stream_count) {
1486 if (ctx->session_ids->len == 0)
1487 goto end_free;
1488 ret = ask_new_streams(ctx);
1489 if (ret < 0)
1490 goto end_free;
1491 }
1492
1493 g_hash_table_foreach(ctx->session->ctf_traces, add_traces,
1494 ctx->bt_ctx);
1495
1496 begin_pos.type = BT_SEEK_BEGIN;
1497 iter = bt_ctf_iter_create(ctx->bt_ctx, &begin_pos, NULL);
1498 if (!iter) {
1499 fprintf(stderr, "[error] Iterator creation error\n");
1500 goto end;
1501 }
1502 for (;;) {
1503 event = bt_ctf_iter_read_event_flags(iter, &flags);
1504 if (!(flags & BT_ITER_FLAG_RETRY)) {
1505 if (!event) {
1506 /* End of trace */
1507 break;
1508 }
1509 ret = sout->parent.event_cb(&sout->parent,
1510 event->parent->stream);
1511 if (ret) {
1512 fprintf(stderr, "[error] Writing "
1513 "event failed.\n");
1514 goto end_free;
1515 }
1516 }
1517 ret = bt_iter_next(bt_ctf_get_iter(iter));
1518 if (ret < 0) {
1519 goto end_free;
1520 }
1521 }
1522 bt_ctf_iter_destroy(iter);
1523 g_hash_table_foreach_remove(ctx->session->ctf_traces,
1524 del_traces, ctx->bt_ctx);
1525 ctx->session->stream_count = 0;
1526 }
1527
1528 end_free:
1529 bt_context_put(ctx->bt_ctx);
1530 end:
1531 return;
1532 }
This page took 0.09628 seconds and 5 git commands to generate.