Fix: get_new_metadata receive all the metadata
[babeltrace.git] / formats / lttng-live / lttng-live-comm.c
1 /*
2 * Copyright (C) 2013 - Julien Desfossez <jdesfossez@efficios.com>
3 * Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
4 *
5 * Permission is hereby granted, free of charge, to any person obtaining a copy
6 * of this software and associated documentation files (the "Software"), to deal
7 * in the Software without restriction, including without limitation the rights
8 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9 * copies of the Software, and to permit persons to whom the Software is
10 * furnished to do so, subject to the following conditions:
11 *
12 * The above copyright notice and this permission notice shall be included in
13 * all copies or substantial portions of the Software.
14 *
15 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21 * SOFTWARE.
22 */
23
24 #include <sys/socket.h>
25 #include <sys/types.h>
26 #include <netinet/in.h>
27 #include <netdb.h>
28 #include <stdio.h>
29 #include <string.h>
30 #include <stdlib.h>
31 #include <unistd.h>
32 #include <errno.h>
33 #include <inttypes.h>
34 #include <fcntl.h>
35 #include <sys/mman.h>
36
37 #include <babeltrace/ctf/ctf-index.h>
38
39 #include <babeltrace/babeltrace.h>
40 #include <babeltrace/ctf/events.h>
41 #include <babeltrace/ctf/callbacks.h>
42 #include <babeltrace/ctf/iterator.h>
43
44 /* for packet_index */
45 #include <babeltrace/ctf/types.h>
46
47 #include <babeltrace/ctf/metadata.h>
48 #include <babeltrace/ctf-text/types.h>
49 #include <babeltrace/ctf/events-internal.h>
50 #include <formats/ctf/events-private.h>
51
52 #include "lttng-live.h"
53 #include "lttng-viewer-abi.h"
54
55 /*
56 * Memory allocation zeroed
57 */
58 #define zmalloc(x) calloc(1, x)
59
60 #ifndef max_t
61 #define max_t(type, a, b) \
62 ((type) (a) > (type) (b) ? (type) (a) : (type) (b))
63 #endif
64
65 static void ctf_live_packet_seek(struct bt_stream_pos *stream_pos,
66 size_t index, int whence);
67 static void add_traces(gpointer key, gpointer value, gpointer user_data);
68 static int del_traces(gpointer key, gpointer value, gpointer user_data);
69 static int get_new_metadata(struct lttng_live_ctx *ctx,
70 struct lttng_live_viewer_stream *viewer_stream);
71
72 int lttng_live_connect_viewer(struct lttng_live_ctx *ctx)
73 {
74 struct hostent *host;
75 struct sockaddr_in server_addr;
76 int ret;
77
78 host = gethostbyname(ctx->relay_hostname);
79 if (!host) {
80 ret = -1;
81 goto end;
82 }
83
84 if ((ctx->control_sock = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
85 perror("Socket");
86 ret = -1;
87 goto end;
88 }
89
90 server_addr.sin_family = AF_INET;
91 server_addr.sin_port = htons(ctx->port);
92 server_addr.sin_addr = *((struct in_addr *) host->h_addr);
93 bzero(&(server_addr.sin_zero), 8);
94
95 if (connect(ctx->control_sock, (struct sockaddr *) &server_addr,
96 sizeof(struct sockaddr)) == -1) {
97 perror("Connect");
98 ret = -1;
99 goto end;
100 }
101
102 ret = 0;
103
104 end:
105 return ret;
106 }
107
108 int lttng_live_establish_connection(struct lttng_live_ctx *ctx)
109 {
110 struct lttng_viewer_cmd cmd;
111 struct lttng_viewer_connect connect;
112 int ret;
113 ssize_t ret_len;
114
115 cmd.cmd = htobe32(LTTNG_VIEWER_CONNECT);
116 cmd.data_size = sizeof(connect);
117 cmd.cmd_version = 0;
118
119 connect.viewer_session_id = -1ULL; /* will be set on recv */
120 connect.major = htobe32(LTTNG_LIVE_MAJOR);
121 connect.minor = htobe32(LTTNG_LIVE_MINOR);
122 connect.type = htobe32(LTTNG_VIEWER_CLIENT_COMMAND);
123
124 do {
125 ret_len = send(ctx->control_sock, &cmd, sizeof(cmd), 0);
126 } while (ret_len < 0 && errno == EINTR);
127 if (ret_len < 0) {
128 perror("[error] Error sending cmd");
129 ret = ret_len;
130 goto error;
131 }
132 assert(ret_len == sizeof(cmd));
133
134 do {
135 ret_len = send(ctx->control_sock, &connect, sizeof(connect), 0);
136 } while (ret_len < 0 && errno == EINTR);
137 if (ret_len < 0) {
138 perror("[error] Error sending version");
139 ret = ret_len;
140 goto error;
141 }
142 assert(ret_len == sizeof(connect));
143
144 do {
145 ret_len = recv(ctx->control_sock, &connect, sizeof(connect), 0);
146 } while (ret_len < 0 && errno == EINTR);
147 if (ret_len == 0) {
148 fprintf(stderr, "[error] Remote side has closed connection\n");
149 ret = -1;
150 goto error;
151 }
152 if (ret_len < 0) {
153 perror("[error] Error receiving version");
154 ret = ret_len;
155 goto error;
156 }
157 assert(ret_len == sizeof(connect));
158
159 printf_verbose("Received viewer session ID : %" PRIu64 "\n",
160 be64toh(connect.viewer_session_id));
161 printf_verbose("Relayd version : %u.%u\n", be32toh(connect.major),
162 be32toh(connect.minor));
163
164 ret = 0;
165
166 error:
167 return ret;
168 }
169
170 static
171 void free_session_list(GPtrArray *session_list)
172 {
173 int i;
174 struct lttng_live_relay_session *relay_session;
175
176 for (i = 0; i < session_list->len; i++) {
177 relay_session = g_ptr_array_index(session_list, i);
178 free(relay_session->name);
179 free(relay_session->hostname);
180 }
181 g_ptr_array_free(session_list, TRUE);
182 }
183
184 static
185 void print_session_list(GPtrArray *session_list, const char *path)
186 {
187 int i;
188 struct lttng_live_relay_session *relay_session;
189
190 for (i = 0; i < session_list->len; i++) {
191 relay_session = g_ptr_array_index(session_list, i);
192 fprintf(stdout, "%s/host/%s/%s (timer = %u, "
193 "%u stream(s), %u client(s) connected)\n",
194 path, relay_session->hostname,
195 relay_session->name, relay_session->timer,
196 relay_session->streams, relay_session->clients);
197 }
198 }
199
200 static
201 void update_session_list(GPtrArray *session_list, char *hostname,
202 char *session_name, uint32_t streams, uint32_t clients,
203 uint32_t timer)
204 {
205 int i, found = 0;
206 struct lttng_live_relay_session *relay_session;
207
208 for (i = 0; i < session_list->len; i++) {
209 relay_session = g_ptr_array_index(session_list, i);
210 if ((strncmp(relay_session->hostname, hostname, NAME_MAX) == 0) &&
211 strncmp(relay_session->name,
212 session_name, NAME_MAX) == 0) {
213 relay_session->streams += streams;
214 if (relay_session->clients < clients)
215 relay_session->clients = clients;
216 found = 1;
217 break;
218 }
219 }
220 if (found)
221 return;
222
223 relay_session = g_new0(struct lttng_live_relay_session, 1);
224 relay_session->hostname = strndup(hostname, NAME_MAX);
225 relay_session->name = strndup(session_name, NAME_MAX);
226 relay_session->clients = clients;
227 relay_session->streams = streams;
228 relay_session->timer = timer;
229 g_ptr_array_add(session_list, relay_session);
230 }
231
232 int lttng_live_list_sessions(struct lttng_live_ctx *ctx, const char *path)
233 {
234 struct lttng_viewer_cmd cmd;
235 struct lttng_viewer_list_sessions list;
236 struct lttng_viewer_session lsession;
237 int i, ret, sessions_count, print_list = 0;
238 ssize_t ret_len;
239 uint64_t session_id;
240 GPtrArray *session_list = NULL;
241
242 if (strlen(ctx->session_name) == 0) {
243 print_list = 1;
244 session_list = g_ptr_array_new();
245 }
246
247 cmd.cmd = htobe32(LTTNG_VIEWER_LIST_SESSIONS);
248 cmd.data_size = 0;
249 cmd.cmd_version = 0;
250
251 do {
252 ret_len = send(ctx->control_sock, &cmd, sizeof(cmd), 0);
253 } while (ret_len < 0 && errno == EINTR);
254 if (ret_len < 0) {
255 perror("[error] Error sending cmd");
256 ret = ret_len;
257 goto error;
258 }
259 assert(ret_len == sizeof(cmd));
260
261 do {
262 ret_len = recv(ctx->control_sock, &list, sizeof(list), 0);
263 } while (ret_len < 0 && errno == EINTR);
264 if (ret_len == 0) {
265 fprintf(stderr, "[error] Remote side has closed connection\n");
266 ret = -1;
267 goto error;
268 }
269 if (ret_len < 0) {
270 perror("[error] Error receiving session list");
271 ret = ret_len;
272 goto error;
273 }
274 assert(ret_len == sizeof(list));
275
276 sessions_count = be32toh(list.sessions_count);
277 for (i = 0; i < sessions_count; i++) {
278 do {
279 ret_len = recv(ctx->control_sock, &lsession, sizeof(lsession), 0);
280 } while (ret_len < 0 && errno == EINTR);
281 if (ret_len == 0) {
282 fprintf(stderr, "[error] Remote side has closed connection\n");
283 ret = -1;
284 goto error;
285 }
286 if (ret_len < 0) {
287 perror("[error] Error receiving session");
288 ret = ret_len;
289 goto error;
290 }
291 assert(ret_len == sizeof(lsession));
292 lsession.hostname[LTTNG_VIEWER_HOST_NAME_MAX - 1] = '\0';
293 lsession.session_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0';
294 session_id = be64toh(lsession.id);
295
296 if (print_list) {
297 update_session_list(session_list,
298 lsession.hostname,
299 lsession.session_name,
300 be32toh(lsession.streams),
301 be32toh(lsession.clients),
302 be32toh(lsession.live_timer));
303 } else {
304 if ((strncmp(lsession.session_name, ctx->session_name,
305 NAME_MAX) == 0) && (strncmp(lsession.hostname,
306 ctx->traced_hostname, NAME_MAX) == 0)) {
307 printf_verbose("Reading from session %" PRIu64 "\n",
308 session_id);
309 g_array_append_val(ctx->session_ids,
310 session_id);
311 }
312 }
313 }
314
315 if (print_list) {
316 print_session_list(session_list, path);
317 free_session_list(session_list);
318 }
319
320 ret = 0;
321
322 error:
323 return ret;
324 }
325
326 int lttng_live_ctf_trace_assign(struct lttng_live_viewer_stream *stream,
327 uint64_t ctf_trace_id)
328 {
329 struct lttng_live_ctf_trace *trace;
330 int ret = 0;
331
332 trace = g_hash_table_lookup(stream->session->ctf_traces,
333 (gpointer) ctf_trace_id);
334 if (!trace) {
335 trace = g_new0(struct lttng_live_ctf_trace, 1);
336 trace->ctf_trace_id = ctf_trace_id;
337 trace->streams = g_ptr_array_new();
338 g_hash_table_insert(stream->session->ctf_traces,
339 (gpointer) ctf_trace_id,
340 trace);
341 }
342 if (stream->metadata_flag)
343 trace->metadata_stream = stream;
344
345 stream->ctf_trace = trace;
346 g_ptr_array_add(trace->streams, stream);
347
348 return ret;
349 }
350
351 int lttng_live_attach_session(struct lttng_live_ctx *ctx, uint64_t id)
352 {
353 struct lttng_viewer_cmd cmd;
354 struct lttng_viewer_attach_session_request rq;
355 struct lttng_viewer_attach_session_response rp;
356 struct lttng_viewer_stream stream;
357 int ret, i;
358 ssize_t ret_len;
359
360 cmd.cmd = htobe32(LTTNG_VIEWER_ATTACH_SESSION);
361 cmd.data_size = sizeof(rq);
362 cmd.cmd_version = 0;
363
364 memset(&rq, 0, sizeof(rq));
365 rq.session_id = htobe64(id);
366 // TODO: add cmd line parameter to select seek beginning
367 // rq.seek = htobe32(LTTNG_VIEWER_SEEK_BEGINNING);
368 rq.seek = htobe32(LTTNG_VIEWER_SEEK_LAST);
369
370 do {
371 ret_len = send(ctx->control_sock, &cmd, sizeof(cmd), 0);
372 } while (ret_len < 0 && errno == EINTR);
373 if (ret_len < 0) {
374 perror("[error] Error sending cmd");
375 ret = ret_len;
376 goto error;
377 }
378 assert(ret_len == sizeof(cmd));
379
380 do {
381 ret_len = send(ctx->control_sock, &rq, sizeof(rq), 0);
382 } while (ret_len < 0 && errno == EINTR);
383 if (ret_len < 0) {
384 perror("[error] Error sending attach request");
385 ret = ret_len;
386 goto error;
387 }
388 assert(ret_len == sizeof(rq));
389
390 do {
391 ret_len = recv(ctx->control_sock, &rp, sizeof(rp), 0);
392 } while (ret_len < 0 && errno == EINTR);
393 if (ret_len == 0) {
394 fprintf(stderr, "[error] Remote side has closed connection\n");
395 ret = -1;
396 goto error;
397 }
398 if (ret_len < 0) {
399 perror("[error] Error receiving attach response");
400 ret = ret_len;
401 goto error;
402 }
403 assert(ret_len == sizeof(rp));
404
405 switch(be32toh(rp.status)) {
406 case LTTNG_VIEWER_ATTACH_OK:
407 break;
408 case LTTNG_VIEWER_ATTACH_UNK:
409 ret = -LTTNG_VIEWER_ATTACH_UNK;
410 goto end;
411 case LTTNG_VIEWER_ATTACH_ALREADY:
412 fprintf(stderr, "[error] There is already a viewer attached to this session\n");
413 ret = -1;
414 goto end;
415 case LTTNG_VIEWER_ATTACH_NOT_LIVE:
416 fprintf(stderr, "[error] Not a live session\n");
417 ret = -1;
418 goto end;
419 case LTTNG_VIEWER_ATTACH_SEEK_ERR:
420 fprintf(stderr, "[error] Wrong seek parameter\n");
421 ret = -1;
422 goto end;
423 default:
424 fprintf(stderr, "[error] Unknown attach return code %u\n",
425 be32toh(rp.status));
426 ret = -1;
427 goto end;
428 }
429 if (be32toh(rp.status) != LTTNG_VIEWER_ATTACH_OK) {
430 ret = -1;
431 goto end;
432 }
433
434 ctx->session->stream_count += be32toh(rp.streams_count);
435 /*
436 * When the session is created but not started, we do an active wait
437 * until it starts. It allows the viewer to start processing the trace
438 * as soon as the session starts.
439 */
440 if (ctx->session->stream_count == 0) {
441 ret = 0;
442 goto end;
443 }
444 printf_verbose("Waiting for %" PRIu64 " streams:\n",
445 ctx->session->stream_count);
446 ctx->session->streams = g_new0(struct lttng_live_viewer_stream,
447 ctx->session->stream_count);
448 for (i = 0; i < be32toh(rp.streams_count); i++) {
449 do {
450 ret_len = recv(ctx->control_sock, &stream, sizeof(stream), 0);
451 } while (ret_len < 0 && errno == EINTR);
452 if (ret_len == 0) {
453 fprintf(stderr, "[error] Remote side has closed connection\n");
454 ret = -1;
455 goto error;
456 }
457 if (ret_len < 0) {
458 perror("[error] Error receiving stream");
459 ret = ret_len;
460 goto error;
461 }
462 assert(ret_len == sizeof(stream));
463 stream.path_name[LTTNG_VIEWER_PATH_MAX - 1] = '\0';
464 stream.channel_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0';
465
466 printf_verbose(" stream %" PRIu64 " : %s/%s\n",
467 be64toh(stream.id), stream.path_name,
468 stream.channel_name);
469 ctx->session->streams[i].id = be64toh(stream.id);
470 ctx->session->streams[i].session = ctx->session;
471
472 ctx->session->streams[i].first_read = 1;
473 ctx->session->streams[i].mmap_size = 0;
474
475 if (be32toh(stream.metadata_flag)) {
476 char *path;
477
478 path = strdup(LTTNG_METADATA_PATH_TEMPLATE);
479 if (!path) {
480 perror("strdup");
481 ret = -1;
482 goto error;
483 }
484 if (!mkdtemp(path)) {
485 perror("mkdtemp");
486 free(path);
487 ret = -1;
488 goto error;
489 }
490 ctx->session->streams[i].metadata_flag = 1;
491 snprintf(ctx->session->streams[i].path,
492 sizeof(ctx->session->streams[i].path),
493 "%s/%s", path,
494 stream.channel_name);
495 ret = open(ctx->session->streams[i].path,
496 O_WRONLY | O_CREAT | O_TRUNC,
497 S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP);
498 if (ret < 0) {
499 perror("open");
500 free(path);
501 goto error;
502 }
503 ctx->session->streams[i].fd = ret;
504 free(path);
505 }
506 ret = lttng_live_ctf_trace_assign(&ctx->session->streams[i],
507 be64toh(stream.ctf_trace_id));
508 if (ret < 0) {
509 goto error;
510 }
511
512 }
513 ret = 0;
514
515 end:
516 error:
517 return ret;
518 }
519
520 static
521 int ask_new_streams(struct lttng_live_ctx *ctx)
522 {
523 int i, ret = 0;
524 uint64_t id;
525
526 restart:
527 for (i = 0; i < ctx->session_ids->len; i++) {
528 id = g_array_index(ctx->session_ids, uint64_t, i);
529 ret = lttng_live_get_new_streams(ctx, id);
530 printf_verbose("Asking for new streams returns %d\n",
531 ret);
532 if (ret < 0) {
533 if (ret == -LTTNG_VIEWER_NEW_STREAMS_HUP) {
534 printf_verbose("Session %" PRIu64 " closed\n",
535 id);
536 /*
537 * The streams have already been closed during
538 * the reading, so we only need to get rid of
539 * the trace in our internal table of sessions.
540 */
541 g_array_remove_index(ctx->session_ids, i);
542 /*
543 * We can't continue iterating on the g_array
544 * after a remove, we have to start again.
545 */
546 goto restart;
547 } else {
548 ret = -1;
549 goto end;
550 }
551 }
552 }
553
554 end:
555 return ret;
556 }
557
558 static
559 int get_data_packet(struct lttng_live_ctx *ctx,
560 struct ctf_stream_pos *pos,
561 struct lttng_live_viewer_stream *stream, uint64_t offset,
562 uint64_t len)
563 {
564 struct lttng_viewer_cmd cmd;
565 struct lttng_viewer_get_packet rq;
566 struct lttng_viewer_trace_packet rp;
567 ssize_t ret_len;
568 int ret;
569
570 cmd.cmd = htobe32(LTTNG_VIEWER_GET_PACKET);
571 cmd.data_size = sizeof(rq);
572 cmd.cmd_version = 0;
573
574 memset(&rq, 0, sizeof(rq));
575 rq.stream_id = htobe64(stream->id);
576 /* Already in big endian. */
577 rq.offset = offset;
578 rq.len = htobe32(len);
579
580 do {
581 ret_len = send(ctx->control_sock, &cmd, sizeof(cmd), 0);
582 } while (ret_len < 0 && errno == EINTR);
583 if (ret_len < 0) {
584 perror("[error] Error sending cmd");
585 ret = ret_len;
586 goto error;
587 }
588 assert(ret_len == sizeof(cmd));
589
590 do {
591 ret_len = send(ctx->control_sock, &rq, sizeof(rq), 0);
592 } while (ret_len < 0 && errno == EINTR);
593 if (ret_len < 0) {
594 perror("[error] Error sending get_data_packet request");
595 ret = ret_len;
596 goto error;
597 }
598 assert(ret_len == sizeof(rq));
599
600 do {
601 ret_len = recv(ctx->control_sock, &rp, sizeof(rp), 0);
602 } while (ret_len < 0 && errno == EINTR);
603 if (ret_len == 0) {
604 fprintf(stderr, "[error] Remote side has closed connection\n");
605 ret = -1;
606 goto error;
607 }
608 if (ret_len < 0) {
609 perror("[error] Error receiving data response");
610 ret = ret_len;
611 goto error;
612 }
613 if (ret_len != sizeof(rp)) {
614 fprintf(stderr, "[error] get_data_packet: expected %" PRId64
615 ", received %" PRId64 "\n", ret_len,
616 sizeof(rp));
617 ret = -1;
618 goto error;
619 }
620
621 rp.flags = be32toh(rp.flags);
622
623 switch (be32toh(rp.status)) {
624 case LTTNG_VIEWER_GET_PACKET_OK:
625 len = be32toh(rp.len);
626 printf_verbose("get_data_packet: Ok, packet size : %" PRIu64
627 "\n", len);
628 break;
629 case LTTNG_VIEWER_GET_PACKET_RETRY:
630 printf_verbose("get_data_packet: retry\n");
631 ret = -1;
632 goto end;
633 case LTTNG_VIEWER_GET_PACKET_ERR:
634 if (rp.flags & LTTNG_VIEWER_FLAG_NEW_METADATA) {
635 printf_verbose("get_data_packet: new metadata needed\n");
636 ret = 0;
637 goto end;
638 }
639 if (rp.flags & LTTNG_VIEWER_FLAG_NEW_STREAM) {
640 ret = ask_new_streams(ctx);
641 if (ret < 0)
642 goto error;
643 g_hash_table_foreach(ctx->session->ctf_traces, add_traces,
644 ctx->bt_ctx);
645 }
646 fprintf(stderr, "[error] get_data_packet: error\n");
647 ret = -1;
648 goto end;
649 case LTTNG_VIEWER_GET_PACKET_EOF:
650 ret = -2;
651 goto error;
652 default:
653 printf_verbose("get_data_packet: unknown\n");
654 assert(0);
655 ret = -1;
656 goto end;
657 }
658
659 if (len <= 0) {
660 ret = -1;
661 goto end;
662 }
663
664 if (len > stream->mmap_size) {
665 uint64_t new_size;
666
667 new_size = max_t(uint64_t, len, stream->mmap_size << 1);
668 if (pos->base_mma) {
669 /* unmap old base */
670 ret = munmap_align(pos->base_mma);
671 if (ret) {
672 perror("[error] Unable to unmap old base");
673 ret = -1;
674 goto error;
675 }
676 pos->base_mma = NULL;
677 }
678 pos->base_mma = mmap_align(new_size,
679 PROT_READ | PROT_WRITE,
680 MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
681 if (pos->base_mma == MAP_FAILED) {
682 perror("[error] mmap error");
683 pos->base_mma = NULL;
684 ret = -1;
685 goto error;
686 }
687
688 stream->mmap_size = new_size;
689 printf_verbose("Expanding stream mmap size to %" PRIu64 " bytes\n",
690 stream->mmap_size);
691 }
692
693 do {
694 ret_len = recv(ctx->control_sock,
695 mmap_align_addr(pos->base_mma), len,
696 MSG_WAITALL);
697 } while (ret_len < 0 && errno == EINTR);
698 if (ret_len == 0) {
699 fprintf(stderr, "[error] Remote side has closed connection\n");
700 ret = -1;
701 goto error;
702 }
703 if (ret_len < 0) {
704 perror("[error] Error receiving trace packet");
705 ret = ret_len;
706 goto error;
707 }
708 assert(ret_len == len);
709
710 end:
711 error:
712 return ret;
713 }
714
715 static
716 int get_one_metadata_packet(struct lttng_live_ctx *ctx,
717 struct lttng_live_viewer_stream *metadata_stream)
718 {
719 uint64_t len = 0;
720 int ret;
721 struct lttng_viewer_cmd cmd;
722 struct lttng_viewer_get_metadata rq;
723 struct lttng_viewer_metadata_packet rp;
724 char *data = NULL;
725 ssize_t ret_len;
726
727 rq.stream_id = htobe64(metadata_stream->id);
728 cmd.cmd = htobe32(LTTNG_VIEWER_GET_METADATA);
729 cmd.data_size = sizeof(rq);
730 cmd.cmd_version = 0;
731
732 do {
733 ret_len = send(ctx->control_sock, &cmd, sizeof(cmd), 0);
734 } while (ret_len < 0 && errno == EINTR);
735 if (ret_len < 0) {
736 perror("[error] Error sending cmd");
737 ret = ret_len;
738 goto error;
739 }
740 assert(ret_len == sizeof(cmd));
741
742 do {
743 ret_len = send(ctx->control_sock, &rq, sizeof(rq), 0);
744 } while (ret_len < 0 && errno == EINTR);
745 if (ret_len < 0) {
746 perror("[error] Error sending get_metadata request");
747 ret = ret_len;
748 goto error;
749 }
750 assert(ret_len == sizeof(rq));
751
752 do {
753 ret_len = recv(ctx->control_sock, &rp, sizeof(rp), 0);
754 } while (ret_len < 0 && errno == EINTR);
755 if (ret_len == 0) {
756 fprintf(stderr, "[error] Remote side has closed connection\n");
757 ret = -1;
758 goto error;
759 }
760 if (ret_len < 0) {
761 perror("[error] Error receiving metadata response");
762 ret = ret_len;
763 goto error;
764 }
765 assert(ret_len == sizeof(rp));
766
767 switch (be32toh(rp.status)) {
768 case LTTNG_VIEWER_METADATA_OK:
769 printf_verbose("get_metadata : OK\n");
770 break;
771 case LTTNG_VIEWER_NO_NEW_METADATA:
772 printf_verbose("get_metadata : NO NEW\n");
773 ret = 0;
774 goto end;
775 case LTTNG_VIEWER_METADATA_ERR:
776 printf_verbose("get_metadata : ERR\n");
777 ret = -1;
778 goto end;
779 default:
780 printf_verbose("get_metadata : UNKNOWN\n");
781 ret = -1;
782 goto end;
783 }
784
785 len = be64toh(rp.len);
786 printf_verbose("Writing %" PRIu64" bytes to metadata\n", len);
787 if (len <= 0) {
788 ret = -1;
789 goto end;
790 }
791
792 data = zmalloc(len);
793 if (!data) {
794 perror("relay data zmalloc");
795 ret = -1;
796 goto error;
797 }
798 do {
799 ret_len = recv(ctx->control_sock, data, len, MSG_WAITALL);
800 } while (ret_len < 0 && errno == EINTR);
801 if (ret_len == 0) {
802 fprintf(stderr, "[error] Remote side has closed connection\n");
803 ret = -1;
804 free(data);
805 goto error;
806 }
807 if (ret_len < 0) {
808 perror("[error] Error receiving trace packet");
809 ret = ret_len;
810 free(data);
811 goto error;
812 }
813 assert(ret_len == len);
814
815 do {
816 ret_len = write(metadata_stream->fd, data, len);
817 } while (ret_len < 0 && errno == EINTR);
818 if (ret_len < 0) {
819 free(data);
820 ret = ret_len;
821 goto error;
822 }
823 assert(ret_len == len);
824 ret = len;
825
826 free(data);
827
828 end:
829 error:
830 return ret;
831 }
832
833 /*
834 * Return 0 on success, a negative value on error.
835 */
836 static
837 int get_new_metadata(struct lttng_live_ctx *ctx,
838 struct lttng_live_viewer_stream *viewer_stream)
839 {
840 int ret = 0;
841 struct lttng_live_viewer_stream *metadata_stream;
842
843 metadata_stream = viewer_stream->ctf_trace->metadata_stream;
844
845 do {
846 /*
847 * get_one_metadata_packet returns the number of bytes
848 * received, 0 when we have received everything, a
849 * negative value on error.
850 */
851 ret = get_one_metadata_packet(ctx, metadata_stream);
852 } while (ret > 0);
853
854 return ret;
855 }
856
857 /*
858 * Get one index for a stream.
859 *
860 * Returns 0 on success or a negative value on error.
861 */
862 static
863 int get_next_index(struct lttng_live_ctx *ctx,
864 struct lttng_live_viewer_stream *viewer_stream,
865 struct packet_index *index)
866 {
867 struct lttng_viewer_cmd cmd;
868 struct lttng_viewer_get_next_index rq;
869 struct lttng_viewer_index rp;
870 int ret;
871 ssize_t ret_len;
872
873 cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEXT_INDEX);
874 cmd.data_size = sizeof(rq);
875 cmd.cmd_version = 0;
876
877 memset(&rq, 0, sizeof(rq));
878 rq.stream_id = htobe64(viewer_stream->id);
879
880 retry:
881 do {
882 ret_len = send(ctx->control_sock, &cmd, sizeof(cmd), 0);
883 } while (ret_len < 0 && errno == EINTR);
884 if (ret_len < 0) {
885 perror("[error] Error sending cmd");
886 ret = ret_len;
887 goto error;
888 }
889 assert(ret_len == sizeof(cmd));
890
891 do {
892 ret_len = send(ctx->control_sock, &rq, sizeof(rq), 0);
893 } while (ret_len < 0 && errno == EINTR);
894 if (ret_len < 0) {
895 perror("[error] Error sending get_next_index request");
896 ret = ret_len;
897 goto error;
898 }
899 assert(ret_len == sizeof(rq));
900
901 do {
902 ret_len = recv(ctx->control_sock, &rp, sizeof(rp), 0);
903 } while (ret_len < 0 && errno == EINTR);
904 if (ret_len == 0) {
905 fprintf(stderr, "[error] Remote side has closed connection\n");
906 ret = -1;
907 goto error;
908 }
909 if (ret_len < 0) {
910 perror("[error] Error receiving index response");
911 ret = ret_len;
912 goto error;
913 }
914 assert(ret_len == sizeof(rp));
915
916 rp.flags = be32toh(rp.flags);
917
918 switch (be32toh(rp.status)) {
919 case LTTNG_VIEWER_INDEX_INACTIVE:
920 printf_verbose("get_next_index: inactive\n");
921 memset(index, 0, sizeof(struct packet_index));
922 index->ts_cycles.timestamp_end = be64toh(rp.timestamp_end);
923 break;
924 case LTTNG_VIEWER_INDEX_OK:
925 printf_verbose("get_next_index: Ok, need metadata update : %u\n",
926 rp.flags & LTTNG_VIEWER_FLAG_NEW_METADATA);
927 index->offset = be64toh(rp.offset);
928 index->packet_size = be64toh(rp.packet_size);
929 index->content_size = be64toh(rp.content_size);
930 index->ts_cycles.timestamp_begin = be64toh(rp.timestamp_begin);
931 index->ts_cycles.timestamp_end = be64toh(rp.timestamp_end);
932 index->events_discarded = be64toh(rp.events_discarded);
933
934 if (rp.flags & LTTNG_VIEWER_FLAG_NEW_METADATA) {
935 printf_verbose("get_next_index: new metadata needed\n");
936 ret = get_new_metadata(ctx, viewer_stream);
937 if (ret < 0) {
938 goto error;
939 }
940 }
941 if (rp.flags & LTTNG_VIEWER_FLAG_NEW_STREAM) {
942 ret = ask_new_streams(ctx);
943 if (ret < 0)
944 goto error;
945 g_hash_table_foreach(ctx->session->ctf_traces, add_traces,
946 ctx->bt_ctx);
947 }
948 break;
949 case LTTNG_VIEWER_INDEX_RETRY:
950 printf_verbose("get_next_index: retry\n");
951 sleep(1);
952 goto retry;
953 case LTTNG_VIEWER_INDEX_HUP:
954 printf_verbose("get_next_index: stream hung up\n");
955 viewer_stream->id = -1ULL;
956 viewer_stream->fd = -1;
957 index->offset = EOF;
958 ctx->session->stream_count--;
959 break;
960 case LTTNG_VIEWER_INDEX_ERR:
961 fprintf(stderr, "[error] get_next_index: error\n");
962 ret = -1;
963 goto error;
964 default:
965 fprintf(stderr, "[error] get_next_index: unkwown value\n");
966 ret = -1;
967 goto error;
968 }
969
970 ret = 0;
971
972 error:
973 return ret;
974 }
975
976 static
977 void ctf_live_packet_seek(struct bt_stream_pos *stream_pos, size_t index,
978 int whence)
979 {
980 struct ctf_stream_pos *pos;
981 struct ctf_file_stream *file_stream;
982 struct packet_index *prev_index = NULL, *cur_index;
983 struct lttng_live_viewer_stream *viewer_stream;
984 struct lttng_live_session *session;
985 int ret;
986
987 retry:
988 pos = ctf_pos(stream_pos);
989 file_stream = container_of(pos, struct ctf_file_stream, pos);
990 viewer_stream = (struct lttng_live_viewer_stream *) pos->priv;
991 session = viewer_stream->session;
992
993 switch (pos->packet_index->len) {
994 case 0:
995 g_array_set_size(pos->packet_index, 1);
996 cur_index = &g_array_index(pos->packet_index,
997 struct packet_index, 0);
998 break;
999 case 1:
1000 g_array_set_size(pos->packet_index, 2);
1001 prev_index = &g_array_index(pos->packet_index,
1002 struct packet_index, 0);
1003 cur_index = &g_array_index(pos->packet_index,
1004 struct packet_index, 1);
1005 break;
1006 case 2:
1007 g_array_index(pos->packet_index,
1008 struct packet_index, 0) =
1009 g_array_index(pos->packet_index,
1010 struct packet_index, 1);
1011 prev_index = &g_array_index(pos->packet_index,
1012 struct packet_index, 0);
1013 cur_index = &g_array_index(pos->packet_index,
1014 struct packet_index, 1);
1015 break;
1016 default:
1017 abort();
1018 break;
1019 }
1020 printf_verbose("get_next_index for stream %" PRIu64 "\n", viewer_stream->id);
1021 ret = get_next_index(session->ctx, viewer_stream, cur_index);
1022 if (ret < 0) {
1023 pos->offset = EOF;
1024 fprintf(stderr, "[error] get_next_index failed\n");
1025 return;
1026 }
1027
1028 pos->packet_size = cur_index->packet_size;
1029 pos->content_size = cur_index->content_size;
1030 pos->mmap_base_offset = 0;
1031 if (cur_index->offset == EOF) {
1032 pos->offset = EOF;
1033 } else {
1034 pos->offset = 0;
1035 }
1036
1037 if (cur_index->content_size == 0) {
1038 file_stream->parent.cycles_timestamp =
1039 cur_index->ts_cycles.timestamp_end;
1040 file_stream->parent.real_timestamp = ctf_get_real_timestamp(
1041 &file_stream->parent,
1042 cur_index->ts_cycles.timestamp_end);
1043 } else {
1044 /* Convert the timestamps and append to the real_index. */
1045 cur_index->ts_real.timestamp_begin = ctf_get_real_timestamp(
1046 &file_stream->parent,
1047 cur_index->ts_cycles.timestamp_begin);
1048 cur_index->ts_real.timestamp_end = ctf_get_real_timestamp(
1049 &file_stream->parent,
1050 cur_index->ts_cycles.timestamp_end);
1051
1052 ctf_update_current_packet_index(&file_stream->parent,
1053 prev_index, cur_index);
1054
1055 file_stream->parent.cycles_timestamp =
1056 cur_index->ts_cycles.timestamp_begin;
1057 file_stream->parent.real_timestamp =
1058 cur_index->ts_real.timestamp_begin;
1059 }
1060
1061 if (pos->packet_size == 0 || pos->offset == EOF) {
1062 goto end;
1063 }
1064
1065 printf_verbose("get_data_packet for stream %" PRIu64 "\n",
1066 viewer_stream->id);
1067 ret = get_data_packet(session->ctx, pos, viewer_stream,
1068 be64toh(cur_index->offset),
1069 cur_index->packet_size / CHAR_BIT);
1070 if (ret == -2) {
1071 goto retry;
1072 } else if (ret < 0) {
1073 pos->offset = EOF;
1074 fprintf(stderr, "[error] get_data_packet failed\n");
1075 return;
1076 }
1077
1078 printf_verbose("Index received : packet_size : %" PRIu64
1079 ", offset %" PRIu64 ", content_size %" PRIu64
1080 ", timestamp_end : %" PRIu64 "\n",
1081 cur_index->packet_size, cur_index->offset,
1082 cur_index->content_size,
1083 cur_index->ts_cycles.timestamp_end);
1084
1085 /* update trace_packet_header and stream_packet_context */
1086 if (pos->prot != PROT_WRITE && file_stream->parent.trace_packet_header) {
1087 /* Read packet header */
1088 ret = generic_rw(&pos->parent, &file_stream->parent.trace_packet_header->p);
1089 if (ret) {
1090 pos->offset = EOF;
1091 fprintf(stderr, "[error] trace packet header read failed\n");
1092 goto end;
1093 }
1094 }
1095 if (pos->prot != PROT_WRITE && file_stream->parent.stream_packet_context) {
1096 /* Read packet context */
1097 ret = generic_rw(&pos->parent, &file_stream->parent.stream_packet_context->p);
1098 if (ret) {
1099 pos->offset = EOF;
1100 fprintf(stderr, "[error] stream packet context read failed\n");
1101 goto end;
1102 }
1103 }
1104 pos->data_offset = pos->offset;
1105
1106 end:
1107 return;
1108 }
1109
1110 int lttng_live_create_viewer_session(struct lttng_live_ctx *ctx)
1111 {
1112 struct lttng_viewer_cmd cmd;
1113 struct lttng_viewer_create_session_response resp;
1114 int ret;
1115 ssize_t ret_len;
1116
1117 cmd.cmd = htobe32(LTTNG_VIEWER_CREATE_SESSION);
1118 cmd.data_size = 0;
1119 cmd.cmd_version = 0;
1120
1121 do {
1122 ret_len = send(ctx->control_sock, &cmd, sizeof(cmd), 0);
1123 } while (ret_len < 0 && errno == EINTR);
1124 if (ret_len < 0) {
1125 perror("[error] Error sending cmd");
1126 ret = ret_len;
1127 goto error;
1128 }
1129 assert(ret_len == sizeof(cmd));
1130
1131 do {
1132 ret_len = recv(ctx->control_sock, &resp, sizeof(resp), 0);
1133 } while (ret_len < 0 && errno == EINTR);
1134 if (ret_len == 0) {
1135 fprintf(stderr, "[error] Remote side has closed connection\n");
1136 ret = -1;
1137 goto error;
1138 }
1139 if (ret_len < 0) {
1140 perror("[error] Error receiving create session reply");
1141 ret = ret_len;
1142 goto error;
1143 }
1144 assert(ret_len == sizeof(resp));
1145
1146 if (be32toh(resp.status) != LTTNG_VIEWER_CREATE_SESSION_OK) {
1147 fprintf(stderr, "[error] Error creating viewer session\n");
1148 ret = -1;
1149 goto error;
1150 }
1151 ret = 0;
1152
1153 error:
1154 return ret;
1155 }
1156
1157 static
1158 int del_traces(gpointer key, gpointer value, gpointer user_data)
1159 {
1160 struct bt_context *bt_ctx = user_data;
1161 struct lttng_live_ctf_trace *trace = value;
1162 int ret;
1163
1164 ret = bt_context_remove_trace(bt_ctx, trace->trace_id);
1165 if (ret < 0)
1166 fprintf(stderr, "[error] removing trace from context\n");
1167
1168 /* remove the key/value pair from the HT. */
1169 return 1;
1170 }
1171
1172 static
1173 void add_traces(gpointer key, gpointer value, gpointer user_data)
1174 {
1175 int i, ret;
1176 struct bt_context *bt_ctx = user_data;
1177 struct lttng_live_ctf_trace *trace = value;
1178 struct lttng_live_viewer_stream *stream;
1179 struct bt_mmap_stream *new_mmap_stream;
1180 struct bt_mmap_stream_list mmap_list;
1181 struct lttng_live_ctx *ctx = NULL;
1182
1183 /*
1184 * We don't know how many streams we will receive for a trace, so
1185 * once we are done receiving the traces, we add all the traces
1186 * received to the bt_context.
1187 * We can receive streams during the attach command or the
1188 * get_new_streams, so we have to make sure not to add multiple
1189 * times the same traces.
1190 * If a trace is already in the context, we just skip this function.
1191 */
1192 if (trace->in_use)
1193 return;
1194
1195 BT_INIT_LIST_HEAD(&mmap_list.head);
1196
1197 for (i = 0; i < trace->streams->len; i++) {
1198 stream = g_ptr_array_index(trace->streams, i);
1199 ctx = stream->session->ctx;
1200
1201 if (!stream->metadata_flag) {
1202 new_mmap_stream = zmalloc(sizeof(struct bt_mmap_stream));
1203 new_mmap_stream->priv = (void *) stream;
1204 new_mmap_stream->fd = -1;
1205 bt_list_add(&new_mmap_stream->list, &mmap_list.head);
1206 } else {
1207 /* Get all possible metadata before starting */
1208 ret = get_new_metadata(ctx, stream);
1209 if (ret)
1210 goto end_free;
1211 trace->metadata_fp = fopen(stream->path, "r");
1212 }
1213 }
1214
1215 if (!trace->metadata_fp) {
1216 fprintf(stderr, "[error] No metadata stream opened\n");
1217 goto end_free;
1218 }
1219
1220 ret = bt_context_add_trace(bt_ctx, NULL, "ctf",
1221 ctf_live_packet_seek, &mmap_list, trace->metadata_fp);
1222 if (ret < 0) {
1223 fprintf(stderr, "[error] Error adding trace\n");
1224 goto end_free;
1225 }
1226
1227 if (bt_ctx->current_iterator) {
1228 struct bt_trace_descriptor *td;
1229 struct bt_trace_handle *handle;
1230
1231 handle = (struct bt_trace_handle *) g_hash_table_lookup(
1232 bt_ctx->trace_handles,
1233 (gpointer) (unsigned long) ret);
1234 td = handle->td;
1235 bt_iter_add_trace(bt_ctx->current_iterator, td);
1236 }
1237
1238 trace->trace_id = ret;
1239 trace->in_use = 1;
1240
1241 goto end;
1242
1243 end_free:
1244 bt_context_put(bt_ctx);
1245 end:
1246 return;
1247 }
1248
1249 int lttng_live_get_new_streams(struct lttng_live_ctx *ctx, uint64_t id)
1250 {
1251 struct lttng_viewer_cmd cmd;
1252 struct lttng_viewer_new_streams_request rq;
1253 struct lttng_viewer_new_streams_response rp;
1254 struct lttng_viewer_stream stream;
1255 int ret, i;
1256 ssize_t ret_len;
1257 uint32_t stream_count;
1258
1259 cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEW_STREAMS);
1260 cmd.data_size = sizeof(rq);
1261 cmd.cmd_version = 0;
1262
1263 memset(&rq, 0, sizeof(rq));
1264 rq.session_id = htobe64(id);
1265
1266 do {
1267 ret_len = send(ctx->control_sock, &cmd, sizeof(cmd), 0);
1268 } while (ret_len < 0 && errno == EINTR);
1269 if (ret_len < 0) {
1270 perror("[error] Error sending cmd");
1271 ret = ret_len;
1272 goto error;
1273 }
1274 assert(ret_len == sizeof(cmd));
1275
1276 do {
1277 ret_len = send(ctx->control_sock, &rq, sizeof(rq), 0);
1278 } while (ret_len < 0 && errno == EINTR);
1279 if (ret_len < 0) {
1280 perror("[error] Error sending get_new_streams request");
1281 ret = ret_len;
1282 goto error;
1283 }
1284 assert(ret_len == sizeof(rq));
1285
1286 do {
1287 ret_len = recv(ctx->control_sock, &rp, sizeof(rp), 0);
1288 } while (ret_len < 0 && errno == EINTR);
1289 if (ret_len == 0) {
1290 fprintf(stderr, "[error] Remote side has closed connection\n");
1291 ret = -1;
1292 goto error;
1293 }
1294 if (ret_len < 0) {
1295 perror("[error] Error receiving get_new_streams response");
1296 ret = ret_len;
1297 goto error;
1298 }
1299 assert(ret_len == sizeof(rp));
1300
1301 switch(be32toh(rp.status)) {
1302 case LTTNG_VIEWER_NEW_STREAMS_OK:
1303 break;
1304 case LTTNG_VIEWER_NEW_STREAMS_NO_NEW:
1305 ret = 0;
1306 goto end;
1307 case LTTNG_VIEWER_NEW_STREAMS_HUP:
1308 ret = -LTTNG_VIEWER_NEW_STREAMS_HUP;
1309 goto end;
1310 case LTTNG_VIEWER_NEW_STREAMS_ERR:
1311 fprintf(stderr, "[error] get_new_streams error\n");
1312 ret = -1;
1313 goto end;
1314 default:
1315 fprintf(stderr, "[error] Unknown return code %u\n",
1316 be32toh(rp.status));
1317 ret = -1;
1318 goto end;
1319 }
1320
1321 stream_count = be32toh(rp.streams_count);
1322 ctx->session->stream_count += stream_count;
1323 /*
1324 * When the session is created but not started, we do an active wait
1325 * until it starts. It allows the viewer to start processing the trace
1326 * as soon as the session starts.
1327 */
1328 if (ctx->session->stream_count == 0) {
1329 ret = 0;
1330 goto end;
1331 }
1332 printf_verbose("Waiting for %" PRIu64 " streams:\n",
1333 ctx->session->stream_count);
1334 ctx->session->streams = g_new0(struct lttng_live_viewer_stream,
1335 ctx->session->stream_count);
1336 for (i = 0; i < stream_count; i++) {
1337 do {
1338 ret_len = recv(ctx->control_sock, &stream, sizeof(stream), 0);
1339 } while (ret_len < 0 && errno == EINTR);
1340 if (ret_len == 0) {
1341 fprintf(stderr, "[error] Remote side has closed connection\n");
1342 ret = -1;
1343 goto error;
1344 }
1345 if (ret_len < 0) {
1346 perror("[error] Error receiving stream");
1347 ret = ret_len;
1348 goto error;
1349 }
1350 assert(ret_len == sizeof(stream));
1351 stream.path_name[LTTNG_VIEWER_PATH_MAX - 1] = '\0';
1352 stream.channel_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0';
1353
1354 printf_verbose(" stream %" PRIu64 " : %s/%s\n",
1355 be64toh(stream.id), stream.path_name,
1356 stream.channel_name);
1357 ctx->session->streams[i].id = be64toh(stream.id);
1358 ctx->session->streams[i].session = ctx->session;
1359
1360 ctx->session->streams[i].first_read = 1;
1361 ctx->session->streams[i].mmap_size = 0;
1362
1363 if (be32toh(stream.metadata_flag)) {
1364 char *path;
1365
1366 path = strdup(LTTNG_METADATA_PATH_TEMPLATE);
1367 if (!path) {
1368 perror("strdup");
1369 ret = -1;
1370 goto error;
1371 }
1372 if (!mkdtemp(path)) {
1373 perror("mkdtemp");
1374 free(path);
1375 ret = -1;
1376 goto error;
1377 }
1378 ctx->session->streams[i].metadata_flag = 1;
1379 snprintf(ctx->session->streams[i].path,
1380 sizeof(ctx->session->streams[i].path),
1381 "%s/%s", path,
1382 stream.channel_name);
1383 ret = open(ctx->session->streams[i].path,
1384 O_WRONLY | O_CREAT | O_TRUNC,
1385 S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP);
1386 if (ret < 0) {
1387 perror("open");
1388 free(path);
1389 goto error;
1390 }
1391 ctx->session->streams[i].fd = ret;
1392 free(path);
1393 }
1394 ret = lttng_live_ctf_trace_assign(&ctx->session->streams[i],
1395 be64toh(stream.ctf_trace_id));
1396 if (ret < 0) {
1397 goto error;
1398 }
1399
1400 }
1401 ret = 0;
1402
1403 end:
1404 error:
1405 return ret;
1406 }
1407
1408 void lttng_live_read(struct lttng_live_ctx *ctx)
1409 {
1410 int ret, i;
1411 struct bt_ctf_iter *iter;
1412 const struct bt_ctf_event *event;
1413 struct bt_iter_pos begin_pos;
1414 struct bt_trace_descriptor *td_write;
1415 struct bt_format *fmt_write;
1416 struct ctf_text_stream_pos *sout;
1417 uint64_t id;
1418
1419 ctx->bt_ctx = bt_context_create();
1420 if (!ctx->bt_ctx) {
1421 fprintf(stderr, "[error] bt_context_create allocation\n");
1422 goto end;
1423 }
1424
1425 fmt_write = bt_lookup_format(g_quark_from_static_string("text"));
1426 if (!fmt_write) {
1427 fprintf(stderr, "[error] ctf-text error\n");
1428 goto end;
1429 }
1430
1431 td_write = fmt_write->open_trace(NULL, O_RDWR, NULL, NULL);
1432 if (!td_write) {
1433 fprintf(stderr, "[error] Error opening output trace\n");
1434 goto end_free;
1435 }
1436
1437 sout = container_of(td_write, struct ctf_text_stream_pos,
1438 trace_descriptor);
1439 if (!sout->parent.event_cb)
1440 goto end_free;
1441
1442 ret = lttng_live_create_viewer_session(ctx);
1443 if (ret < 0) {
1444 goto end_free;
1445 }
1446
1447 for (i = 0; i < ctx->session_ids->len; i++) {
1448 id = g_array_index(ctx->session_ids, uint64_t, i);
1449 printf_verbose("Attaching to session %lu\n", id);
1450 ret = lttng_live_attach_session(ctx, id);
1451 printf_verbose("Attaching session returns %d\n", ret);
1452 if (ret < 0) {
1453 if (ret == -LTTNG_VIEWER_ATTACH_UNK) {
1454 fprintf(stderr, "[error] Unknown session ID\n");
1455 }
1456 goto end_free;
1457 }
1458 }
1459
1460 /*
1461 * As long as the session is active, we try to get new streams.
1462 */
1463 for (;;) {
1464 int flags;
1465
1466 while (!ctx->session->stream_count) {
1467 if (ctx->session_ids->len == 0)
1468 goto end_free;
1469 ret = ask_new_streams(ctx);
1470 if (ret < 0)
1471 goto end_free;
1472 }
1473
1474 g_hash_table_foreach(ctx->session->ctf_traces, add_traces,
1475 ctx->bt_ctx);
1476
1477 begin_pos.type = BT_SEEK_BEGIN;
1478 iter = bt_ctf_iter_create(ctx->bt_ctx, &begin_pos, NULL);
1479 if (!iter) {
1480 fprintf(stderr, "[error] Iterator creation error\n");
1481 goto end;
1482 }
1483 for (;;) {
1484 event = bt_ctf_iter_read_event_flags(iter, &flags);
1485 if (!(flags & BT_ITER_FLAG_RETRY)) {
1486 if (!event) {
1487 /* End of trace */
1488 break;
1489 }
1490 ret = sout->parent.event_cb(&sout->parent,
1491 event->parent->stream);
1492 if (ret) {
1493 fprintf(stderr, "[error] Writing "
1494 "event failed.\n");
1495 goto end_free;
1496 }
1497 }
1498 ret = bt_iter_next(bt_ctf_get_iter(iter));
1499 if (ret < 0) {
1500 goto end_free;
1501 }
1502 }
1503 bt_ctf_iter_destroy(iter);
1504 g_hash_table_foreach_remove(ctx->session->ctf_traces,
1505 del_traces, ctx->bt_ctx);
1506 ctx->session->stream_count = 0;
1507 }
1508
1509 end_free:
1510 bt_context_put(ctx->bt_ctx);
1511 end:
1512 return;
1513 }
This page took 0.0938 seconds and 5 git commands to generate.