Fix: get_new_metadata receive all the metadata
[babeltrace.git] / formats / lttng-live / lttng-live-comm.c
1 /*
2 * Copyright (C) 2013 - Julien Desfossez <jdesfossez@efficios.com>
3 * Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
4 *
5 * Permission is hereby granted, free of charge, to any person obtaining a copy
6 * of this software and associated documentation files (the "Software"), to deal
7 * in the Software without restriction, including without limitation the rights
8 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9 * copies of the Software, and to permit persons to whom the Software is
10 * furnished to do so, subject to the following conditions:
11 *
12 * The above copyright notice and this permission notice shall be included in
13 * all copies or substantial portions of the Software.
14 *
15 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21 * SOFTWARE.
22 */
23
24 #include <sys/socket.h>
25 #include <sys/types.h>
26 #include <netinet/in.h>
27 #include <netdb.h>
28 #include <stdio.h>
29 #include <string.h>
30 #include <stdlib.h>
31 #include <unistd.h>
32 #include <errno.h>
33 #include <inttypes.h>
34 #include <fcntl.h>
35 #include <sys/mman.h>
36
37 #include <babeltrace/ctf/ctf-index.h>
38
39 #include <babeltrace/babeltrace.h>
40 #include <babeltrace/ctf/events.h>
41 #include <babeltrace/ctf/callbacks.h>
42 #include <babeltrace/ctf/iterator.h>
43
44 /* for packet_index */
45 #include <babeltrace/ctf/types.h>
46
47 #include <babeltrace/ctf/metadata.h>
48 #include <babeltrace/ctf-text/types.h>
49 #include <babeltrace/ctf/events-internal.h>
50 #include <formats/ctf/events-private.h>
51
52 #include "lttng-live.h"
53 #include "lttng-viewer-abi.h"
54
55 /*
56 * Memory allocation zeroed
57 */
58 #define zmalloc(x) calloc(1, x)
59
60 #ifndef max_t
61 #define max_t(type, a, b) \
62 ((type) (a) > (type) (b) ? (type) (a) : (type) (b))
63 #endif
64
65 static void ctf_live_packet_seek(struct bt_stream_pos *stream_pos,
66 size_t index, int whence);
67 static void add_traces(gpointer key, gpointer value, gpointer user_data);
68 static int del_traces(gpointer key, gpointer value, gpointer user_data);
69 static int get_new_metadata(struct lttng_live_ctx *ctx,
70 struct lttng_live_viewer_stream *viewer_stream);
71
72 int lttng_live_connect_viewer(struct lttng_live_ctx *ctx)
73 {
74 struct hostent *host;
75 struct sockaddr_in server_addr;
76 int ret;
77
78 host = gethostbyname(ctx->relay_hostname);
79 if (!host) {
80 ret = -1;
81 goto end;
82 }
83
84 if ((ctx->control_sock = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
85 perror("Socket");
86 ret = -1;
87 goto end;
88 }
89
90 server_addr.sin_family = AF_INET;
91 server_addr.sin_port = htons(ctx->port);
92 server_addr.sin_addr = *((struct in_addr *) host->h_addr);
93 bzero(&(server_addr.sin_zero), 8);
94
95 if (connect(ctx->control_sock, (struct sockaddr *) &server_addr,
96 sizeof(struct sockaddr)) == -1) {
97 perror("Connect");
98 ret = -1;
99 goto end;
100 }
101
102 ret = 0;
103
104 end:
105 return ret;
106 }
107
108 int lttng_live_establish_connection(struct lttng_live_ctx *ctx)
109 {
110 struct lttng_viewer_cmd cmd;
111 struct lttng_viewer_connect connect;
112 int ret;
113 ssize_t ret_len;
114
115 cmd.cmd = htobe32(LTTNG_VIEWER_CONNECT);
116 cmd.data_size = sizeof(connect);
117 cmd.cmd_version = 0;
118
119 connect.viewer_session_id = -1ULL; /* will be set on recv */
120 connect.major = htobe32(LTTNG_LIVE_MAJOR);
121 connect.minor = htobe32(LTTNG_LIVE_MINOR);
122 connect.type = htobe32(LTTNG_VIEWER_CLIENT_COMMAND);
123
124 do {
125 ret_len = send(ctx->control_sock, &cmd, sizeof(cmd), 0);
126 } while (ret_len < 0 && errno == EINTR);
127 if (ret_len < 0) {
128 perror("[error] Error sending cmd");
129 ret = ret_len;
130 goto error;
131 }
132 assert(ret_len == sizeof(cmd));
133
134 do {
135 ret_len = send(ctx->control_sock, &connect, sizeof(connect), 0);
136 } while (ret_len < 0 && errno == EINTR);
137 if (ret_len < 0) {
138 perror("[error] Error sending version");
139 ret = ret_len;
140 goto error;
141 }
142 assert(ret_len == sizeof(connect));
143
144 do {
145 ret_len = recv(ctx->control_sock, &connect, sizeof(connect), 0);
146 } while (ret_len < 0 && errno == EINTR);
147 if (ret_len == 0) {
148 fprintf(stderr, "[error] Remote side has closed connection\n");
149 ret = -1;
150 goto error;
151 }
152 if (ret_len < 0) {
153 perror("[error] Error receiving version");
154 ret = ret_len;
155 goto error;
156 }
157 assert(ret_len == sizeof(connect));
158
159 printf_verbose("Received viewer session ID : %" PRIu64 "\n",
160 be64toh(connect.viewer_session_id));
161 printf_verbose("Relayd version : %u.%u\n", be32toh(connect.major),
162 be32toh(connect.minor));
163
164 ret = 0;
165
166 error:
167 return ret;
168 }
169
170 static
171 void free_session_list(GPtrArray *session_list)
172 {
173 int i;
174 struct lttng_live_relay_session *relay_session;
175
176 for (i = 0; i < session_list->len; i++) {
177 relay_session = g_ptr_array_index(session_list, i);
178 free(relay_session->name);
179 free(relay_session->hostname);
180 }
181 g_ptr_array_free(session_list, TRUE);
182 }
183
184 static
185 void print_session_list(GPtrArray *session_list, const char *path)
186 {
187 int i;
188 struct lttng_live_relay_session *relay_session;
189
190 for (i = 0; i < session_list->len; i++) {
191 relay_session = g_ptr_array_index(session_list, i);
192 fprintf(stdout, "%s/host/%s/%s (timer = %u, "
193 "%u stream(s), %u client(s) connected)\n",
194 path, relay_session->hostname,
195 relay_session->name, relay_session->timer,
196 relay_session->streams, relay_session->clients);
197 }
198 }
199
200 static
201 void update_session_list(GPtrArray *session_list, char *hostname,
202 char *session_name, uint32_t streams, uint32_t clients,
203 uint32_t timer)
204 {
205 int i, found = 0;
206 struct lttng_live_relay_session *relay_session;
207
208 for (i = 0; i < session_list->len; i++) {
209 relay_session = g_ptr_array_index(session_list, i);
210 if ((strncmp(relay_session->hostname, hostname, NAME_MAX) == 0) &&
211 strncmp(relay_session->name,
212 session_name, NAME_MAX) == 0) {
213 relay_session->streams += streams;
214 if (relay_session->clients < clients)
215 relay_session->clients = clients;
216 found = 1;
217 break;
218 }
219 }
220 if (found)
221 return;
222
223 relay_session = g_new0(struct lttng_live_relay_session, 1);
224 relay_session->hostname = strndup(hostname, NAME_MAX);
225 relay_session->name = strndup(session_name, NAME_MAX);
226 relay_session->clients = clients;
227 relay_session->streams = streams;
228 relay_session->timer = timer;
229 g_ptr_array_add(session_list, relay_session);
230 }
231
232 int lttng_live_list_sessions(struct lttng_live_ctx *ctx, const char *path)
233 {
234 struct lttng_viewer_cmd cmd;
235 struct lttng_viewer_list_sessions list;
236 struct lttng_viewer_session lsession;
237 int i, ret, sessions_count, print_list = 0;
238 ssize_t ret_len;
239 uint64_t session_id;
240 GPtrArray *session_list = NULL;
241
242 if (strlen(ctx->session_name) == 0) {
243 print_list = 1;
244 session_list = g_ptr_array_new();
245 }
246
247 cmd.cmd = htobe32(LTTNG_VIEWER_LIST_SESSIONS);
248 cmd.data_size = 0;
249 cmd.cmd_version = 0;
250
251 do {
252 ret_len = send(ctx->control_sock, &cmd, sizeof(cmd), 0);
253 } while (ret_len < 0 && errno == EINTR);
254 if (ret_len < 0) {
255 perror("[error] Error sending cmd");
256 ret = ret_len;
257 goto error;
258 }
259 assert(ret_len == sizeof(cmd));
260
261 do {
262 ret_len = recv(ctx->control_sock, &list, sizeof(list), 0);
263 } while (ret_len < 0 && errno == EINTR);
264 if (ret_len == 0) {
265 fprintf(stderr, "[error] Remote side has closed connection\n");
266 ret = -1;
267 goto error;
268 }
269 if (ret_len < 0) {
270 perror("[error] Error receiving session list");
271 ret = ret_len;
272 goto error;
273 }
274 assert(ret_len == sizeof(list));
275
276 sessions_count = be32toh(list.sessions_count);
277 for (i = 0; i < sessions_count; i++) {
278 do {
279 ret_len = recv(ctx->control_sock, &lsession, sizeof(lsession), 0);
280 } while (ret_len < 0 && errno == EINTR);
281 if (ret_len == 0) {
282 fprintf(stderr, "[error] Remote side has closed connection\n");
283 ret = -1;
284 goto error;
285 }
286 if (ret_len < 0) {
287 perror("[error] Error receiving session");
288 ret = ret_len;
289 goto error;
290 }
291 assert(ret_len == sizeof(lsession));
292 lsession.hostname[LTTNG_VIEWER_HOST_NAME_MAX - 1] = '\0';
293 lsession.session_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0';
294 session_id = be64toh(lsession.id);
295
296 if (print_list) {
297 update_session_list(session_list,
298 lsession.hostname,
299 lsession.session_name,
300 be32toh(lsession.streams),
301 be32toh(lsession.clients),
302 be32toh(lsession.live_timer));
303 } else {
304 if ((strncmp(lsession.session_name, ctx->session_name,
305 NAME_MAX) == 0) && (strncmp(lsession.hostname,
306 ctx->traced_hostname, NAME_MAX) == 0)) {
307 printf_verbose("Reading from session %" PRIu64 "\n",
308 session_id);
309 g_array_append_val(ctx->session_ids,
310 session_id);
311 }
312 }
313 }
314
315 if (print_list) {
316 print_session_list(session_list, path);
317 free_session_list(session_list);
318 }
319
320 ret = 0;
321
322 error:
323 return ret;
324 }
325
326 int lttng_live_ctf_trace_assign(struct lttng_live_viewer_stream *stream,
327 uint64_t ctf_trace_id)
328 {
329 struct lttng_live_ctf_trace *trace;
330 int ret = 0;
331
332 trace = g_hash_table_lookup(stream->session->ctf_traces,
333 (gpointer) ctf_trace_id);
334 if (!trace) {
335 trace = g_new0(struct lttng_live_ctf_trace, 1);
336 trace->ctf_trace_id = ctf_trace_id;
337 trace->streams = g_ptr_array_new();
338 g_hash_table_insert(stream->session->ctf_traces,
339 (gpointer) ctf_trace_id,
340 trace);
341 }
342 if (stream->metadata_flag)
343 trace->metadata_stream = stream;
344
345 stream->ctf_trace = trace;
346 g_ptr_array_add(trace->streams, stream);
347
348 return ret;
349 }
350
351 int lttng_live_attach_session(struct lttng_live_ctx *ctx, uint64_t id)
352 {
353 struct lttng_viewer_cmd cmd;
354 struct lttng_viewer_attach_session_request rq;
355 struct lttng_viewer_attach_session_response rp;
356 struct lttng_viewer_stream stream;
357 int ret, i;
358 ssize_t ret_len;
359
360 cmd.cmd = htobe32(LTTNG_VIEWER_ATTACH_SESSION);
361 cmd.data_size = sizeof(rq);
362 cmd.cmd_version = 0;
363
364 memset(&rq, 0, sizeof(rq));
365 rq.session_id = htobe64(id);
366 // TODO: add cmd line parameter to select seek beginning
367 // rq.seek = htobe32(LTTNG_VIEWER_SEEK_BEGINNING);
368 rq.seek = htobe32(LTTNG_VIEWER_SEEK_LAST);
369
370 do {
371 ret_len = send(ctx->control_sock, &cmd, sizeof(cmd), 0);
372 } while (ret_len < 0 && errno == EINTR);
373 if (ret_len < 0) {
374 perror("[error] Error sending cmd");
375 ret = ret_len;
376 goto error;
377 }
378 assert(ret_len == sizeof(cmd));
379
380 do {
381 ret_len = send(ctx->control_sock, &rq, sizeof(rq), 0);
382 } while (ret_len < 0 && errno == EINTR);
383 if (ret_len < 0) {
384 perror("[error] Error sending attach request");
385 ret = ret_len;
386 goto error;
387 }
388 assert(ret_len == sizeof(rq));
389
390 do {
391 ret_len = recv(ctx->control_sock, &rp, sizeof(rp), 0);
392 } while (ret_len < 0 && errno == EINTR);
393 if (ret_len == 0) {
394 fprintf(stderr, "[error] Remote side has closed connection\n");
395 ret = -1;
396 goto error;
397 }
398 if (ret_len < 0) {
399 perror("[error] Error receiving attach response");
400 ret = ret_len;
401 goto error;
402 }
403 assert(ret_len == sizeof(rp));
404
405 switch(be32toh(rp.status)) {
406 case LTTNG_VIEWER_ATTACH_OK:
407 break;
408 case LTTNG_VIEWER_ATTACH_UNK:
409 ret = -LTTNG_VIEWER_ATTACH_UNK;
410 goto end;
411 case LTTNG_VIEWER_ATTACH_ALREADY:
412 fprintf(stderr, "[error] There is already a viewer attached to this session\n");
413 ret = -1;
414 goto end;
415 case LTTNG_VIEWER_ATTACH_NOT_LIVE:
416 fprintf(stderr, "[error] Not a live session\n");
417 ret = -1;
418 goto end;
419 case LTTNG_VIEWER_ATTACH_SEEK_ERR:
420 fprintf(stderr, "[error] Wrong seek parameter\n");
421 ret = -1;
422 goto end;
423 default:
424 fprintf(stderr, "[error] Unknown attach return code %u\n",
425 be32toh(rp.status));
426 ret = -1;
427 goto end;
428 }
429 if (be32toh(rp.status) != LTTNG_VIEWER_ATTACH_OK) {
430 ret = -1;
431 goto end;
432 }
433
434 ctx->session->stream_count += be32toh(rp.streams_count);
435 /*
436 * When the session is created but not started, we do an active wait
437 * until it starts. It allows the viewer to start processing the trace
438 * as soon as the session starts.
439 */
440 if (ctx->session->stream_count == 0) {
441 ret = 0;
442 goto end;
443 }
444 printf_verbose("Waiting for %" PRIu64 " streams:\n",
445 ctx->session->stream_count);
446 ctx->session->streams = g_new0(struct lttng_live_viewer_stream,
447 ctx->session->stream_count);
448 for (i = 0; i < be32toh(rp.streams_count); i++) {
449 do {
450 ret_len = recv(ctx->control_sock, &stream, sizeof(stream), 0);
451 } while (ret_len < 0 && errno == EINTR);
452 if (ret_len == 0) {
453 fprintf(stderr, "[error] Remote side has closed connection\n");
454 ret = -1;
455 goto error;
456 }
457 if (ret_len < 0) {
458 perror("[error] Error receiving stream");
459 ret = ret_len;
460 goto error;
461 }
462 assert(ret_len == sizeof(stream));
463 stream.path_name[LTTNG_VIEWER_PATH_MAX - 1] = '\0';
464 stream.channel_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0';
465
466 printf_verbose(" stream %" PRIu64 " : %s/%s\n",
467 be64toh(stream.id), stream.path_name,
468 stream.channel_name);
469 ctx->session->streams[i].id = be64toh(stream.id);
470 ctx->session->streams[i].session = ctx->session;
471
472 ctx->session->streams[i].first_read = 1;
473 ctx->session->streams[i].mmap_size = 0;
474
475 if (be32toh(stream.metadata_flag)) {
476 char *path;
477
478 path = strdup(LTTNG_METADATA_PATH_TEMPLATE);
479 if (!path) {
480 perror("strdup");
481 ret = -1;
482 goto error;
483 }
484 if (!mkdtemp(path)) {
485 perror("mkdtemp");
486 free(path);
487 ret = -1;
488 goto error;
489 }
490 ctx->session->streams[i].metadata_flag = 1;
491 snprintf(ctx->session->streams[i].path,
492 sizeof(ctx->session->streams[i].path),
493 "%s/%s", path,
494 stream.channel_name);
495 ret = open(ctx->session->streams[i].path,
496 O_WRONLY | O_CREAT | O_TRUNC,
497 S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP);
498 if (ret < 0) {
499 perror("open");
500 free(path);
501 goto error;
502 }
503 ctx->session->streams[i].fd = ret;
504 free(path);
505 }
506 ret = lttng_live_ctf_trace_assign(&ctx->session->streams[i],
507 be64toh(stream.ctf_trace_id));
508 if (ret < 0) {
509 goto error;
510 }
511
512 }
513 ret = 0;
514
515 end:
516 error:
517 return ret;
518 }
519
520 static
521 int ask_new_streams(struct lttng_live_ctx *ctx)
522 {
523 int i, ret = 0;
524 uint64_t id;
525
526 restart:
527 for (i = 0; i < ctx->session_ids->len; i++) {
528 id = g_array_index(ctx->session_ids, uint64_t, i);
529 ret = lttng_live_get_new_streams(ctx, id);
530 printf_verbose("Asking for new streams returns %d\n",
531 ret);
532 if (ret < 0) {
533 if (ret == -LTTNG_VIEWER_NEW_STREAMS_HUP) {
534 printf_verbose("Session %" PRIu64 " closed\n",
535 id);
536 /*
537 * The streams have already been closed during
538 * the reading, so we only need to get rid of
539 * the trace in our internal table of sessions.
540 */
541 g_array_remove_index(ctx->session_ids, i);
542 /*
543 * We can't continue iterating on the g_array
544 * after a remove, we have to start again.
545 */
546 goto restart;
547 } else {
548 ret = -1;
549 goto end;
550 }
551 }
552 }
553
554 end:
555 return ret;
556 }
557
558 static
559 int get_data_packet(struct lttng_live_ctx *ctx,
560 struct ctf_stream_pos *pos,
561 struct lttng_live_viewer_stream *stream, uint64_t offset,
562 uint64_t len)
563 {
564 struct lttng_viewer_cmd cmd;
565 struct lttng_viewer_get_packet rq;
566 struct lttng_viewer_trace_packet rp;
567 ssize_t ret_len;
568 int ret;
569
570 cmd.cmd = htobe32(LTTNG_VIEWER_GET_PACKET);
571 cmd.data_size = sizeof(rq);
572 cmd.cmd_version = 0;
573
574 memset(&rq, 0, sizeof(rq));
575 rq.stream_id = htobe64(stream->id);
576 /* Already in big endian. */
577 rq.offset = offset;
578 rq.len = htobe32(len);
579
580 do {
581 ret_len = send(ctx->control_sock, &cmd, sizeof(cmd), 0);
582 } while (ret_len < 0 && errno == EINTR);
583 if (ret_len < 0) {
584 perror("[error] Error sending cmd");
585 ret = ret_len;
586 goto error;
587 }
588 assert(ret_len == sizeof(cmd));
589
590 do {
591 ret_len = send(ctx->control_sock, &rq, sizeof(rq), 0);
592 } while (ret_len < 0 && errno == EINTR);
593 if (ret_len < 0) {
594 perror("[error] Error sending get_data_packet request");
595 ret = ret_len;
596 goto error;
597 }
598 assert(ret_len == sizeof(rq));
599
600 do {
601 ret_len = recv(ctx->control_sock, &rp, sizeof(rp), 0);
602 } while (ret_len < 0 && errno == EINTR);
603 if (ret_len == 0) {
604 fprintf(stderr, "[error] Remote side has closed connection\n");
605 ret = -1;
606 goto error;
607 }
608 if (ret_len < 0) {
609 perror("[error] Error receiving data response");
610 ret = ret_len;
611 goto error;
612 }
613 if (ret_len != sizeof(rp)) {
614 fprintf(stderr, "[error] get_data_packet: expected %" PRId64
615 ", received %" PRId64 "\n", ret_len,
616 sizeof(rp));
617 ret = -1;
618 goto error;
619 }
620
621 rp.flags = be32toh(rp.flags);
622
623 switch (be32toh(rp.status)) {
624 case LTTNG_VIEWER_GET_PACKET_OK:
625 len = be32toh(rp.len);
626 printf_verbose("get_data_packet: Ok, packet size : %" PRIu64
627 "\n", len);
628 break;
629 case LTTNG_VIEWER_GET_PACKET_RETRY:
630 printf_verbose("get_data_packet: retry\n");
631 ret = -1;
632 goto end;
633 case LTTNG_VIEWER_GET_PACKET_ERR:
634 if (rp.flags & LTTNG_VIEWER_FLAG_NEW_METADATA) {
635 printf_verbose("get_data_packet: new metadata needed\n");
636 ret = get_new_metadata(ctx, stream);
637 if (ret < 0) {
638 goto error;
639 }
640 ret = 0;
641 goto end;
642 }
643 if (rp.flags & LTTNG_VIEWER_FLAG_NEW_STREAM) {
644 ret = ask_new_streams(ctx);
645 if (ret < 0)
646 goto error;
647 g_hash_table_foreach(ctx->session->ctf_traces, add_traces,
648 ctx->bt_ctx);
649 }
650 fprintf(stderr, "[error] get_data_packet: error\n");
651 ret = -1;
652 goto end;
653 case LTTNG_VIEWER_GET_PACKET_EOF:
654 ret = -2;
655 goto error;
656 default:
657 printf_verbose("get_data_packet: unknown\n");
658 assert(0);
659 ret = -1;
660 goto end;
661 }
662
663 if (len <= 0) {
664 ret = -1;
665 goto end;
666 }
667
668 if (len > stream->mmap_size) {
669 uint64_t new_size;
670
671 new_size = max_t(uint64_t, len, stream->mmap_size << 1);
672 if (pos->base_mma) {
673 /* unmap old base */
674 ret = munmap_align(pos->base_mma);
675 if (ret) {
676 perror("[error] Unable to unmap old base");
677 ret = -1;
678 goto error;
679 }
680 pos->base_mma = NULL;
681 }
682 pos->base_mma = mmap_align(new_size,
683 PROT_READ | PROT_WRITE,
684 MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
685 if (pos->base_mma == MAP_FAILED) {
686 perror("[error] mmap error");
687 pos->base_mma = NULL;
688 ret = -1;
689 goto error;
690 }
691
692 stream->mmap_size = new_size;
693 printf_verbose("Expanding stream mmap size to %" PRIu64 " bytes\n",
694 stream->mmap_size);
695 }
696
697 do {
698 ret_len = recv(ctx->control_sock,
699 mmap_align_addr(pos->base_mma), len,
700 MSG_WAITALL);
701 } while (ret_len < 0 && errno == EINTR);
702 if (ret_len == 0) {
703 fprintf(stderr, "[error] Remote side has closed connection\n");
704 ret = -1;
705 goto error;
706 }
707 if (ret_len < 0) {
708 perror("[error] Error receiving trace packet");
709 ret = ret_len;
710 goto error;
711 }
712 assert(ret_len == len);
713
714 end:
715 error:
716 return ret;
717 }
718
719 static
720 int get_one_metadata_packet(struct lttng_live_ctx *ctx,
721 struct lttng_live_viewer_stream *metadata_stream)
722 {
723 uint64_t len = 0;
724 int ret;
725 struct lttng_viewer_cmd cmd;
726 struct lttng_viewer_get_metadata rq;
727 struct lttng_viewer_metadata_packet rp;
728 char *data = NULL;
729 ssize_t ret_len;
730
731 rq.stream_id = htobe64(metadata_stream->id);
732 cmd.cmd = htobe32(LTTNG_VIEWER_GET_METADATA);
733 cmd.data_size = sizeof(rq);
734 cmd.cmd_version = 0;
735
736 do {
737 ret_len = send(ctx->control_sock, &cmd, sizeof(cmd), 0);
738 } while (ret_len < 0 && errno == EINTR);
739 if (ret_len < 0) {
740 perror("[error] Error sending cmd");
741 ret = ret_len;
742 goto error;
743 }
744 assert(ret_len == sizeof(cmd));
745
746 do {
747 ret_len = send(ctx->control_sock, &rq, sizeof(rq), 0);
748 } while (ret_len < 0 && errno == EINTR);
749 if (ret_len < 0) {
750 perror("[error] Error sending get_metadata request");
751 ret = ret_len;
752 goto error;
753 }
754 assert(ret_len == sizeof(rq));
755
756 do {
757 ret_len = recv(ctx->control_sock, &rp, sizeof(rp), 0);
758 } while (ret_len < 0 && errno == EINTR);
759 if (ret_len == 0) {
760 fprintf(stderr, "[error] Remote side has closed connection\n");
761 ret = -1;
762 goto error;
763 }
764 if (ret_len < 0) {
765 perror("[error] Error receiving metadata response");
766 ret = ret_len;
767 goto error;
768 }
769 assert(ret_len == sizeof(rp));
770
771 switch (be32toh(rp.status)) {
772 case LTTNG_VIEWER_METADATA_OK:
773 printf_verbose("get_metadata : OK\n");
774 break;
775 case LTTNG_VIEWER_NO_NEW_METADATA:
776 printf_verbose("get_metadata : NO NEW\n");
777 ret = 0;
778 goto end;
779 case LTTNG_VIEWER_METADATA_ERR:
780 printf_verbose("get_metadata : ERR\n");
781 ret = -1;
782 goto end;
783 default:
784 printf_verbose("get_metadata : UNKNOWN\n");
785 ret = -1;
786 goto end;
787 }
788
789 len = be64toh(rp.len);
790 printf_verbose("Writing %" PRIu64" bytes to metadata\n", len);
791 if (len <= 0) {
792 ret = -1;
793 goto end;
794 }
795
796 data = zmalloc(len);
797 if (!data) {
798 perror("relay data zmalloc");
799 ret = -1;
800 goto error;
801 }
802 do {
803 ret_len = recv(ctx->control_sock, data, len, MSG_WAITALL);
804 } while (ret_len < 0 && errno == EINTR);
805 if (ret_len == 0) {
806 fprintf(stderr, "[error] Remote side has closed connection\n");
807 ret = -1;
808 free(data);
809 goto error;
810 }
811 if (ret_len < 0) {
812 perror("[error] Error receiving trace packet");
813 ret = ret_len;
814 free(data);
815 goto error;
816 }
817 assert(ret_len == len);
818
819 do {
820 ret_len = write(metadata_stream->fd, data, len);
821 } while (ret_len < 0 && errno == EINTR);
822 if (ret_len < 0) {
823 free(data);
824 ret = ret_len;
825 goto error;
826 }
827 assert(ret_len == len);
828 ret = len;
829
830 free(data);
831
832 end:
833 error:
834 return ret;
835 }
836
837 /*
838 * Return 0 on success, a negative value on error.
839 */
840 static
841 int get_new_metadata(struct lttng_live_ctx *ctx,
842 struct lttng_live_viewer_stream *viewer_stream)
843 {
844 int ret = 0;
845 struct lttng_live_viewer_stream *metadata_stream;
846
847 metadata_stream = viewer_stream->ctf_trace->metadata_stream;
848
849 do {
850 /*
851 * get_one_metadata_packet returns the number of bytes
852 * received, 0 when we have received everything, a
853 * negative value on error.
854 */
855 ret = get_one_metadata_packet(ctx, metadata_stream);
856 } while (ret > 0);
857
858 fclose(metadata_stream->metadata_fp_write);
859 metadata_stream->metadata_fp_write = NULL;
860
861 error:
862 return ret;
863 }
864
865 /*
866 * Get one index for a stream.
867 *
868 * Returns 0 on success or a negative value on error.
869 */
870 static
871 int get_next_index(struct lttng_live_ctx *ctx,
872 struct lttng_live_viewer_stream *viewer_stream,
873 struct packet_index *index)
874 {
875 struct lttng_viewer_cmd cmd;
876 struct lttng_viewer_get_next_index rq;
877 struct lttng_viewer_index rp;
878 int ret;
879 ssize_t ret_len;
880
881 cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEXT_INDEX);
882 cmd.data_size = sizeof(rq);
883 cmd.cmd_version = 0;
884
885 memset(&rq, 0, sizeof(rq));
886 rq.stream_id = htobe64(viewer_stream->id);
887
888 retry:
889 do {
890 ret_len = send(ctx->control_sock, &cmd, sizeof(cmd), 0);
891 } while (ret_len < 0 && errno == EINTR);
892 if (ret_len < 0) {
893 perror("[error] Error sending cmd");
894 ret = ret_len;
895 goto error;
896 }
897 assert(ret_len == sizeof(cmd));
898
899 do {
900 ret_len = send(ctx->control_sock, &rq, sizeof(rq), 0);
901 } while (ret_len < 0 && errno == EINTR);
902 if (ret_len < 0) {
903 perror("[error] Error sending get_next_index request");
904 ret = ret_len;
905 goto error;
906 }
907 assert(ret_len == sizeof(rq));
908
909 do {
910 ret_len = recv(ctx->control_sock, &rp, sizeof(rp), 0);
911 } while (ret_len < 0 && errno == EINTR);
912 if (ret_len == 0) {
913 fprintf(stderr, "[error] Remote side has closed connection\n");
914 ret = -1;
915 goto error;
916 }
917 if (ret_len < 0) {
918 perror("[error] Error receiving index response");
919 ret = ret_len;
920 goto error;
921 }
922 assert(ret_len == sizeof(rp));
923
924 rp.flags = be32toh(rp.flags);
925
926 switch (be32toh(rp.status)) {
927 case LTTNG_VIEWER_INDEX_INACTIVE:
928 printf_verbose("get_next_index: inactive\n");
929 memset(index, 0, sizeof(struct packet_index));
930 index->ts_cycles.timestamp_end = be64toh(rp.timestamp_end);
931 break;
932 case LTTNG_VIEWER_INDEX_OK:
933 printf_verbose("get_next_index: Ok, need metadata update : %u\n",
934 rp.flags & LTTNG_VIEWER_FLAG_NEW_METADATA);
935 index->offset = be64toh(rp.offset);
936 index->packet_size = be64toh(rp.packet_size);
937 index->content_size = be64toh(rp.content_size);
938 index->ts_cycles.timestamp_begin = be64toh(rp.timestamp_begin);
939 index->ts_cycles.timestamp_end = be64toh(rp.timestamp_end);
940 index->events_discarded = be64toh(rp.events_discarded);
941
942 if (rp.flags & LTTNG_VIEWER_FLAG_NEW_METADATA) {
943 printf_verbose("get_next_index: new metadata needed\n");
944 ret = get_new_metadata(ctx, viewer_stream);
945 if (ret < 0) {
946 goto error;
947 }
948 }
949 if (rp.flags & LTTNG_VIEWER_FLAG_NEW_STREAM) {
950 ret = ask_new_streams(ctx);
951 if (ret < 0)
952 goto error;
953 g_hash_table_foreach(ctx->session->ctf_traces, add_traces,
954 ctx->bt_ctx);
955 }
956 break;
957 case LTTNG_VIEWER_INDEX_RETRY:
958 printf_verbose("get_next_index: retry\n");
959 sleep(1);
960 goto retry;
961 case LTTNG_VIEWER_INDEX_HUP:
962 printf_verbose("get_next_index: stream hung up\n");
963 viewer_stream->id = -1ULL;
964 viewer_stream->fd = -1;
965 index->offset = EOF;
966 ctx->session->stream_count--;
967 break;
968 case LTTNG_VIEWER_INDEX_ERR:
969 fprintf(stderr, "[error] get_next_index: error\n");
970 ret = -1;
971 goto error;
972 default:
973 fprintf(stderr, "[error] get_next_index: unkwown value\n");
974 ret = -1;
975 goto error;
976 }
977
978 ret = 0;
979
980 error:
981 return ret;
982 }
983
984 static
985 void ctf_live_packet_seek(struct bt_stream_pos *stream_pos, size_t index,
986 int whence)
987 {
988 struct ctf_stream_pos *pos;
989 struct ctf_file_stream *file_stream;
990 struct packet_index *prev_index = NULL, *cur_index;
991 struct lttng_live_viewer_stream *viewer_stream;
992 struct lttng_live_session *session;
993 int ret;
994
995 retry:
996 pos = ctf_pos(stream_pos);
997 file_stream = container_of(pos, struct ctf_file_stream, pos);
998 viewer_stream = (struct lttng_live_viewer_stream *) pos->priv;
999 session = viewer_stream->session;
1000
1001 switch (pos->packet_index->len) {
1002 case 0:
1003 g_array_set_size(pos->packet_index, 1);
1004 cur_index = &g_array_index(pos->packet_index,
1005 struct packet_index, 0);
1006 break;
1007 case 1:
1008 g_array_set_size(pos->packet_index, 2);
1009 prev_index = &g_array_index(pos->packet_index,
1010 struct packet_index, 0);
1011 cur_index = &g_array_index(pos->packet_index,
1012 struct packet_index, 1);
1013 break;
1014 case 2:
1015 g_array_index(pos->packet_index,
1016 struct packet_index, 0) =
1017 g_array_index(pos->packet_index,
1018 struct packet_index, 1);
1019 prev_index = &g_array_index(pos->packet_index,
1020 struct packet_index, 0);
1021 cur_index = &g_array_index(pos->packet_index,
1022 struct packet_index, 1);
1023 break;
1024 default:
1025 abort();
1026 break;
1027 }
1028 printf_verbose("get_next_index for stream %" PRIu64 "\n", viewer_stream->id);
1029 ret = get_next_index(session->ctx, viewer_stream, cur_index);
1030 if (ret < 0) {
1031 pos->offset = EOF;
1032 fprintf(stderr, "[error] get_next_index failed\n");
1033 return;
1034 }
1035
1036 pos->packet_size = cur_index->packet_size;
1037 pos->content_size = cur_index->content_size;
1038 pos->mmap_base_offset = 0;
1039 if (cur_index->offset == EOF) {
1040 pos->offset = EOF;
1041 } else {
1042 pos->offset = 0;
1043 }
1044
1045 if (cur_index->content_size == 0) {
1046 file_stream->parent.cycles_timestamp =
1047 cur_index->ts_cycles.timestamp_end;
1048 file_stream->parent.real_timestamp = ctf_get_real_timestamp(
1049 &file_stream->parent,
1050 cur_index->ts_cycles.timestamp_end);
1051 } else {
1052 /* Convert the timestamps and append to the real_index. */
1053 cur_index->ts_real.timestamp_begin = ctf_get_real_timestamp(
1054 &file_stream->parent,
1055 cur_index->ts_cycles.timestamp_begin);
1056 cur_index->ts_real.timestamp_end = ctf_get_real_timestamp(
1057 &file_stream->parent,
1058 cur_index->ts_cycles.timestamp_end);
1059
1060 ctf_update_current_packet_index(&file_stream->parent,
1061 prev_index, cur_index);
1062
1063 file_stream->parent.cycles_timestamp =
1064 cur_index->ts_cycles.timestamp_begin;
1065 file_stream->parent.real_timestamp =
1066 cur_index->ts_real.timestamp_begin;
1067 }
1068
1069 if (pos->packet_size == 0 || pos->offset == EOF) {
1070 goto end;
1071 }
1072
1073 printf_verbose("get_data_packet for stream %" PRIu64 "\n",
1074 viewer_stream->id);
1075 ret = get_data_packet(session->ctx, pos, viewer_stream,
1076 be64toh(cur_index->offset),
1077 cur_index->packet_size / CHAR_BIT);
1078 if (ret == -2) {
1079 goto retry;
1080 } else if (ret < 0) {
1081 pos->offset = EOF;
1082 fprintf(stderr, "[error] get_data_packet failed\n");
1083 return;
1084 }
1085
1086 printf_verbose("Index received : packet_size : %" PRIu64
1087 ", offset %" PRIu64 ", content_size %" PRIu64
1088 ", timestamp_end : %" PRIu64 "\n",
1089 cur_index->packet_size, cur_index->offset,
1090 cur_index->content_size,
1091 cur_index->ts_cycles.timestamp_end);
1092
1093 /* update trace_packet_header and stream_packet_context */
1094 if (pos->prot != PROT_WRITE && file_stream->parent.trace_packet_header) {
1095 /* Read packet header */
1096 ret = generic_rw(&pos->parent, &file_stream->parent.trace_packet_header->p);
1097 if (ret) {
1098 pos->offset = EOF;
1099 fprintf(stderr, "[error] trace packet header read failed\n");
1100 goto end;
1101 }
1102 }
1103 if (pos->prot != PROT_WRITE && file_stream->parent.stream_packet_context) {
1104 /* Read packet context */
1105 ret = generic_rw(&pos->parent, &file_stream->parent.stream_packet_context->p);
1106 if (ret) {
1107 pos->offset = EOF;
1108 fprintf(stderr, "[error] stream packet context read failed\n");
1109 goto end;
1110 }
1111 }
1112 pos->data_offset = pos->offset;
1113
1114 end:
1115 return;
1116 }
1117
1118 int lttng_live_create_viewer_session(struct lttng_live_ctx *ctx)
1119 {
1120 struct lttng_viewer_cmd cmd;
1121 struct lttng_viewer_create_session_response resp;
1122 int ret;
1123 ssize_t ret_len;
1124
1125 cmd.cmd = htobe32(LTTNG_VIEWER_CREATE_SESSION);
1126 cmd.data_size = 0;
1127 cmd.cmd_version = 0;
1128
1129 do {
1130 ret_len = send(ctx->control_sock, &cmd, sizeof(cmd), 0);
1131 } while (ret_len < 0 && errno == EINTR);
1132 if (ret_len < 0) {
1133 perror("[error] Error sending cmd");
1134 ret = ret_len;
1135 goto error;
1136 }
1137 assert(ret_len == sizeof(cmd));
1138
1139 do {
1140 ret_len = recv(ctx->control_sock, &resp, sizeof(resp), 0);
1141 } while (ret_len < 0 && errno == EINTR);
1142 if (ret_len == 0) {
1143 fprintf(stderr, "[error] Remote side has closed connection\n");
1144 ret = -1;
1145 goto error;
1146 }
1147 if (ret_len < 0) {
1148 perror("[error] Error receiving create session reply");
1149 ret = ret_len;
1150 goto error;
1151 }
1152 assert(ret_len == sizeof(resp));
1153
1154 if (be32toh(resp.status) != LTTNG_VIEWER_CREATE_SESSION_OK) {
1155 fprintf(stderr, "[error] Error creating viewer session\n");
1156 ret = -1;
1157 goto error;
1158 }
1159 ret = 0;
1160
1161 error:
1162 return ret;
1163 }
1164
1165 static
1166 int del_traces(gpointer key, gpointer value, gpointer user_data)
1167 {
1168 struct bt_context *bt_ctx = user_data;
1169 struct lttng_live_ctf_trace *trace = value;
1170 int ret;
1171
1172 ret = bt_context_remove_trace(bt_ctx, trace->trace_id);
1173 if (ret < 0)
1174 fprintf(stderr, "[error] removing trace from context\n");
1175
1176 /* remove the key/value pair from the HT. */
1177 return 1;
1178 }
1179
1180 static
1181 void add_traces(gpointer key, gpointer value, gpointer user_data)
1182 {
1183 int i, ret;
1184 struct bt_context *bt_ctx = user_data;
1185 struct lttng_live_ctf_trace *trace = value;
1186 struct lttng_live_viewer_stream *stream;
1187 struct bt_mmap_stream *new_mmap_stream;
1188 struct bt_mmap_stream_list mmap_list;
1189 struct lttng_live_ctx *ctx = NULL;
1190
1191 /*
1192 * We don't know how many streams we will receive for a trace, so
1193 * once we are done receiving the traces, we add all the traces
1194 * received to the bt_context.
1195 * We can receive streams during the attach command or the
1196 * get_new_streams, so we have to make sure not to add multiple
1197 * times the same traces.
1198 * If a trace is already in the context, we just skip this function.
1199 */
1200 if (trace->in_use)
1201 return;
1202
1203 BT_INIT_LIST_HEAD(&mmap_list.head);
1204
1205 for (i = 0; i < trace->streams->len; i++) {
1206 stream = g_ptr_array_index(trace->streams, i);
1207 ctx = stream->session->ctx;
1208
1209 if (!stream->metadata_flag) {
1210 new_mmap_stream = zmalloc(sizeof(struct bt_mmap_stream));
1211 new_mmap_stream->priv = (void *) stream;
1212 new_mmap_stream->fd = -1;
1213 bt_list_add(&new_mmap_stream->list, &mmap_list.head);
1214 } else {
1215 /* Get all possible metadata before starting */
1216 ret = get_new_metadata(ctx, stream);
1217 if (ret)
1218 goto end_free;
1219 trace->metadata_fp = fopen(stream->path, "r");
1220 }
1221 }
1222
1223 if (!trace->metadata_fp) {
1224 fprintf(stderr, "[error] No metadata stream opened\n");
1225 goto end_free;
1226 }
1227
1228 ret = bt_context_add_trace(bt_ctx, NULL, "ctf",
1229 ctf_live_packet_seek, &mmap_list, trace->metadata_fp);
1230 if (ret < 0) {
1231 fprintf(stderr, "[error] Error adding trace\n");
1232 goto end_free;
1233 }
1234
1235 if (bt_ctx->current_iterator) {
1236 struct bt_trace_descriptor *td;
1237 struct bt_trace_handle *handle;
1238
1239 handle = (struct bt_trace_handle *) g_hash_table_lookup(
1240 bt_ctx->trace_handles,
1241 (gpointer) (unsigned long) ret);
1242 td = handle->td;
1243 bt_iter_add_trace(bt_ctx->current_iterator, td);
1244 }
1245
1246 trace->trace_id = ret;
1247 trace->in_use = 1;
1248
1249 goto end;
1250
1251 end_free:
1252 bt_context_put(bt_ctx);
1253 end:
1254 return;
1255 }
1256
1257 int lttng_live_get_new_streams(struct lttng_live_ctx *ctx, uint64_t id)
1258 {
1259 struct lttng_viewer_cmd cmd;
1260 struct lttng_viewer_new_streams_request rq;
1261 struct lttng_viewer_new_streams_response rp;
1262 struct lttng_viewer_stream stream;
1263 int ret, i;
1264 ssize_t ret_len;
1265 uint32_t stream_count;
1266
1267 cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEW_STREAMS);
1268 cmd.data_size = sizeof(rq);
1269 cmd.cmd_version = 0;
1270
1271 memset(&rq, 0, sizeof(rq));
1272 rq.session_id = htobe64(id);
1273
1274 do {
1275 ret_len = send(ctx->control_sock, &cmd, sizeof(cmd), 0);
1276 } while (ret_len < 0 && errno == EINTR);
1277 if (ret_len < 0) {
1278 perror("[error] Error sending cmd");
1279 ret = ret_len;
1280 goto error;
1281 }
1282 assert(ret_len == sizeof(cmd));
1283
1284 do {
1285 ret_len = send(ctx->control_sock, &rq, sizeof(rq), 0);
1286 } while (ret_len < 0 && errno == EINTR);
1287 if (ret_len < 0) {
1288 perror("[error] Error sending get_new_streams request");
1289 ret = ret_len;
1290 goto error;
1291 }
1292 assert(ret_len == sizeof(rq));
1293
1294 do {
1295 ret_len = recv(ctx->control_sock, &rp, sizeof(rp), 0);
1296 } while (ret_len < 0 && errno == EINTR);
1297 if (ret_len == 0) {
1298 fprintf(stderr, "[error] Remote side has closed connection\n");
1299 ret = -1;
1300 goto error;
1301 }
1302 if (ret_len < 0) {
1303 perror("[error] Error receiving get_new_streams response");
1304 ret = ret_len;
1305 goto error;
1306 }
1307 assert(ret_len == sizeof(rp));
1308
1309 switch(be32toh(rp.status)) {
1310 case LTTNG_VIEWER_NEW_STREAMS_OK:
1311 break;
1312 case LTTNG_VIEWER_NEW_STREAMS_NO_NEW:
1313 ret = 0;
1314 goto end;
1315 case LTTNG_VIEWER_NEW_STREAMS_HUP:
1316 ret = -LTTNG_VIEWER_NEW_STREAMS_HUP;
1317 goto end;
1318 case LTTNG_VIEWER_NEW_STREAMS_ERR:
1319 fprintf(stderr, "[error] get_new_streams error\n");
1320 ret = -1;
1321 goto end;
1322 default:
1323 fprintf(stderr, "[error] Unknown return code %u\n",
1324 be32toh(rp.status));
1325 ret = -1;
1326 goto end;
1327 }
1328
1329 stream_count = be32toh(rp.streams_count);
1330 ctx->session->stream_count += stream_count;
1331 /*
1332 * When the session is created but not started, we do an active wait
1333 * until it starts. It allows the viewer to start processing the trace
1334 * as soon as the session starts.
1335 */
1336 if (ctx->session->stream_count == 0) {
1337 ret = 0;
1338 goto end;
1339 }
1340 printf_verbose("Waiting for %" PRIu64 " streams:\n",
1341 ctx->session->stream_count);
1342 ctx->session->streams = g_new0(struct lttng_live_viewer_stream,
1343 ctx->session->stream_count);
1344 for (i = 0; i < stream_count; i++) {
1345 do {
1346 ret_len = recv(ctx->control_sock, &stream, sizeof(stream), 0);
1347 } while (ret_len < 0 && errno == EINTR);
1348 if (ret_len == 0) {
1349 fprintf(stderr, "[error] Remote side has closed connection\n");
1350 ret = -1;
1351 goto error;
1352 }
1353 if (ret_len < 0) {
1354 perror("[error] Error receiving stream");
1355 ret = ret_len;
1356 goto error;
1357 }
1358 assert(ret_len == sizeof(stream));
1359 stream.path_name[LTTNG_VIEWER_PATH_MAX - 1] = '\0';
1360 stream.channel_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0';
1361
1362 printf_verbose(" stream %" PRIu64 " : %s/%s\n",
1363 be64toh(stream.id), stream.path_name,
1364 stream.channel_name);
1365 ctx->session->streams[i].id = be64toh(stream.id);
1366 ctx->session->streams[i].session = ctx->session;
1367
1368 ctx->session->streams[i].first_read = 1;
1369 ctx->session->streams[i].mmap_size = 0;
1370
1371 if (be32toh(stream.metadata_flag)) {
1372 char *path;
1373
1374 path = strdup(LTTNG_METADATA_PATH_TEMPLATE);
1375 if (!path) {
1376 perror("strdup");
1377 ret = -1;
1378 goto error;
1379 }
1380 if (!mkdtemp(path)) {
1381 perror("mkdtemp");
1382 free(path);
1383 ret = -1;
1384 goto error;
1385 }
1386 ctx->session->streams[i].metadata_flag = 1;
1387 snprintf(ctx->session->streams[i].path,
1388 sizeof(ctx->session->streams[i].path),
1389 "%s/%s", path,
1390 stream.channel_name);
1391 ret = open(ctx->session->streams[i].path,
1392 O_WRONLY | O_CREAT | O_TRUNC,
1393 S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP);
1394 if (ret < 0) {
1395 perror("open");
1396 free(path);
1397 goto error;
1398 }
1399 ctx->session->streams[i].fd = ret;
1400 free(path);
1401 }
1402 ret = lttng_live_ctf_trace_assign(&ctx->session->streams[i],
1403 be64toh(stream.ctf_trace_id));
1404 if (ret < 0) {
1405 goto error;
1406 }
1407
1408 }
1409 ret = 0;
1410
1411 end:
1412 error:
1413 return ret;
1414 }
1415
1416 void lttng_live_read(struct lttng_live_ctx *ctx)
1417 {
1418 int ret, i;
1419 struct bt_ctf_iter *iter;
1420 const struct bt_ctf_event *event;
1421 struct bt_iter_pos begin_pos;
1422 struct bt_trace_descriptor *td_write;
1423 struct bt_format *fmt_write;
1424 struct ctf_text_stream_pos *sout;
1425 uint64_t id;
1426
1427 ctx->bt_ctx = bt_context_create();
1428 if (!ctx->bt_ctx) {
1429 fprintf(stderr, "[error] bt_context_create allocation\n");
1430 goto end;
1431 }
1432
1433 fmt_write = bt_lookup_format(g_quark_from_static_string("text"));
1434 if (!fmt_write) {
1435 fprintf(stderr, "[error] ctf-text error\n");
1436 goto end;
1437 }
1438
1439 td_write = fmt_write->open_trace(NULL, O_RDWR, NULL, NULL);
1440 if (!td_write) {
1441 fprintf(stderr, "[error] Error opening output trace\n");
1442 goto end_free;
1443 }
1444
1445 sout = container_of(td_write, struct ctf_text_stream_pos,
1446 trace_descriptor);
1447 if (!sout->parent.event_cb)
1448 goto end_free;
1449
1450 ret = lttng_live_create_viewer_session(ctx);
1451 if (ret < 0) {
1452 goto end_free;
1453 }
1454
1455 for (i = 0; i < ctx->session_ids->len; i++) {
1456 id = g_array_index(ctx->session_ids, uint64_t, i);
1457 printf_verbose("Attaching to session %lu\n", id);
1458 ret = lttng_live_attach_session(ctx, id);
1459 printf_verbose("Attaching session returns %d\n", ret);
1460 if (ret < 0) {
1461 if (ret == -LTTNG_VIEWER_ATTACH_UNK) {
1462 fprintf(stderr, "[error] Unknown session ID\n");
1463 }
1464 goto end_free;
1465 }
1466 }
1467
1468 /*
1469 * As long as the session is active, we try to get new streams.
1470 */
1471 for (;;) {
1472 int flags;
1473
1474 while (!ctx->session->stream_count) {
1475 if (ctx->session_ids->len == 0)
1476 goto end_free;
1477 ret = ask_new_streams(ctx);
1478 if (ret < 0)
1479 goto end_free;
1480 }
1481
1482 g_hash_table_foreach(ctx->session->ctf_traces, add_traces,
1483 ctx->bt_ctx);
1484
1485 begin_pos.type = BT_SEEK_BEGIN;
1486 iter = bt_ctf_iter_create(ctx->bt_ctx, &begin_pos, NULL);
1487 if (!iter) {
1488 fprintf(stderr, "[error] Iterator creation error\n");
1489 goto end;
1490 }
1491 for (;;) {
1492 event = bt_ctf_iter_read_event_flags(iter, &flags);
1493 if (!(flags & BT_ITER_FLAG_RETRY)) {
1494 if (!event) {
1495 /* End of trace */
1496 break;
1497 }
1498 ret = sout->parent.event_cb(&sout->parent,
1499 event->parent->stream);
1500 if (ret) {
1501 fprintf(stderr, "[error] Writing "
1502 "event failed.\n");
1503 goto end_free;
1504 }
1505 }
1506 ret = bt_iter_next(bt_ctf_get_iter(iter));
1507 if (ret < 0) {
1508 goto end_free;
1509 }
1510 }
1511 bt_ctf_iter_destroy(iter);
1512 g_hash_table_foreach_remove(ctx->session->ctf_traces,
1513 del_traces, ctx->bt_ctx);
1514 ctx->session->stream_count = 0;
1515 }
1516
1517 end_free:
1518 bt_context_put(ctx->bt_ctx);
1519 end:
1520 return;
1521 }
This page took 0.089799 seconds and 5 git commands to generate.