Fix: lttng-live non NULL-terminated strings
[babeltrace.git] / formats / lttng-live / lttng-live-functions.c
1 /*
2 * Copyright (C) 2013 - Julien Desfossez <jdesfossez@efficios.com>
3 * Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
4 *
5 * Permission is hereby granted, free of charge, to any person obtaining a copy
6 * of this software and associated documentation files (the "Software"), to deal
7 * in the Software without restriction, including without limitation the rights
8 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9 * copies of the Software, and to permit persons to whom the Software is
10 * furnished to do so, subject to the following conditions:
11 *
12 * The above copyright notice and this permission notice shall be included in
13 * all copies or substantial portions of the Software.
14 *
15 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21 * SOFTWARE.
22 */
23
24 #include <sys/socket.h>
25 #include <sys/types.h>
26 #include <netinet/in.h>
27 #include <netdb.h>
28 #include <stdio.h>
29 #include <string.h>
30 #include <stdlib.h>
31 #include <unistd.h>
32 #include <errno.h>
33 #include <inttypes.h>
34 #include <fcntl.h>
35 #include <sys/mman.h>
36
37 #include <babeltrace/ctf/ctf-index.h>
38
39 #include <babeltrace/babeltrace.h>
40 #include <babeltrace/ctf/events.h>
41 #include <babeltrace/ctf/callbacks.h>
42 #include <babeltrace/ctf/iterator.h>
43
44 /* for packet_index */
45 #include <babeltrace/ctf/types.h>
46
47 #include <babeltrace/ctf/metadata.h>
48 #include <babeltrace/ctf-text/types.h>
49 #include <babeltrace/ctf/events-internal.h>
50 #include <formats/ctf/events-private.h>
51
52 #include "lttng-live-functions.h"
53 #include "lttng-viewer.h"
54
/*
 * Allocate x bytes of zero-filled memory through calloc().
 */
#define zmalloc(x)	calloc(1, (x))

/*
 * Largest of a and b after both have been converted to type. Each argument
 * is expanded more than once, so avoid arguments with side effects.
 */
#ifndef max_t
#define max_t(type, a, b)	\
	((type) (b) < (type) (a) ? (type) (a) : (type) (b))
