Fix: assign a trace handle to every live trace
[babeltrace.git] / formats / lttng-live / lttng-live-comm.c
1 /*
2 * Copyright (C) 2013 - Julien Desfossez <jdesfossez@efficios.com>
3 * Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
4 *
5 * Permission is hereby granted, free of charge, to any person obtaining a copy
6 * of this software and associated documentation files (the "Software"), to deal
7 * in the Software without restriction, including without limitation the rights
8 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9 * copies of the Software, and to permit persons to whom the Software is
10 * furnished to do so, subject to the following conditions:
11 *
12 * The above copyright notice and this permission notice shall be included in
13 * all copies or substantial portions of the Software.
14 *
15 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21 * SOFTWARE.
22 */
23
24 #include <sys/socket.h>
25 #include <sys/types.h>
26 #include <netinet/in.h>
27 #include <netdb.h>
28 #include <stdio.h>
29 #include <string.h>
30 #include <stdlib.h>
31 #include <unistd.h>
32 #include <errno.h>
33 #include <inttypes.h>
34 #include <fcntl.h>
35 #include <sys/mman.h>
36
37 #include <babeltrace/ctf/ctf-index.h>
38
39 #include <babeltrace/babeltrace.h>
40 #include <babeltrace/ctf/events.h>
41 #include <babeltrace/ctf/callbacks.h>
42 #include <babeltrace/ctf/iterator.h>
43
44 /* for packet_index */
45 #include <babeltrace/ctf/types.h>
46
47 #include <babeltrace/ctf/metadata.h>
48 #include <babeltrace/ctf-text/types.h>
49 #include <babeltrace/ctf/events-internal.h>
50 #include <formats/ctf/events-private.h>
51
52 #include "lttng-live.h"
53 #include "lttng-viewer-abi.h"
54
55 /*
56 * Memory allocation zeroed
57 */
58 #define zmalloc(x) calloc(1, x)
59
60 #ifndef max_t
61 #define max_t(type, a, b) \
62 ((type) (a) > (type) (b) ? (type) (a) : (type) (b))
63 #endif
64
65 static void ctf_live_packet_seek(struct bt_stream_pos *stream_pos,
66 size_t index, int whence);
67 static void add_traces(gpointer key, gpointer value, gpointer user_data);
68 static int del_traces(gpointer key, gpointer value, gpointer user_data);
69 static int get_new_metadata(struct lttng_live_ctx *ctx,
70 struct lttng_live_viewer_stream *viewer_stream);
71
72 int lttng_live_connect_viewer(struct lttng_live_ctx *ctx)
73 {
74 struct hostent *host;
75 struct sockaddr_in server_addr;
76 int ret;
77
78 host = gethostbyname(ctx->relay_hostname);
79 if (!host) {
80 ret = -1;
81 goto end;
82 }
83
84 if ((ctx->control_sock = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
85 perror("Socket");
86 ret = -1;
87 goto end;
88 }
89
90 server_addr.sin_family = AF_INET;
91 server_addr.sin_port = htons(ctx->port);
92 server_addr.sin_addr = *((struct in_addr *) host->h_addr);
93 bzero(&(server_addr.sin_zero), 8);
94
95 if (connect(ctx->control_sock, (struct sockaddr *) &server_addr,
96 sizeof(struct sockaddr)) == -1) {
97 perror("Connect");
98 ret = -1;
99 goto end;
100 }
101
102 ret = 0;
103
104 end:
105 return ret;
106 }
107
108 int lttng_live_establish_connection(struct lttng_live_ctx *ctx)
109 {
110 struct lttng_viewer_cmd cmd;
111 struct lttng_viewer_connect connect;
112 int ret;
113 ssize_t ret_len;
114
115 cmd.cmd = htobe32(LTTNG_VIEWER_CONNECT);
116 cmd.data_size = sizeof(connect);
117 cmd.cmd_version = 0;
118
119 connect.viewer_session_id = -1ULL; /* will be set on recv */
120 connect.major = htobe32(LTTNG_LIVE_MAJOR);
121 connect.minor = htobe32(LTTNG_LIVE_MINOR);
122 connect.type = htobe32(LTTNG_VIEWER_CLIENT_COMMAND);
123
124 do {
125 ret_len = send(ctx->control_sock, &cmd, sizeof(cmd), 0);
126 } while (ret_len < 0 && errno == EINTR);
127 if (ret_len < 0) {
128 perror("[error] Error sending cmd");
129 ret = ret_len;
130 goto error;
131 }
132 assert(ret_len == sizeof(cmd));
133
134 do {
135 ret_len = send(ctx->control_sock, &connect, sizeof(connect), 0);
136 } while (ret_len < 0 && errno == EINTR);
137 if (ret_len < 0) {
138 perror("[error] Error sending version");
139 ret = ret_len;
140 goto error;
141 }
142 assert(ret_len == sizeof(connect));
143
144 do {
145 ret_len = recv(ctx->control_sock, &connect, sizeof(connect), 0);
146 } while (ret_len < 0 && errno == EINTR);
147 if (ret_len == 0) {
148 fprintf(stderr, "[error] Remote side has closed connection\n");
149 ret = -1;
150 goto error;
151 }
152 if (ret_len < 0) {
153 perror("[error] Error receiving version");
154 ret = ret_len;
155 goto error;
156 }
157 assert(ret_len == sizeof(connect));
158
159 printf_verbose("Received viewer session ID : %" PRIu64 "\n",
160 be64toh(connect.viewer_session_id));
161 printf_verbose("Relayd version : %u.%u\n", be32toh(connect.major),
162 be32toh(connect.minor));
163
164 ret = 0;
165
166 error:
167 return ret;
168 }
169
170 static
171 void free_session_list(GPtrArray *session_list)
172 {
173 int i;
174 struct lttng_live_relay_session *relay_session;
175
176 for (i = 0; i < session_list->len; i++) {
177 relay_session = g_ptr_array_index(session_list, i);
178 free(relay_session->name);
179 free(relay_session->hostname);
180 }
181 g_ptr_array_free(session_list, TRUE);
182 }
183
184 static
185 void print_session_list(GPtrArray *session_list, const char *path)
186 {
187 int i;
188 struct lttng_live_relay_session *relay_session;
189
190 for (i = 0; i < session_list->len; i++) {
191 relay_session = g_ptr_array_index(session_list, i);
192 fprintf(stdout, "%s/host/%s/%s (timer = %u, "
193 "%u stream(s), %u client(s) connected)\n",
194 path, relay_session->hostname,
195 relay_session->name, relay_session->timer,
196 relay_session->streams, relay_session->clients);
197 }
198 }
199
200 static
201 void update_session_list(GPtrArray *session_list, char *hostname,
202 char *session_name, uint32_t streams, uint32_t clients,
203 uint32_t timer)
204 {
205 int i, found = 0;
206 struct lttng_live_relay_session *relay_session;
207
208 for (i = 0; i < session_list->len; i++) {
209 relay_session = g_ptr_array_index(session_list, i);
210 if ((strncmp(relay_session->hostname, hostname, NAME_MAX) == 0) &&
211 strncmp(relay_session->name,
212 session_name, NAME_MAX) == 0) {
213 relay_session->streams += streams;
214 if (relay_session->clients < clients)
215 relay_session->clients = clients;
216 found = 1;
217 break;
218 }
219 }
220 if (found)
221 return;
222
223 relay_session = g_new0(struct lttng_live_relay_session, 1);
224 relay_session->hostname = strndup(hostname, NAME_MAX);
225 relay_session->name = strndup(session_name, NAME_MAX);
226 relay_session->clients = clients;
227 relay_session->streams = streams;
228 relay_session->timer = timer;
229 g_ptr_array_add(session_list, relay_session);
230 }
231
232 int lttng_live_list_sessions(struct lttng_live_ctx *ctx, const char *path)
233 {
234 struct lttng_viewer_cmd cmd;
235 struct lttng_viewer_list_sessions list;
236 struct lttng_viewer_session lsession;
237 int i, ret, sessions_count, print_list = 0;
238 ssize_t ret_len;
239 uint64_t session_id;
240 GPtrArray *session_list = NULL;
241
242 if (strlen(ctx->session_name) == 0) {
243 print_list = 1;
244 session_list = g_ptr_array_new();
245 }
246
247 cmd.cmd = htobe32(LTTNG_VIEWER_LIST_SESSIONS);
248 cmd.data_size = 0;
249 cmd.cmd_version = 0;
250
251 do {
252 ret_len = send(ctx->control_sock, &cmd, sizeof(cmd), 0);
253 } while (ret_len < 0 && errno == EINTR);
254 if (ret_len < 0) {
255 perror("[error] Error sending cmd");
256 ret = ret_len;
257 goto error;
258 }
259 assert(ret_len == sizeof(cmd));
260
261 do {
262 ret_len = recv(ctx->control_sock, &list, sizeof(list), 0);
263 } while (ret_len < 0 && errno == EINTR);
264 if (ret_len == 0) {
265 fprintf(stderr, "[error] Remote side has closed connection\n");
266 ret = -1;
267 goto error;
268 }
269 if (ret_len < 0) {
270 perror("[error] Error receiving session list");
271 ret = ret_len;
272 goto error;
273 }
274 assert(ret_len == sizeof(list));
275
276 sessions_count = be32toh(list.sessions_count);
277 for (i = 0; i < sessions_count; i++) {
278 do {
279 ret_len = recv(ctx->control_sock, &lsession, sizeof(lsession), 0);
280 } while (ret_len < 0 && errno == EINTR);
281 if (ret_len == 0) {
282 fprintf(stderr, "[error] Remote side has closed connection\n");
283 ret = -1;
284 goto error;
285 }
286 if (ret_len < 0) {
287 perror("[error] Error receiving session");
288 ret = ret_len;
289 goto error;
290 }
291 assert(ret_len == sizeof(lsession));
292 lsession.hostname[LTTNG_VIEWER_HOST_NAME_MAX - 1] = '\0';
293 lsession.session_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0';
294 session_id = be64toh(lsession.id);
295
296 if (print_list) {
297 update_session_list(session_list,
298 lsession.hostname,
299 lsession.session_name,
300 be32toh(lsession.streams),
301 be32toh(lsession.clients),
302 be32toh(lsession.live_timer));
303 } else {
304 if ((strncmp(lsession.session_name, ctx->session_name,
305 NAME_MAX) == 0) && (strncmp(lsession.hostname,
306 ctx->traced_hostname, NAME_MAX) == 0)) {
307 printf_verbose("Reading from session %" PRIu64 "\n",
308 session_id);
309 g_array_append_val(ctx->session_ids,
310 session_id);
311 }
312 }
313 }
314
315 if (print_list) {
316 print_session_list(session_list, path);
317 free_session_list(session_list);
318 }
319
320 ret = 0;
321
322 error:
323 return ret;
324 }
325
326 int lttng_live_ctf_trace_assign(struct lttng_live_viewer_stream *stream,
327 uint64_t ctf_trace_id)
328 {
329 struct lttng_live_ctf_trace *trace;
330 int ret = 0;
331
332 trace = g_hash_table_lookup(stream->session->ctf_traces,
333 (gpointer) ctf_trace_id);
334 if (!trace) {
335 trace = g_new0(struct lttng_live_ctf_trace, 1);
336 trace->ctf_trace_id = ctf_trace_id;
337 trace->streams = g_ptr_array_new();
338 g_hash_table_insert(stream->session->ctf_traces,
339 (gpointer) ctf_trace_id,
340 trace);
341 }
342 if (stream->metadata_flag)
343 trace->metadata_stream = stream;
344
345 stream->ctf_trace = trace;
346 g_ptr_array_add(trace->streams, stream);
347
348 return ret;
349 }
350
351 int lttng_live_attach_session(struct lttng_live_ctx *ctx, uint64_t id)
352 {
353 struct lttng_viewer_cmd cmd;
354 struct lttng_viewer_attach_session_request rq;
355 struct lttng_viewer_attach_session_response rp;
356 struct lttng_viewer_stream stream;
357 int ret, i;
358 ssize_t ret_len;
359
360 cmd.cmd = htobe32(LTTNG_VIEWER_ATTACH_SESSION);
361 cmd.data_size = sizeof(rq);
362 cmd.cmd_version = 0;
363
364 memset(&rq, 0, sizeof(rq));
365 rq.session_id = htobe64(id);
366 // TODO: add cmd line parameter to select seek beginning
367 // rq.seek = htobe32(LTTNG_VIEWER_SEEK_BEGINNING);
368 rq.seek = htobe32(LTTNG_VIEWER_SEEK_LAST);
369
370 do {
371 ret_len = send(ctx->control_sock, &cmd, sizeof(cmd), 0);
372 } while (ret_len < 0 && errno == EINTR);
373 if (ret_len < 0) {
374 perror("[error] Error sending cmd");
375 ret = ret_len;
376 goto error;
377 }
378 assert(ret_len == sizeof(cmd));
379
380 do {
381 ret_len = send(ctx->control_sock, &rq, sizeof(rq), 0);
382 } while (ret_len < 0 && errno == EINTR);
383 if (ret_len < 0) {
384 perror("[error] Error sending attach request");
385 ret = ret_len;
386 goto error;
387 }
388 assert(ret_len == sizeof(rq));
389
390 do {
391 ret_len = recv(ctx->control_sock, &rp, sizeof(rp), 0);
392 } while (ret_len < 0 && errno == EINTR);
393 if (ret_len == 0) {
394 fprintf(stderr, "[error] Remote side has closed connection\n");
395 ret = -1;
396 goto error;
397 }
398 if (ret_len < 0) {
399 perror("[error] Error receiving attach response");
400 ret = ret_len;
401 goto error;
402 }
403 assert(ret_len == sizeof(rp));
404
405 switch(be32toh(rp.status)) {
406 case LTTNG_VIEWER_ATTACH_OK:
407 break;
408 case LTTNG_VIEWER_ATTACH_UNK:
409 ret = -LTTNG_VIEWER_ATTACH_UNK;
410 goto end;
411 case LTTNG_VIEWER_ATTACH_ALREADY:
412 fprintf(stderr, "[error] There is already a viewer attached to this session\n");
413 ret = -1;
414 goto end;
415 case LTTNG_VIEWER_ATTACH_NOT_LIVE:
416 fprintf(stderr, "[error] Not a live session\n");
417 ret = -1;
418 goto end;
419 case LTTNG_VIEWER_ATTACH_SEEK_ERR:
420 fprintf(stderr, "[error] Wrong seek parameter\n");
421 ret = -1;
422 goto end;
423 default:
424 fprintf(stderr, "[error] Unknown attach return code %u\n",
425 be32toh(rp.status));
426 ret = -1;
427 goto end;
428 }
429 if (be32toh(rp.status) != LTTNG_VIEWER_ATTACH_OK) {
430 ret = -1;
431 goto end;
432 }
433
434 ctx->session->stream_count += be32toh(rp.streams_count);
435 /*
436 * When the session is created but not started, we do an active wait
437 * until it starts. It allows the viewer to start processing the trace
438 * as soon as the session starts.
439 */
440 if (ctx->session->stream_count == 0) {
441 ret = 0;
442 goto end;
443 }
444 printf_verbose("Waiting for %" PRIu64 " streams:\n",
445 ctx->session->stream_count);
446 ctx->session->streams = g_new0(struct lttng_live_viewer_stream,
447 ctx->session->stream_count);
448 for (i = 0; i < be32toh(rp.streams_count); i++) {
449 do {
450 ret_len = recv(ctx->control_sock, &stream, sizeof(stream), 0);
451 } while (ret_len < 0 && errno == EINTR);
452 if (ret_len == 0) {
453 fprintf(stderr, "[error] Remote side has closed connection\n");
454 ret = -1;
455 goto error;
456 }
457 if (ret_len < 0) {
458 perror("[error] Error receiving stream");
459 ret = ret_len;
460 goto error;
461 }
462 assert(ret_len == sizeof(stream));
463 stream.path_name[LTTNG_VIEWER_PATH_MAX - 1] = '\0';
464 stream.channel_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0';
465
466 printf_verbose(" stream %" PRIu64 " : %s/%s\n",
467 be64toh(stream.id), stream.path_name,
468 stream.channel_name);
469 ctx->session->streams[i].id = be64toh(stream.id);
470 ctx->session->streams[i].session = ctx->session;
471
472 ctx->session->streams[i].first_read = 1;
473 ctx->session->streams[i].mmap_size = 0;
474
475 if (be32toh(stream.metadata_flag)) {
476 char *path;
477
478 path = strdup(LTTNG_METADATA_PATH_TEMPLATE);
479 if (!path) {
480 perror("strdup");
481 ret = -1;
482 goto error;
483 }
484 if (!mkdtemp(path)) {
485 perror("mkdtemp");
486 free(path);
487 ret = -1;
488 goto error;
489 }
490 ctx->session->streams[i].metadata_flag = 1;
491 snprintf(ctx->session->streams[i].path,
492 sizeof(ctx->session->streams[i].path),
493 "%s/%s", path,
494 stream.channel_name);
495 ret = open(ctx->session->streams[i].path,
496 O_WRONLY | O_CREAT | O_TRUNC,
497 S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP);
498 if (ret < 0) {
499 perror("open");
500 free(path);
501 goto error;
502 }
503 ctx->session->streams[i].fd = ret;
504 free(path);
505 }
506 ret = lttng_live_ctf_trace_assign(&ctx->session->streams[i],
507 be64toh(stream.ctf_trace_id));
508 if (ret < 0) {
509 goto error;
510 }
511
512 }
513 ret = 0;
514
515 end:
516 error:
517 return ret;
518 }
519
520 static
521 int ask_new_streams(struct lttng_live_ctx *ctx)
522 {
523 int i, ret = 0;
524 uint64_t id;
525
526 restart:
527 for (i = 0; i < ctx->session_ids->len; i++) {
528 id = g_array_index(ctx->session_ids, uint64_t, i);
529 ret = lttng_live_get_new_streams(ctx, id);
530 printf_verbose("Asking for new streams returns %d\n",
531 ret);
532 if (ret < 0) {
533 if (ret == -LTTNG_VIEWER_NEW_STREAMS_HUP) {
534 printf_verbose("Session %" PRIu64 " closed\n",
535 id);
536 /*
537 * The streams have already been closed during
538 * the reading, so we only need to get rid of
539 * the trace in our internal table of sessions.
540 */
541 g_array_remove_index(ctx->session_ids, i);
542 /*
543 * We can't continue iterating on the g_array
544 * after a remove, we have to start again.
545 */
546 goto restart;
547 } else {
548 ret = -1;
549 goto end;
550 }
551 }
552 }
553
554 end:
555 return ret;
556 }
557
558 static
559 int get_data_packet(struct lttng_live_ctx *ctx,
560 struct ctf_stream_pos *pos,
561 struct lttng_live_viewer_stream *stream, uint64_t offset,
562 uint64_t len)
563 {
564 struct lttng_viewer_cmd cmd;
565 struct lttng_viewer_get_packet rq;
566 struct lttng_viewer_trace_packet rp;
567 ssize_t ret_len;
568 int ret;
569
570 cmd.cmd = htobe32(LTTNG_VIEWER_GET_PACKET);
571 cmd.data_size = sizeof(rq);
572 cmd.cmd_version = 0;
573
574 memset(&rq, 0, sizeof(rq));
575 rq.stream_id = htobe64(stream->id);
576 /* Already in big endian. */
577 rq.offset = offset;
578 rq.len = htobe32(len);
579
580 do {
581 ret_len = send(ctx->control_sock, &cmd, sizeof(cmd), 0);
582 } while (ret_len < 0 && errno == EINTR);
583 if (ret_len < 0) {
584 perror("[error] Error sending cmd");
585 ret = ret_len;
586 goto error;
587 }
588 assert(ret_len == sizeof(cmd));
589
590 do {
591 ret_len = send(ctx->control_sock, &rq, sizeof(rq), 0);
592 } while (ret_len < 0 && errno == EINTR);
593 if (ret_len < 0) {
594 perror("[error] Error sending get_data_packet request");
595 ret = ret_len;
596 goto error;
597 }
598 assert(ret_len == sizeof(rq));
599
600 do {
601 ret_len = recv(ctx->control_sock, &rp, sizeof(rp), 0);
602 } while (ret_len < 0 && errno == EINTR);
603 if (ret_len == 0) {
604 fprintf(stderr, "[error] Remote side has closed connection\n");
605 ret = -1;
606 goto error;
607 }
608 if (ret_len < 0) {
609 perror("[error] Error receiving data response");
610 ret = ret_len;
611 goto error;
612 }
613 if (ret_len != sizeof(rp)) {
614 fprintf(stderr, "[error] get_data_packet: expected %" PRId64
615 ", received %" PRId64 "\n", ret_len,
616 sizeof(rp));
617 ret = -1;
618 goto error;
619 }
620
621 rp.flags = be32toh(rp.flags);
622
623 switch (be32toh(rp.status)) {
624 case LTTNG_VIEWER_GET_PACKET_OK:
625 len = be32toh(rp.len);
626 printf_verbose("get_data_packet: Ok, packet size : %" PRIu64
627 "\n", len);
628 break;
629 case LTTNG_VIEWER_GET_PACKET_RETRY:
630 printf_verbose("get_data_packet: retry\n");
631 ret = -1;
632 goto end;
633 case LTTNG_VIEWER_GET_PACKET_ERR:
634 if (rp.flags & LTTNG_VIEWER_FLAG_NEW_METADATA) {
635 printf_verbose("get_data_packet: new metadata needed\n");
636 ret = 0;
637 goto end;
638 }
639 if (rp.flags & LTTNG_VIEWER_FLAG_NEW_STREAM) {
640 ret = ask_new_streams(ctx);
641 if (ret < 0)
642 goto error;
643 g_hash_table_foreach(ctx->session->ctf_traces, add_traces,
644 ctx->bt_ctx);
645 }
646 fprintf(stderr, "[error] get_data_packet: error\n");
647 ret = -1;
648 goto end;
649 case LTTNG_VIEWER_GET_PACKET_EOF:
650 ret = -2;
651 goto error;
652 default:
653 printf_verbose("get_data_packet: unknown\n");
654 assert(0);
655 ret = -1;
656 goto end;
657 }
658
659 if (len <= 0) {
660 ret = -1;
661 goto end;
662 }
663
664 if (len > stream->mmap_size) {
665 uint64_t new_size;
666
667 new_size = max_t(uint64_t, len, stream->mmap_size << 1);
668 if (pos->base_mma) {
669 /* unmap old base */
670 ret = munmap_align(pos->base_mma);
671 if (ret) {
672 perror("[error] Unable to unmap old base");
673 ret = -1;
674 goto error;
675 }
676 pos->base_mma = NULL;
677 }
678 pos->base_mma = mmap_align(new_size,
679 PROT_READ | PROT_WRITE,
680 MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
681 if (pos->base_mma == MAP_FAILED) {
682 perror("[error] mmap error");
683 pos->base_mma = NULL;
684 ret = -1;
685 goto error;
686 }
687
688 stream->mmap_size = new_size;
689 printf_verbose("Expanding stream mmap size to %" PRIu64 " bytes\n",
690 stream->mmap_size);
691 }
692
693 do {
694 ret_len = recv(ctx->control_sock,
695 mmap_align_addr(pos->base_mma), len,
696 MSG_WAITALL);
697 } while (ret_len < 0 && errno == EINTR);
698 if (ret_len == 0) {
699 fprintf(stderr, "[error] Remote side has closed connection\n");
700 ret = -1;
701 goto error;
702 }
703 if (ret_len < 0) {
704 perror("[error] Error receiving trace packet");
705 ret = ret_len;
706 goto error;
707 }
708 assert(ret_len == len);
709
710 end:
711 error:
712 return ret;
713 }
714
715 static
716 int get_one_metadata_packet(struct lttng_live_ctx *ctx,
717 struct lttng_live_viewer_stream *metadata_stream)
718 {
719 uint64_t len = 0;
720 int ret;
721 struct lttng_viewer_cmd cmd;
722 struct lttng_viewer_get_metadata rq;
723 struct lttng_viewer_metadata_packet rp;
724 char *data = NULL;
725 ssize_t ret_len;
726
727 rq.stream_id = htobe64(metadata_stream->id);
728 cmd.cmd = htobe32(LTTNG_VIEWER_GET_METADATA);
729 cmd.data_size = sizeof(rq);
730 cmd.cmd_version = 0;
731
732 do {
733 ret_len = send(ctx->control_sock, &cmd, sizeof(cmd), 0);
734 } while (ret_len < 0 && errno == EINTR);
735 if (ret_len < 0) {
736 perror("[error] Error sending cmd");
737 ret = ret_len;
738 goto error;
739 }
740 assert(ret_len == sizeof(cmd));
741
742 do {
743 ret_len = send(ctx->control_sock, &rq, sizeof(rq), 0);
744 } while (ret_len < 0 && errno == EINTR);
745 if (ret_len < 0) {
746 perror("[error] Error sending get_metadata request");
747 ret = ret_len;
748 goto error;
749 }
750 assert(ret_len == sizeof(rq));
751
752 do {
753 ret_len = recv(ctx->control_sock, &rp, sizeof(rp), 0);
754 } while (ret_len < 0 && errno == EINTR);
755 if (ret_len == 0) {
756 fprintf(stderr, "[error] Remote side has closed connection\n");
757 ret = -1;
758 goto error;
759 }
760 if (ret_len < 0) {
761 perror("[error] Error receiving metadata response");
762 ret = ret_len;
763 goto error;
764 }
765 assert(ret_len == sizeof(rp));
766
767 switch (be32toh(rp.status)) {
768 case LTTNG_VIEWER_METADATA_OK:
769 printf_verbose("get_metadata : OK\n");
770 break;
771 case LTTNG_VIEWER_NO_NEW_METADATA:
772 printf_verbose("get_metadata : NO NEW\n");
773 ret = 0;
774 goto end;
775 case LTTNG_VIEWER_METADATA_ERR:
776 printf_verbose("get_metadata : ERR\n");
777 ret = -1;
778 goto end;
779 default:
780 printf_verbose("get_metadata : UNKNOWN\n");
781 ret = -1;
782 goto end;
783 }
784
785 len = be64toh(rp.len);
786 printf_verbose("Writing %" PRIu64" bytes to metadata\n", len);
787 if (len <= 0) {
788 ret = -1;
789 goto end;
790 }
791
792 data = zmalloc(len);
793 if (!data) {
794 perror("relay data zmalloc");
795 ret = -1;
796 goto error;
797 }
798 do {
799 ret_len = recv(ctx->control_sock, data, len, MSG_WAITALL);
800 } while (ret_len < 0 && errno == EINTR);
801 if (ret_len == 0) {
802 fprintf(stderr, "[error] Remote side has closed connection\n");
803 ret = -1;
804 free(data);
805 goto error;
806 }
807 if (ret_len < 0) {
808 perror("[error] Error receiving trace packet");
809 ret = ret_len;
810 free(data);
811 goto error;
812 }
813 assert(ret_len == len);
814
815 do {
816 ret_len = write(metadata_stream->fd, data, len);
817 } while (ret_len < 0 && errno == EINTR);
818 if (ret_len < 0) {
819 free(data);
820 ret = ret_len;
821 goto error;
822 }
823 assert(ret_len == len);
824 ret = len;
825
826 free(data);
827
828 end:
829 error:
830 return ret;
831 }
832
833 /*
834 * Return 0 on success, a negative value on error.
835 */
836 static
837 int get_new_metadata(struct lttng_live_ctx *ctx,
838 struct lttng_live_viewer_stream *viewer_stream)
839 {
840 int ret = 0;
841 struct lttng_live_viewer_stream *metadata_stream;
842
843 metadata_stream = viewer_stream->ctf_trace->metadata_stream;
844
845 do {
846 /*
847 * get_one_metadata_packet returns the number of bytes
848 * received, 0 when we have received everything, a
849 * negative value on error.
850 */
851 ret = get_one_metadata_packet(ctx, metadata_stream);
852 } while (ret > 0);
853
854 return ret;
855 }
856
857 /*
858 * Get one index for a stream.
859 *
860 * Returns 0 on success or a negative value on error.
861 */
862 static
863 int get_next_index(struct lttng_live_ctx *ctx,
864 struct lttng_live_viewer_stream *viewer_stream,
865 struct packet_index *index)
866 {
867 struct lttng_viewer_cmd cmd;
868 struct lttng_viewer_get_next_index rq;
869 struct lttng_viewer_index rp;
870 int ret;
871 ssize_t ret_len;
872
873 cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEXT_INDEX);
874 cmd.data_size = sizeof(rq);
875 cmd.cmd_version = 0;
876
877 memset(&rq, 0, sizeof(rq));
878 rq.stream_id = htobe64(viewer_stream->id);
879
880 retry:
881 do {
882 ret_len = send(ctx->control_sock, &cmd, sizeof(cmd), 0);
883 } while (ret_len < 0 && errno == EINTR);
884 if (ret_len < 0) {
885 perror("[error] Error sending cmd");
886 ret = ret_len;
887 goto error;
888 }
889 assert(ret_len == sizeof(cmd));
890
891 do {
892 ret_len = send(ctx->control_sock, &rq, sizeof(rq), 0);
893 } while (ret_len < 0 && errno == EINTR);
894 if (ret_len < 0) {
895 perror("[error] Error sending get_next_index request");
896 ret = ret_len;
897 goto error;
898 }
899 assert(ret_len == sizeof(rq));
900
901 do {
902 ret_len = recv(ctx->control_sock, &rp, sizeof(rp), 0);
903 } while (ret_len < 0 && errno == EINTR);
904 if (ret_len == 0) {
905 fprintf(stderr, "[error] Remote side has closed connection\n");
906 ret = -1;
907 goto error;
908 }
909 if (ret_len < 0) {
910 perror("[error] Error receiving index response");
911 ret = ret_len;
912 goto error;
913 }
914 assert(ret_len == sizeof(rp));
915
916 rp.flags = be32toh(rp.flags);
917
918 switch (be32toh(rp.status)) {
919 case LTTNG_VIEWER_INDEX_INACTIVE:
920 printf_verbose("get_next_index: inactive\n");
921 memset(index, 0, sizeof(struct packet_index));
922 index->ts_cycles.timestamp_end = be64toh(rp.timestamp_end);
923 break;
924 case LTTNG_VIEWER_INDEX_OK:
925 printf_verbose("get_next_index: Ok, need metadata update : %u\n",
926 rp.flags & LTTNG_VIEWER_FLAG_NEW_METADATA);
927 index->offset = be64toh(rp.offset);
928 index->packet_size = be64toh(rp.packet_size);
929 index->content_size = be64toh(rp.content_size);
930 index->ts_cycles.timestamp_begin = be64toh(rp.timestamp_begin);
931 index->ts_cycles.timestamp_end = be64toh(rp.timestamp_end);
932 index->events_discarded = be64toh(rp.events_discarded);
933
934 if (rp.flags & LTTNG_VIEWER_FLAG_NEW_METADATA) {
935 printf_verbose("get_next_index: new metadata needed\n");
936 ret = get_new_metadata(ctx, viewer_stream);
937 if (ret < 0) {
938 goto error;
939 }
940 }
941 if (rp.flags & LTTNG_VIEWER_FLAG_NEW_STREAM) {
942 ret = ask_new_streams(ctx);
943 if (ret < 0)
944 goto error;
945 g_hash_table_foreach(ctx->session->ctf_traces, add_traces,
946 ctx->bt_ctx);
947 }
948 break;
949 case LTTNG_VIEWER_INDEX_RETRY:
950 printf_verbose("get_next_index: retry\n");
951 sleep(1);
952 goto retry;
953 case LTTNG_VIEWER_INDEX_HUP:
954 printf_verbose("get_next_index: stream hung up\n");
955 viewer_stream->id = -1ULL;
956 viewer_stream->fd = -1;
957 index->offset = EOF;
958 ctx->session->stream_count--;
959 break;
960 case LTTNG_VIEWER_INDEX_ERR:
961 fprintf(stderr, "[error] get_next_index: error\n");
962 ret = -1;
963 goto error;
964 default:
965 fprintf(stderr, "[error] get_next_index: unkwown value\n");
966 ret = -1;
967 goto error;
968 }
969
970 ret = 0;
971
972 error:
973 return ret;
974 }
975
976 static
977 void ctf_live_packet_seek(struct bt_stream_pos *stream_pos, size_t index,
978 int whence)
979 {
980 struct ctf_stream_pos *pos;
981 struct ctf_file_stream *file_stream;
982 struct packet_index *prev_index = NULL, *cur_index;
983 struct lttng_live_viewer_stream *viewer_stream;
984 struct lttng_live_session *session;
985 int ret;
986
987 retry:
988 pos = ctf_pos(stream_pos);
989 file_stream = container_of(pos, struct ctf_file_stream, pos);
990 viewer_stream = (struct lttng_live_viewer_stream *) pos->priv;
991 session = viewer_stream->session;
992
993 switch (pos->packet_index->len) {
994 case 0:
995 g_array_set_size(pos->packet_index, 1);
996 cur_index = &g_array_index(pos->packet_index,
997 struct packet_index, 0);
998 break;
999 case 1:
1000 g_array_set_size(pos->packet_index, 2);
1001 prev_index = &g_array_index(pos->packet_index,
1002 struct packet_index, 0);
1003 cur_index = &g_array_index(pos->packet_index,
1004 struct packet_index, 1);
1005 break;
1006 case 2:
1007 g_array_index(pos->packet_index,
1008 struct packet_index, 0) =
1009 g_array_index(pos->packet_index,
1010 struct packet_index, 1);
1011 prev_index = &g_array_index(pos->packet_index,
1012 struct packet_index, 0);
1013 cur_index = &g_array_index(pos->packet_index,
1014 struct packet_index, 1);
1015 break;
1016 default:
1017 abort();
1018 break;
1019 }
1020 printf_verbose("get_next_index for stream %" PRIu64 "\n", viewer_stream->id);
1021 ret = get_next_index(session->ctx, viewer_stream, cur_index);
1022 if (ret < 0) {
1023 pos->offset = EOF;
1024 fprintf(stderr, "[error] get_next_index failed\n");
1025 return;
1026 }
1027
1028 pos->packet_size = cur_index->packet_size;
1029 pos->content_size = cur_index->content_size;
1030 pos->mmap_base_offset = 0;
1031 if (cur_index->offset == EOF) {
1032 pos->offset = EOF;
1033 } else {
1034 pos->offset = 0;
1035 }
1036
1037 if (cur_index->content_size == 0) {
1038 file_stream->parent.cycles_timestamp =
1039 cur_index->ts_cycles.timestamp_end;
1040 file_stream->parent.real_timestamp = ctf_get_real_timestamp(
1041 &file_stream->parent,
1042 cur_index->ts_cycles.timestamp_end);
1043 } else {
1044 /* Convert the timestamps and append to the real_index. */
1045 cur_index->ts_real.timestamp_begin = ctf_get_real_timestamp(
1046 &file_stream->parent,
1047 cur_index->ts_cycles.timestamp_begin);
1048 cur_index->ts_real.timestamp_end = ctf_get_real_timestamp(
1049 &file_stream->parent,
1050 cur_index->ts_cycles.timestamp_end);
1051
1052 ctf_update_current_packet_index(&file_stream->parent,
1053 prev_index, cur_index);
1054
1055 file_stream->parent.cycles_timestamp =
1056 cur_index->ts_cycles.timestamp_begin;
1057 file_stream->parent.real_timestamp =
1058 cur_index->ts_real.timestamp_begin;
1059 }
1060
1061 if (pos->packet_size == 0 || pos->offset == EOF) {
1062 goto end;
1063 }
1064
1065 printf_verbose("get_data_packet for stream %" PRIu64 "\n",
1066 viewer_stream->id);
1067 ret = get_data_packet(session->ctx, pos, viewer_stream,
1068 be64toh(cur_index->offset),
1069 cur_index->packet_size / CHAR_BIT);
1070 if (ret == -2) {
1071 goto retry;
1072 } else if (ret < 0) {
1073 pos->offset = EOF;
1074 fprintf(stderr, "[error] get_data_packet failed\n");
1075 return;
1076 }
1077
1078 printf_verbose("Index received : packet_size : %" PRIu64
1079 ", offset %" PRIu64 ", content_size %" PRIu64
1080 ", timestamp_end : %" PRIu64 "\n",
1081 cur_index->packet_size, cur_index->offset,
1082 cur_index->content_size,
1083 cur_index->ts_cycles.timestamp_end);
1084
1085 /* update trace_packet_header and stream_packet_context */
1086 if (pos->prot != PROT_WRITE && file_stream->parent.trace_packet_header) {
1087 /* Read packet header */
1088 ret = generic_rw(&pos->parent, &file_stream->parent.trace_packet_header->p);
1089 if (ret) {
1090 pos->offset = EOF;
1091 fprintf(stderr, "[error] trace packet header read failed\n");
1092 goto end;
1093 }
1094 }
1095 if (pos->prot != PROT_WRITE && file_stream->parent.stream_packet_context) {
1096 /* Read packet context */
1097 ret = generic_rw(&pos->parent, &file_stream->parent.stream_packet_context->p);
1098 if (ret) {
1099 pos->offset = EOF;
1100 fprintf(stderr, "[error] stream packet context read failed\n");
1101 goto end;
1102 }
1103 }
1104 pos->data_offset = pos->offset;
1105
1106 end:
1107 return;
1108 }
1109
1110 int lttng_live_create_viewer_session(struct lttng_live_ctx *ctx)
1111 {
1112 struct lttng_viewer_cmd cmd;
1113 struct lttng_viewer_create_session_response resp;
1114 int ret;
1115 ssize_t ret_len;
1116
1117 cmd.cmd = htobe32(LTTNG_VIEWER_CREATE_SESSION);
1118 cmd.data_size = 0;
1119 cmd.cmd_version = 0;
1120
1121 do {
1122 ret_len = send(ctx->control_sock, &cmd, sizeof(cmd), 0);
1123 } while (ret_len < 0 && errno == EINTR);
1124 if (ret_len < 0) {
1125 perror("[error] Error sending cmd");
1126 ret = ret_len;
1127 goto error;
1128 }
1129 assert(ret_len == sizeof(cmd));
1130
1131 do {
1132 ret_len = recv(ctx->control_sock, &resp, sizeof(resp), 0);
1133 } while (ret_len < 0 && errno == EINTR);
1134 if (ret_len == 0) {
1135 fprintf(stderr, "[error] Remote side has closed connection\n");
1136 ret = -1;
1137 goto error;
1138 }
1139 if (ret_len < 0) {
1140 perror("[error] Error receiving create session reply");
1141 ret = ret_len;
1142 goto error;
1143 }
1144 assert(ret_len == sizeof(resp));
1145
1146 if (be32toh(resp.status) != LTTNG_VIEWER_CREATE_SESSION_OK) {
1147 fprintf(stderr, "[error] Error creating viewer session\n");
1148 ret = -1;
1149 goto error;
1150 }
1151 ret = 0;
1152
1153 error:
1154 return ret;
1155 }
1156
1157 static
1158 int del_traces(gpointer key, gpointer value, gpointer user_data)
1159 {
1160 struct bt_context *bt_ctx = user_data;
1161 struct lttng_live_ctf_trace *trace = value;
1162 int ret;
1163
1164 ret = bt_context_remove_trace(bt_ctx, trace->trace_id);
1165 if (ret < 0)
1166 fprintf(stderr, "[error] removing trace from context\n");
1167
1168 /* remove the key/value pair from the HT. */
1169 return 1;
1170 }
1171
1172 static
1173 void add_traces(gpointer key, gpointer value, gpointer user_data)
1174 {
1175 int i, ret;
1176 struct bt_context *bt_ctx = user_data;
1177 struct lttng_live_ctf_trace *trace = value;
1178 struct lttng_live_viewer_stream *stream;
1179 struct bt_mmap_stream *new_mmap_stream;
1180 struct bt_mmap_stream_list mmap_list;
1181 struct lttng_live_ctx *ctx = NULL;
1182 struct bt_trace_descriptor *td;
1183 struct bt_trace_handle *handle;
1184
1185 /*
1186 * We don't know how many streams we will receive for a trace, so
1187 * once we are done receiving the traces, we add all the traces
1188 * received to the bt_context.
1189 * We can receive streams during the attach command or the
1190 * get_new_streams, so we have to make sure not to add multiple
1191 * times the same traces.
1192 * If a trace is already in the context, we just skip this function.
1193 */
1194 if (trace->in_use)
1195 return;
1196
1197 BT_INIT_LIST_HEAD(&mmap_list.head);
1198
1199 for (i = 0; i < trace->streams->len; i++) {
1200 stream = g_ptr_array_index(trace->streams, i);
1201 ctx = stream->session->ctx;
1202
1203 if (!stream->metadata_flag) {
1204 new_mmap_stream = zmalloc(sizeof(struct bt_mmap_stream));
1205 new_mmap_stream->priv = (void *) stream;
1206 new_mmap_stream->fd = -1;
1207 bt_list_add(&new_mmap_stream->list, &mmap_list.head);
1208 } else {
1209 /* Get all possible metadata before starting */
1210 ret = get_new_metadata(ctx, stream);
1211 if (ret)
1212 goto end_free;
1213 trace->metadata_fp = fopen(stream->path, "r");
1214 }
1215 }
1216
1217 if (!trace->metadata_fp) {
1218 fprintf(stderr, "[error] No metadata stream opened\n");
1219 goto end_free;
1220 }
1221
1222 ret = bt_context_add_trace(bt_ctx, NULL, "ctf",
1223 ctf_live_packet_seek, &mmap_list, trace->metadata_fp);
1224 if (ret < 0) {
1225 fprintf(stderr, "[error] Error adding trace\n");
1226 goto end_free;
1227 }
1228 handle = (struct bt_trace_handle *) g_hash_table_lookup(
1229 bt_ctx->trace_handles,
1230 (gpointer) (unsigned long) ret);
1231 td = handle->td;
1232 trace->handle = handle;
1233 if (bt_ctx->current_iterator) {
1234 bt_iter_add_trace(bt_ctx->current_iterator, td);
1235 }
1236
1237 trace->trace_id = ret;
1238 trace->in_use = 1;
1239
1240 goto end;
1241
1242 end_free:
1243 bt_context_put(bt_ctx);
1244 end:
1245 return;
1246 }
1247
1248 int lttng_live_get_new_streams(struct lttng_live_ctx *ctx, uint64_t id)
1249 {
1250 struct lttng_viewer_cmd cmd;
1251 struct lttng_viewer_new_streams_request rq;
1252 struct lttng_viewer_new_streams_response rp;
1253 struct lttng_viewer_stream stream;
1254 int ret, i;
1255 ssize_t ret_len;
1256 uint32_t stream_count;
1257
1258 cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEW_STREAMS);
1259 cmd.data_size = sizeof(rq);
1260 cmd.cmd_version = 0;
1261
1262 memset(&rq, 0, sizeof(rq));
1263 rq.session_id = htobe64(id);
1264
1265 do {
1266 ret_len = send(ctx->control_sock, &cmd, sizeof(cmd), 0);
1267 } while (ret_len < 0 && errno == EINTR);
1268 if (ret_len < 0) {
1269 perror("[error] Error sending cmd");
1270 ret = ret_len;
1271 goto error;
1272 }
1273 assert(ret_len == sizeof(cmd));
1274
1275 do {
1276 ret_len = send(ctx->control_sock, &rq, sizeof(rq), 0);
1277 } while (ret_len < 0 && errno == EINTR);
1278 if (ret_len < 0) {
1279 perror("[error] Error sending get_new_streams request");
1280 ret = ret_len;
1281 goto error;
1282 }
1283 assert(ret_len == sizeof(rq));
1284
1285 do {
1286 ret_len = recv(ctx->control_sock, &rp, sizeof(rp), 0);
1287 } while (ret_len < 0 && errno == EINTR);
1288 if (ret_len == 0) {
1289 fprintf(stderr, "[error] Remote side has closed connection\n");
1290 ret = -1;
1291 goto error;
1292 }
1293 if (ret_len < 0) {
1294 perror("[error] Error receiving get_new_streams response");
1295 ret = ret_len;
1296 goto error;
1297 }
1298 assert(ret_len == sizeof(rp));
1299
1300 switch(be32toh(rp.status)) {
1301 case LTTNG_VIEWER_NEW_STREAMS_OK:
1302 break;
1303 case LTTNG_VIEWER_NEW_STREAMS_NO_NEW:
1304 ret = 0;
1305 goto end;
1306 case LTTNG_VIEWER_NEW_STREAMS_HUP:
1307 ret = -LTTNG_VIEWER_NEW_STREAMS_HUP;
1308 goto end;
1309 case LTTNG_VIEWER_NEW_STREAMS_ERR:
1310 fprintf(stderr, "[error] get_new_streams error\n");
1311 ret = -1;
1312 goto end;
1313 default:
1314 fprintf(stderr, "[error] Unknown return code %u\n",
1315 be32toh(rp.status));
1316 ret = -1;
1317 goto end;
1318 }
1319
1320 stream_count = be32toh(rp.streams_count);
1321 ctx->session->stream_count += stream_count;
1322 /*
1323 * When the session is created but not started, we do an active wait
1324 * until it starts. It allows the viewer to start processing the trace
1325 * as soon as the session starts.
1326 */
1327 if (ctx->session->stream_count == 0) {
1328 ret = 0;
1329 goto end;
1330 }
1331 printf_verbose("Waiting for %" PRIu64 " streams:\n",
1332 ctx->session->stream_count);
1333 ctx->session->streams = g_new0(struct lttng_live_viewer_stream,
1334 ctx->session->stream_count);
1335 for (i = 0; i < stream_count; i++) {
1336 do {
1337 ret_len = recv(ctx->control_sock, &stream, sizeof(stream), 0);
1338 } while (ret_len < 0 && errno == EINTR);
1339 if (ret_len == 0) {
1340 fprintf(stderr, "[error] Remote side has closed connection\n");
1341 ret = -1;
1342 goto error;
1343 }
1344 if (ret_len < 0) {
1345 perror("[error] Error receiving stream");
1346 ret = ret_len;
1347 goto error;
1348 }
1349 assert(ret_len == sizeof(stream));
1350 stream.path_name[LTTNG_VIEWER_PATH_MAX - 1] = '\0';
1351 stream.channel_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0';
1352
1353 printf_verbose(" stream %" PRIu64 " : %s/%s\n",
1354 be64toh(stream.id), stream.path_name,
1355 stream.channel_name);
1356 ctx->session->streams[i].id = be64toh(stream.id);
1357 ctx->session->streams[i].session = ctx->session;
1358
1359 ctx->session->streams[i].first_read = 1;
1360 ctx->session->streams[i].mmap_size = 0;
1361
1362 if (be32toh(stream.metadata_flag)) {
1363 char *path;
1364
1365 path = strdup(LTTNG_METADATA_PATH_TEMPLATE);
1366 if (!path) {
1367 perror("strdup");
1368 ret = -1;
1369 goto error;
1370 }
1371 if (!mkdtemp(path)) {
1372 perror("mkdtemp");
1373 free(path);
1374 ret = -1;
1375 goto error;
1376 }
1377 ctx->session->streams[i].metadata_flag = 1;
1378 snprintf(ctx->session->streams[i].path,
1379 sizeof(ctx->session->streams[i].path),
1380 "%s/%s", path,
1381 stream.channel_name);
1382 ret = open(ctx->session->streams[i].path,
1383 O_WRONLY | O_CREAT | O_TRUNC,
1384 S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP);
1385 if (ret < 0) {
1386 perror("open");
1387 free(path);
1388 goto error;
1389 }
1390 ctx->session->streams[i].fd = ret;
1391 free(path);
1392 }
1393 ret = lttng_live_ctf_trace_assign(&ctx->session->streams[i],
1394 be64toh(stream.ctf_trace_id));
1395 if (ret < 0) {
1396 goto error;
1397 }
1398
1399 }
1400 ret = 0;
1401
1402 end:
1403 error:
1404 return ret;
1405 }
1406
1407 void lttng_live_read(struct lttng_live_ctx *ctx)
1408 {
1409 int ret, i;
1410 struct bt_ctf_iter *iter;
1411 const struct bt_ctf_event *event;
1412 struct bt_iter_pos begin_pos;
1413 struct bt_trace_descriptor *td_write;
1414 struct bt_format *fmt_write;
1415 struct ctf_text_stream_pos *sout;
1416 uint64_t id;
1417
1418 ctx->bt_ctx = bt_context_create();
1419 if (!ctx->bt_ctx) {
1420 fprintf(stderr, "[error] bt_context_create allocation\n");
1421 goto end;
1422 }
1423
1424 fmt_write = bt_lookup_format(g_quark_from_static_string("text"));
1425 if (!fmt_write) {
1426 fprintf(stderr, "[error] ctf-text error\n");
1427 goto end;
1428 }
1429
1430 td_write = fmt_write->open_trace(NULL, O_RDWR, NULL, NULL);
1431 if (!td_write) {
1432 fprintf(stderr, "[error] Error opening output trace\n");
1433 goto end_free;
1434 }
1435
1436 sout = container_of(td_write, struct ctf_text_stream_pos,
1437 trace_descriptor);
1438 if (!sout->parent.event_cb)
1439 goto end_free;
1440
1441 ret = lttng_live_create_viewer_session(ctx);
1442 if (ret < 0) {
1443 goto end_free;
1444 }
1445
1446 for (i = 0; i < ctx->session_ids->len; i++) {
1447 id = g_array_index(ctx->session_ids, uint64_t, i);
1448 printf_verbose("Attaching to session %lu\n", id);
1449 ret = lttng_live_attach_session(ctx, id);
1450 printf_verbose("Attaching session returns %d\n", ret);
1451 if (ret < 0) {
1452 if (ret == -LTTNG_VIEWER_ATTACH_UNK) {
1453 fprintf(stderr, "[error] Unknown session ID\n");
1454 }
1455 goto end_free;
1456 }
1457 }
1458
1459 /*
1460 * As long as the session is active, we try to get new streams.
1461 */
1462 for (;;) {
1463 int flags;
1464
1465 while (!ctx->session->stream_count) {
1466 if (ctx->session_ids->len == 0)
1467 goto end_free;
1468 ret = ask_new_streams(ctx);
1469 if (ret < 0)
1470 goto end_free;
1471 }
1472
1473 g_hash_table_foreach(ctx->session->ctf_traces, add_traces,
1474 ctx->bt_ctx);
1475
1476 begin_pos.type = BT_SEEK_BEGIN;
1477 iter = bt_ctf_iter_create(ctx->bt_ctx, &begin_pos, NULL);
1478 if (!iter) {
1479 fprintf(stderr, "[error] Iterator creation error\n");
1480 goto end;
1481 }
1482 for (;;) {
1483 event = bt_ctf_iter_read_event_flags(iter, &flags);
1484 if (!(flags & BT_ITER_FLAG_RETRY)) {
1485 if (!event) {
1486 /* End of trace */
1487 break;
1488 }
1489 ret = sout->parent.event_cb(&sout->parent,
1490 event->parent->stream);
1491 if (ret) {
1492 fprintf(stderr, "[error] Writing "
1493 "event failed.\n");
1494 goto end_free;
1495 }
1496 }
1497 ret = bt_iter_next(bt_ctf_get_iter(iter));
1498 if (ret < 0) {
1499 goto end_free;
1500 }
1501 }
1502 bt_ctf_iter_destroy(iter);
1503 g_hash_table_foreach_remove(ctx->session->ctf_traces,
1504 del_traces, ctx->bt_ctx);
1505 ctx->session->stream_count = 0;
1506 }
1507
1508 end_free:
1509 bt_context_put(ctx->bt_ctx);
1510 end:
1511 return;
1512 }
This page took 0.05905 seconds and 5 git commands to generate.