#endif
64
65 int lttng_live_connect_viewer(struct lttng_live_ctx *ctx, char *hostname,
66 int port)
67 {
68 struct hostent *host;
69 struct sockaddr_in server_addr;
70 int ret;
71
72 host = gethostbyname(hostname);
73 if (!host) {
74 ret = -1;
75 goto end;
76 }
77
78 if ((ctx->control_sock = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
79 perror("Socket");
80 ret = -1;
81 goto end;
82 }
83
84 server_addr.sin_family = AF_INET;
85 server_addr.sin_port = htons(port);
86 server_addr.sin_addr = *((struct in_addr *) host->h_addr);
87 bzero(&(server_addr.sin_zero), 8);
88
89 if (connect(ctx->control_sock, (struct sockaddr *) &server_addr,
90 sizeof(struct sockaddr)) == -1) {
91 perror("Connect");
92 ret = -1;
93 goto end;
94 }
95
96 ret = 0;
97
98 end:
99 return ret;
100 }
101
102 int lttng_live_establish_connection(struct lttng_live_ctx *ctx)
103 {
104 struct lttng_viewer_cmd cmd;
105 struct lttng_viewer_connect connect;
106 int ret;
107 ssize_t ret_len;
108
109 cmd.cmd = htobe32(LTTNG_VIEWER_CONNECT);
110 cmd.data_size = sizeof(connect);
111 cmd.cmd_version = 0;
112
113 connect.major = htobe32(LTTNG_LIVE_MAJOR);
114 connect.minor = htobe32(LTTNG_LIVE_MINOR);
115 connect.type = htobe32(LTTNG_VIEWER_CLIENT_COMMAND);
116
117 do {
118 ret_len = send(ctx->control_sock, &cmd, sizeof(cmd), 0);
119 } while (ret_len < 0 && errno == EINTR);
120 if (ret_len < 0) {
121 fprintf(stderr, "[error] Error sending cmd\n");
122 ret = ret_len;
123 goto error;
124 }
125 assert(ret_len == sizeof(cmd));
126
127 do {
128 ret_len = send(ctx->control_sock, &connect, sizeof(connect), 0);
129 } while (ret_len < 0 && errno == EINTR);
130 if (ret_len < 0) {
131 fprintf(stderr, "[error] Error sending version\n");
132 ret = ret_len;
133 goto error;
134 }
135 assert(ret_len == sizeof(connect));
136
137 do {
138 ret_len = recv(ctx->control_sock, &connect, sizeof(connect), 0);
139 } while (ret_len < 0 && errno == EINTR);
140 if (ret_len < 0) {
141 fprintf(stderr, "[error] Error receiving version\n");
142 ret = ret_len;
143 goto error;
144 }
145 assert(ret_len == sizeof(connect));
146
147 printf_verbose("Received viewer session ID : %" PRIu64 "\n",
148 be64toh(connect.viewer_session_id));
149 printf_verbose("Relayd version : %u.%u\n", be32toh(connect.major),
150 be32toh(connect.minor));
151
152 ret = 0;
153
154 error:
155 return ret;
156 }
157
158 int lttng_live_list_sessions(struct lttng_live_ctx *ctx, const char *path)
159 {
160 struct lttng_viewer_cmd cmd;
161 struct lttng_viewer_list_sessions list;
162 struct lttng_viewer_session lsession;
163 int i, ret;
164 ssize_t ret_len;
165 int sessions_count;
166
167 cmd.cmd = htobe32(LTTNG_VIEWER_LIST_SESSIONS);
168 cmd.data_size = 0;
169 cmd.cmd_version = 0;
170
171 do {
172 ret_len = send(ctx->control_sock, &cmd, sizeof(cmd), 0);
173 } while (ret_len < 0 && errno == EINTR);
174 if (ret_len < 0) {
175 fprintf(stderr, "[error] Error sending cmd\n");
176 ret = ret_len;
177 goto error;
178 }
179 assert(ret_len == sizeof(cmd));
180
181 do {
182 ret_len = recv(ctx->control_sock, &list, sizeof(list), 0);
183 } while (ret_len < 0 && errno == EINTR);
184 if (ret_len < 0) {
185 fprintf(stderr, "[error] Error receiving session list\n");
186 ret = ret_len;
187 goto error;
188 }
189 assert(ret_len == sizeof(list));
190
191 sessions_count = be32toh(list.sessions_count);
192 fprintf(stdout, "%u active session(s)%c\n", sessions_count,
193 sessions_count > 0 ? ':' : ' ');
194 for (i = 0; i < sessions_count; i++) {
195 do {
196 ret_len = recv(ctx->control_sock, &lsession, sizeof(lsession), 0);
197 } while (ret_len < 0 && errno == EINTR);
198 if (ret_len < 0) {
199 fprintf(stderr, "[error] Error receiving session\n");
200 ret = ret_len;
201 goto error;
202 }
203 assert(ret_len == sizeof(lsession));
204 lsession.hostname[LTTNG_VIEWER_HOST_NAME_MAX - 1] = '\0';
205 lsession.session_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0';
206
207 fprintf(stdout, "%s/%" PRIu64 " : %s on host %s (timer = %u, "
208 "%u stream(s), %u client(s) connected)\n",
209 path, be64toh(lsession.id),
210 lsession.session_name, lsession.hostname,
211 be32toh(lsession.live_timer),
212 be32toh(lsession.streams),
213 be32toh(lsession.clients));
214 }
215
216 ret = 0;
217
218 error:
219 return ret;
220 }
221
222 int lttng_live_ctf_trace_assign(struct lttng_live_viewer_stream *stream,
223 uint64_t ctf_trace_id)
224 {
225 struct lttng_live_ctf_trace *trace;
226 int ret = 0;
227
228 trace = g_hash_table_lookup(stream->session->ctf_traces,
229 (gpointer) ctf_trace_id);
230 if (!trace) {
231 trace = g_new0(struct lttng_live_ctf_trace, 1);
232 if (!trace) {
233 ret = -1;
234 fprintf(stderr, "[error] ctf_trace allocation\n");
235 goto error;
236 }
237 trace->ctf_trace_id = ctf_trace_id;
238 trace->streams = g_ptr_array_new();
239 g_hash_table_insert(stream->session->ctf_traces,
240 (gpointer) ctf_trace_id,
241 trace);
242 }
243 if (stream->metadata_flag)
244 trace->metadata_stream = stream;
245
246 stream->ctf_trace = trace;
247 g_ptr_array_add(trace->streams, stream);
248
249 error:
250 return ret;
251 }
252
253 int lttng_live_attach_session(struct lttng_live_ctx *ctx, uint64_t id)
254 {
255 struct lttng_viewer_cmd cmd;
256 struct lttng_viewer_attach_session_request rq;
257 struct lttng_viewer_attach_session_response rp;
258 struct lttng_viewer_stream stream;
259 int ret, i;
260 ssize_t ret_len;
261
262 cmd.cmd = htobe32(LTTNG_VIEWER_ATTACH_SESSION);
263 cmd.data_size = sizeof(rq);
264 cmd.cmd_version = 0;
265
266 memset(&rq, 0, sizeof(rq));
267 rq.session_id = htobe64(id);
268 // TODO: add cmd line parameter to select seek beginning
269 // rq.seek = htobe32(LTTNG_VIEWER_SEEK_BEGINNING);
270 rq.seek = htobe32(LTTNG_VIEWER_SEEK_LAST);
271
272 do {
273 ret_len = send(ctx->control_sock, &cmd, sizeof(cmd), 0);
274 } while (ret_len < 0 && errno == EINTR);
275 if (ret_len < 0) {
276 fprintf(stderr, "[error] Error sending cmd\n");
277 ret = ret_len;
278 goto error;
279 }
280 assert(ret_len == sizeof(cmd));
281
282 do {
283 ret_len = send(ctx->control_sock, &rq, sizeof(rq), 0);
284 } while (ret_len < 0 && errno == EINTR);
285 if (ret_len < 0) {
286 fprintf(stderr, "[error] Error sending attach request\n");
287 ret = ret_len;
288 goto error;
289 }
290 assert(ret_len == sizeof(rq));
291
292 do {
293 ret_len = recv(ctx->control_sock, &rp, sizeof(rp), 0);
294 } while (ret_len < 0 && errno == EINTR);
295 if (ret_len < 0) {
296 fprintf(stderr, "[error] Error receiving attach response\n");
297 ret = ret_len;
298 goto error;
299 }
300 assert(ret_len == sizeof(rp));
301
302 switch(be32toh(rp.status)) {
303 case LTTNG_VIEWER_ATTACH_OK:
304 break;
305 case LTTNG_VIEWER_ATTACH_UNK:
306 ret = -LTTNG_VIEWER_ATTACH_UNK;
307 goto end;
308 case LTTNG_VIEWER_ATTACH_ALREADY:
309 fprintf(stderr, "[error] Already a viewer attached\n");
310 ret = -1;
311 goto end;
312 case LTTNG_VIEWER_ATTACH_NOT_LIVE:
313 fprintf(stderr, "[error] Not a live session\n");
314 ret = -1;
315 goto end;
316 case LTTNG_VIEWER_ATTACH_SEEK_ERR:
317 fprintf(stderr, "[error] Wrong seek parameter\n");
318 ret = -1;
319 goto end;
320 default:
321 fprintf(stderr, "[error] Unknown attach return code %u\n",
322 be32toh(rp.status));
323 ret = -1;
324 goto end;
325 }
326 if (be32toh(rp.status) != LTTNG_VIEWER_ATTACH_OK) {
327 ret = -1;
328 goto end;
329 }
330
331 ctx->session->stream_count = be32toh(rp.streams_count);
332 /*
333 * When the session is created but not started, we do an active wait
334 * until it starts. It allows the viewer to start processing the trace
335 * as soon as the session starts.
336 */
337 if (ctx->session->stream_count == 0) {
338 ret = 0;
339 goto end;
340 }
341 printf_verbose("Waiting for %" PRIu64 " streams:\n",
342 ctx->session->stream_count);
343 ctx->session->streams = g_new0(struct lttng_live_viewer_stream,
344 ctx->session->stream_count);
345 if (!ctx->session->streams) {
346 ret = -1;
347 goto error;
348 }
349
350 for (i = 0; i < be32toh(rp.streams_count); i++) {
351 do {
352 ret_len = recv(ctx->control_sock, &stream, sizeof(stream), 0);
353 } while (ret_len < 0 && errno == EINTR);
354 if (ret_len < 0) {
355 fprintf(stderr, "[error] Error receiving stream\n");
356 ret = ret_len;
357 goto error;
358 }
359 assert(ret_len == sizeof(stream));
360 stream.path_name[LTTNG_VIEWER_PATH_MAX - 1] = '\0';
361 stream.channel_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0';
362
363 printf_verbose(" stream %" PRIu64 " : %s/%s\n",
364 be64toh(stream.id), stream.path_name,
365 stream.channel_name);
366 ctx->session->streams[i].id = be64toh(stream.id);
367 ctx->session->streams[i].session = ctx->session;
368
369 ctx->session->streams[i].first_read = 1;
370 ctx->session->streams[i].mmap_size = 0;
371
372 if (be32toh(stream.metadata_flag)) {
373 char *path;
374
375 path = strdup(LTTNG_METADATA_PATH_TEMPLATE);
376 path = mkdtemp(path);
377 ctx->session->streams[i].metadata_flag = 1;
378 snprintf(ctx->session->streams[i].path,
379 sizeof(ctx->session->streams[i].path),
380 "%s/%s", path,
381 stream.channel_name);
382 ret = open(ctx->session->streams[i].path,
383 O_WRONLY | O_CREAT | O_TRUNC,
384 S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP);
385 if (ret < 0) {
386 perror("open");
387 goto error;
388 }
389 ctx->session->streams[i].fd = ret;
390 }
391 ret = lttng_live_ctf_trace_assign(&ctx->session->streams[i],
392 be64toh(stream.ctf_trace_id));
393 if (ret < 0) {
394 goto error;
395 }
396
397 }
398 ret = 0;
399
400 end:
401 error:
402 return ret;
403 }
404
405 static
406 int get_data_packet(struct lttng_live_ctx *ctx,
407 struct ctf_stream_pos *pos,
408 struct lttng_live_viewer_stream *stream, uint64_t offset,
409 uint64_t len)
410 {
411 struct lttng_viewer_cmd cmd;
412 struct lttng_viewer_get_packet rq;
413 struct lttng_viewer_trace_packet rp;
414 ssize_t ret_len;
415 int ret;
416
417 cmd.cmd = htobe32(LTTNG_VIEWER_GET_PACKET);
418 cmd.data_size = sizeof(rq);
419 cmd.cmd_version = 0;
420
421 memset(&rq, 0, sizeof(rq));
422 rq.stream_id = htobe64(stream->id);
423 /* Already in big endian. */
424 rq.offset = offset;
425 rq.len = htobe32(len);
426
427 do {
428 ret_len = send(ctx->control_sock, &cmd, sizeof(cmd), 0);
429 } while (ret_len < 0 && errno == EINTR);
430 if (ret_len < 0) {
431 fprintf(stderr, "[error] Error sending cmd\n");
432 ret = ret_len;
433 goto error;
434 }
435 assert(ret_len == sizeof(cmd));
436
437 do {
438 ret_len = send(ctx->control_sock, &rq, sizeof(rq), 0);
439 } while (ret_len < 0 && errno == EINTR);
440 if (ret_len < 0) {
441 fprintf(stderr, "[error] Error sending get_data_packet request\n");
442 ret = ret_len;
443 goto error;
444 }
445 assert(ret_len == sizeof(rq));
446
447 do {
448 ret_len = recv(ctx->control_sock, &rp, sizeof(rp), 0);
449 } while (ret_len < 0 && errno == EINTR);
450 if (ret_len < 0) {
451 fprintf(stderr, "[error] Error receiving data response\n");
452 ret = ret_len;
453 goto error;
454 }
455 assert(ret_len == sizeof(rp));
456
457 rp.flags = be32toh(rp.flags);
458
459 switch (be32toh(rp.status)) {
460 case LTTNG_VIEWER_GET_PACKET_OK:
461 len = be32toh(rp.len);
462 printf_verbose("get_data_packet: Ok, packet size : %" PRIu64
463 "\n", len);
464 break;
465 case LTTNG_VIEWER_GET_PACKET_RETRY:
466 printf_verbose("get_data_packet: retry\n");
467 ret = -1;
468 goto end;
469 case LTTNG_VIEWER_GET_PACKET_ERR:
470 if (rp.flags & LTTNG_VIEWER_FLAG_NEW_METADATA) {
471 printf_verbose("get_data_packet: new metadata needed\n");
472 ret = 0;
473 goto end;
474 }
475 fprintf(stderr, "[error] get_data_packet: error\n");
476 ret = -1;
477 goto end;
478 case LTTNG_VIEWER_GET_PACKET_EOF:
479 ret = -2;
480 goto error;
481 default:
482 printf_verbose("get_data_packet: unknown\n");
483 assert(0);
484 ret = -1;
485 goto end;
486 }
487
488 if (len <= 0) {
489 ret = -1;
490 goto end;
491 }
492
493 if (len > stream->mmap_size) {
494 uint64_t new_size;
495
496 new_size = max_t(uint64_t, len, stream->mmap_size << 1);
497 if (pos->base_mma) {
498 /* unmap old base */
499 ret = munmap_align(pos->base_mma);
500 if (ret) {
501 fprintf(stderr, "[error] Unable to unmap old base: %s.\n",
502 strerror(errno));
503 ret = -1;
504 goto error;
505 }
506 pos->base_mma = NULL;
507 }
508 pos->base_mma = mmap_align(new_size,
509 PROT_READ | PROT_WRITE,
510 MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
511 if (pos->base_mma == MAP_FAILED) {
512 fprintf(stderr, "[error] mmap error %s.\n",
513 strerror(errno));
514 pos->base_mma = NULL;
515 ret = -1;
516 goto error;
517 }
518
519 stream->mmap_size = new_size;
520 printf_verbose("Expanding stream mmap size to %" PRIu64 " bytes\n",
521 stream->mmap_size);
522 }
523
524 do {
525 ret_len = recv(ctx->control_sock,
526 mmap_align_addr(pos->base_mma), len,
527 MSG_WAITALL);
528 } while (ret_len < 0 && errno == EINTR);
529 if (ret_len < 0) {
530 fprintf(stderr, "[error] Error receiving trace packet\n");
531 ret = ret_len;
532 goto error;
533 }
534 assert(ret_len == len);
535
536 end:
537 error:
538 return ret;
539 }
540
541 /*
542 * Return number of metadata bytes written or a negative value on error.
543 */
544 static
545 int get_new_metadata(struct lttng_live_ctx *ctx,
546 struct lttng_live_viewer_stream *viewer_stream,
547 uint64_t *metadata_len)
548 {
549 uint64_t len = 0;
550 int ret;
551 struct lttng_viewer_cmd cmd;
552 struct lttng_viewer_get_metadata rq;
553 struct lttng_viewer_metadata_packet rp;
554 struct lttng_live_viewer_stream *metadata_stream;
555 char *data = NULL;
556 ssize_t ret_len;
557
558 cmd.cmd = htobe32(LTTNG_VIEWER_GET_METADATA);
559 cmd.data_size = sizeof(rq);
560 cmd.cmd_version = 0;
561
562 metadata_stream = viewer_stream->ctf_trace->metadata_stream;
563 rq.stream_id = htobe64(metadata_stream->id);
564
565 do {
566 ret_len = send(ctx->control_sock, &cmd, sizeof(cmd), 0);
567 } while (ret_len < 0 && errno == EINTR);
568 if (ret_len < 0) {
569 fprintf(stderr, "[error] Error sending cmd\n");
570 ret = ret_len;
571 goto error;
572 }
573 assert(ret_len == sizeof(cmd));
574
575 do {
576 ret_len = send(ctx->control_sock, &rq, sizeof(rq), 0);
577 } while (ret_len < 0 && errno == EINTR);
578 if (ret_len < 0) {
579 fprintf(stderr, "[error] Error sending get_metadata request\n");
580 ret = ret_len;
581 goto error;
582 }
583 assert(ret_len == sizeof(rq));
584
585 do {
586 ret_len = recv(ctx->control_sock, &rp, sizeof(rp), 0);
587 } while (ret_len < 0 && errno == EINTR);
588 if (ret_len < 0) {
589 fprintf(stderr, "[error] Error receiving metadata response\n");
590 ret = ret_len;
591 goto error;
592 }
593 assert(ret_len == sizeof(rp));
594
595 switch (be32toh(rp.status)) {
596 case LTTNG_VIEWER_METADATA_OK:
597 printf_verbose("get_metadata : OK\n");
598 break;
599 case LTTNG_VIEWER_NO_NEW_METADATA:
600 printf_verbose("get_metadata : NO NEW\n");
601 ret = -1;
602 goto end;
603 case LTTNG_VIEWER_METADATA_ERR:
604 printf_verbose("get_metadata : ERR\n");
605 ret = -1;
606 goto end;
607 default:
608 printf_verbose("get_metadata : UNKNOWN\n");
609 ret = -1;
610 goto end;
611 }
612
613 len = be64toh(rp.len);
614 printf_verbose("Writing %" PRIu64" bytes to metadata\n", len);
615 if (len <= 0) {
616 ret = -1;
617 goto end;
618 }
619
620 data = zmalloc(len);
621 if (!data) {
622 perror("relay data zmalloc");
623 ret = -1;
624 goto error;
625 }
626 do {
627 ret_len = recv(ctx->control_sock, data, len, MSG_WAITALL);
628 } while (ret_len < 0 && errno == EINTR);
629 if (ret_len < 0) {
630 fprintf(stderr, "[error] Error receiving trace packet\n");
631 ret = ret_len;
632 free(data);
633 goto error;
634 }
635 assert(ret_len == len);
636
637 do {
638 ret_len = write(metadata_stream->fd, data, len);
639 } while (ret_len < 0 && errno == EINTR);
640 if (ret_len < 0) {
641 free(data);
642 ret = ret_len;
643 goto error;
644 }
645 assert(ret_len == len);
646
647 free(data);
648
649 *metadata_len = len;
650 ret = 0;
651 end:
652 error:
653 return ret;
654 }
655
656 /*
657 * Get one index for a stream.
658 *
659 * Returns 0 on success or a negative value on error.
660 */
661 static
662 int get_next_index(struct lttng_live_ctx *ctx,
663 struct lttng_live_viewer_stream *viewer_stream,
664 struct packet_index *index)
665 {
666 struct lttng_viewer_cmd cmd;
667 struct lttng_viewer_get_next_index rq;
668 struct lttng_viewer_index rp;
669 int ret;
670 uint64_t metadata_len;
671 ssize_t ret_len;
672
673 cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEXT_INDEX);
674 cmd.data_size = sizeof(rq);
675 cmd.cmd_version = 0;
676
677 memset(&rq, 0, sizeof(rq));
678 rq.stream_id = htobe64(viewer_stream->id);
679
680 retry:
681 do {
682 ret_len = send(ctx->control_sock, &cmd, sizeof(cmd), 0);
683 } while (ret_len < 0 && errno == EINTR);
684 if (ret_len < 0) {
685 fprintf(stderr, "[error] Error sending cmd\n");
686 ret = ret_len;
687 goto error;
688 }
689 assert(ret_len == sizeof(cmd));
690
691 do {
692 ret_len = send(ctx->control_sock, &rq, sizeof(rq), 0);
693 } while (ret_len < 0 && errno == EINTR);
694 if (ret_len < 0) {
695 fprintf(stderr, "[error] Error sending get_next_index request\n");
696 ret = ret_len;
697 goto error;
698 }
699 assert(ret_len == sizeof(rq));
700
701 do {
702 ret_len = recv(ctx->control_sock, &rp, sizeof(rp), 0);
703 } while (ret_len < 0 && errno == EINTR);
704 if (ret_len < 0) {
705 fprintf(stderr, "[error] Error receiving index response\n");
706 ret = ret_len;
707 goto error;
708 }
709 assert(ret_len == sizeof(rp));
710
711 rp.flags = be32toh(rp.flags);
712
713 switch (be32toh(rp.status)) {
714 case LTTNG_VIEWER_INDEX_INACTIVE:
715 printf_verbose("get_next_index: inactive\n");
716 memset(index, 0, sizeof(struct packet_index));
717 index->timestamp_end = be64toh(rp.timestamp_end);
718 break;
719 case LTTNG_VIEWER_INDEX_OK:
720 printf_verbose("get_next_index: Ok, need metadata update : %u\n",
721 rp.flags & LTTNG_VIEWER_FLAG_NEW_METADATA);
722 index->offset = be64toh(rp.offset);
723 index->packet_size = be64toh(rp.packet_size);
724 index->content_size = be64toh(rp.content_size);
725 index->timestamp_begin = be64toh(rp.timestamp_begin);
726 index->timestamp_end = be64toh(rp.timestamp_end);
727 index->events_discarded = be64toh(rp.events_discarded);
728
729 if (rp.flags & LTTNG_VIEWER_FLAG_NEW_METADATA) {
730 printf_verbose("get_next_index: new metadata needed\n");
731 ret = get_new_metadata(ctx, viewer_stream,
732 &metadata_len);
733 if (ret < 0) {
734 goto error;
735 }
736 }
737 break;
738 case LTTNG_VIEWER_INDEX_RETRY:
739 printf_verbose("get_next_index: retry\n");
740 sleep(1);
741 goto retry;
742 case LTTNG_VIEWER_INDEX_HUP:
743 printf_verbose("get_next_index: stream hung up\n");
744 viewer_stream->id = -1ULL;
745 viewer_stream->fd = -1;
746 index->offset = EOF;
747 break;
748 case LTTNG_VIEWER_INDEX_ERR:
749 fprintf(stderr, "[error] get_next_index: error\n");
750 ret = -1;
751 goto error;
752 default:
753 fprintf(stderr, "[error] get_next_index: unkwown value\n");
754 ret = -1;
755 goto error;
756 }
757
758 ret = 0;
759
760 error:
761 return ret;
762 }
763
764 void ctf_live_packet_seek(struct bt_stream_pos *stream_pos, size_t index,
765 int whence)
766 {
767 struct ctf_stream_pos *pos;
768 struct ctf_file_stream *file_stream;
769 struct packet_index packet_index;
770 struct lttng_live_viewer_stream *viewer_stream;
771 struct lttng_live_session *session;
772 int ret;
773
774 retry:
775 pos = ctf_pos(stream_pos);
776 file_stream = container_of(pos, struct ctf_file_stream, pos);
777 viewer_stream = (struct lttng_live_viewer_stream *) pos->priv;
778 session = viewer_stream->session;
779
780 printf_verbose("get_next_index for stream %" PRIu64 "\n", viewer_stream->id);
781 ret = get_next_index(session->ctx, viewer_stream, &packet_index);
782 if (ret < 0) {
783 pos->offset = EOF;
784 fprintf(stderr, "[error] get_next_index failed\n");
785 return;
786 }
787
788 pos->packet_size = packet_index.packet_size;
789 pos->content_size = packet_index.content_size;
790 pos->mmap_base_offset = 0;
791 if (packet_index.offset == EOF) {
792 pos->offset = EOF;
793 } else {
794 pos->offset = 0;
795 }
796
797 if (packet_index.content_size == 0) {
798 file_stream->parent.cycles_timestamp = packet_index.timestamp_end;
799 file_stream->parent.real_timestamp = ctf_get_real_timestamp(
800 &file_stream->parent, packet_index.timestamp_end);
801 } else {
802 file_stream->parent.cycles_timestamp = packet_index.timestamp_begin;
803 file_stream->parent.real_timestamp = ctf_get_real_timestamp(
804 &file_stream->parent, packet_index.timestamp_begin);
805 }
806
807 if (pos->packet_size == 0 || pos->offset == EOF) {
808 goto end;
809 }
810
811 printf_verbose("get_data_packet for stream %" PRIu64 "\n",
812 viewer_stream->id);
813 ret = get_data_packet(session->ctx, pos, viewer_stream,
814 be64toh(packet_index.offset),
815 packet_index.packet_size / CHAR_BIT);
816 if (ret == -2) {
817 goto retry;
818 } else if (ret < 0) {
819 pos->offset = EOF;
820 fprintf(stderr, "[error] get_data_packet failed\n");
821 return;
822 }
823
824 printf_verbose("Index received : packet_size : %" PRIu64
825 ", offset %" PRIu64 ", content_size %" PRIu64
826 ", timestamp_end : %" PRIu64 "\n",
827 packet_index.packet_size, packet_index.offset,
828 packet_index.content_size, packet_index.timestamp_end);
829
830 /* update trace_packet_header and stream_packet_context */
831 if (pos->prot != PROT_WRITE && file_stream->parent.trace_packet_header) {
832 /* Read packet header */
833 ret = generic_rw(&pos->parent, &file_stream->parent.trace_packet_header->p);
834 if (ret) {
835 pos->offset = EOF;
836 fprintf(stderr, "[error] trace packet header read failed\n");
837 goto end;
838 }
839 }
840 if (pos->prot != PROT_WRITE && file_stream->parent.stream_packet_context) {
841 /* Read packet context */
842 ret = generic_rw(&pos->parent, &file_stream->parent.stream_packet_context->p);
843 if (ret) {
844 pos->offset = EOF;
845 fprintf(stderr, "[error] stream packet context read failed\n");
846 goto end;
847 }
848 }
849 pos->data_offset = pos->offset;
850
851 end:
852 return;
853 }
854
855 static int del_traces(gpointer key, gpointer value, gpointer user_data)
856 {
857 struct bt_context *bt_ctx = user_data;
858 struct lttng_live_ctf_trace *trace = value;
859 int ret;
860
861 ret = bt_context_remove_trace(bt_ctx, trace->trace_id);
862 if (ret < 0)
863 fprintf(stderr, "[error] removing trace from context\n");
864
865 /* remove the key/value pair from the HT. */
866 return 1;
867 }
868
869 static void add_traces(gpointer key, gpointer value, gpointer user_data)
870 {
871 int i, ret, total_metadata = 0;
872 uint64_t metadata_len;
873 struct bt_context *bt_ctx = user_data;
874 struct lttng_live_ctf_trace *trace = value;
875 struct lttng_live_viewer_stream *stream;
876 struct bt_mmap_stream *new_mmap_stream;
877 struct bt_mmap_stream_list mmap_list;
878 struct lttng_live_ctx *ctx = NULL;
879
880 BT_INIT_LIST_HEAD(&mmap_list.head);
881
882 for (i = 0; i < trace->streams->len; i++) {
883 stream = g_ptr_array_index(trace->streams, i);
884 ctx = stream->session->ctx;
885
886 if (!stream->metadata_flag) {
887 new_mmap_stream = zmalloc(sizeof(struct bt_mmap_stream));
888 new_mmap_stream->priv = (void *) stream;
889 new_mmap_stream->fd = -1;
890 bt_list_add(&new_mmap_stream->list, &mmap_list.head);
891 } else {
892 /* Get all possible metadata before starting */
893 do {
894 ret = get_new_metadata(ctx, stream,
895 &metadata_len);
896 if (ret == 0) {
897 total_metadata += metadata_len;
898 }
899 } while (ret == 0 || total_metadata == 0);
900 trace->metadata_fp = fopen(stream->path, "r");
901 }
902 }
903
904 if (!trace->metadata_fp) {
905 fprintf(stderr, "[error] No metadata stream opened\n");
906 goto end_free;
907 }
908
909 ret = bt_context_add_trace(bt_ctx, NULL, "ctf",
910 ctf_live_packet_seek, &mmap_list, trace->metadata_fp);
911 if (ret < 0) {
912 fprintf(stderr, "[error] Error adding trace\n");
913 goto end_free;
914 }
915 trace->trace_id = ret;
916
917 goto end;
918
919 end_free:
920 bt_context_put(bt_ctx);
921 end:
922 return;
923 }
924
925 void lttng_live_read(struct lttng_live_ctx *ctx, uint64_t session_id)
926 {
927 int ret, active_session = 0;
928 struct bt_context *bt_ctx;
929 struct bt_ctf_iter *iter;
930 const struct bt_ctf_event *event;
931 struct bt_iter_pos begin_pos;
932 struct bt_trace_descriptor *td_write;
933 struct bt_format *fmt_write;
934 struct ctf_text_stream_pos *sout;
935
936 bt_ctx = bt_context_create();
937 if (!bt_ctx) {
938 fprintf(stderr, "[error] bt_context_create allocation\n");
939 goto end;
940 }
941
942 fmt_write = bt_lookup_format(g_quark_from_static_string("text"));
943 if (!fmt_write) {
944 fprintf(stderr, "[error] ctf-text error\n");
945 goto end;
946 }
947
948 td_write = fmt_write->open_trace(NULL, O_RDWR, NULL, NULL);
949 if (!td_write) {
950 fprintf(stderr, "[error] Error opening output trace\n");
951 goto end_free;
952 }
953
954 sout = container_of(td_write, struct ctf_text_stream_pos,
955 trace_descriptor);
956 if (!sout->parent.event_cb)
957 goto end_free;
958
959 /*
960 * As long as the session is active, we try to reattach to it,
961 * even if all the streams get closed.
962 */
963 do {
964 int flags;
965
966 do {
967 ret = lttng_live_attach_session(ctx, session_id);
968 printf_verbose("Attaching session returns %d\n", ret);
969 if (ret < 0) {
970 if (ret == -LTTNG_VIEWER_ATTACH_UNK) {
971 if (active_session)
972 goto end_free;
973 fprintf(stderr, "[error] Unknown "
974 "session ID\n");
975 }
976 goto end_free;
977 } else {
978 active_session = 1;
979 }
980 } while (ctx->session->stream_count == 0);
981
982 g_hash_table_foreach(ctx->session->ctf_traces, add_traces, bt_ctx);
983
984 begin_pos.type = BT_SEEK_BEGIN;
985 iter = bt_ctf_iter_create(bt_ctx, &begin_pos, NULL);
986 if (!iter) {
987 fprintf(stderr, "[error] Iterator creation error\n");
988 goto end;
989 }
990 for (;;) {
991 event = bt_ctf_iter_read_event_flags(iter, &flags);
992 if (!(flags & BT_ITER_FLAG_RETRY)) {
993 if (!event) {
994 /* End of trace */
995 break;
996 }
997 ret = sout->parent.event_cb(&sout->parent,
998 event->parent->stream);
999 if (ret) {
1000 fprintf(stderr, "[error] Writing "
1001 "event failed.\n");
1002 goto end_free;
1003 }
1004 }
1005 ret = bt_iter_next(bt_ctf_get_iter(iter));
1006 if (ret < 0) {
1007 goto end_free;
1008 }
1009 }
1010 bt_ctf_iter_destroy(iter);
1011 g_hash_table_foreach_remove(ctx->session->ctf_traces, del_traces, bt_ctx);
1012 } while (active_session);
1013
1014 end_free:
1015 bt_context_put(bt_ctx);
1016 end:
1017 return;
1018 }
This page took 0.090144 seconds and 4 git commands to generate.