Fix: lttng-live: unbounded use of sscanf() in parse_url()
[babeltrace.git] / formats / lttng-live / lttng-live-functions.c
CommitLineData
4a744367
JD
1/*
2 * Copyright (C) 2013 - Julien Desfossez <jdesfossez@efficios.com>
3 * Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
4 *
5 * Permission is hereby granted, free of charge, to any person obtaining a copy
6 * of this software and associated documentation files (the "Software"), to deal
7 * in the Software without restriction, including without limitation the rights
8 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9 * copies of the Software, and to permit persons to whom the Software is
10 * furnished to do so, subject to the following conditions:
11 *
12 * The above copyright notice and this permission notice shall be included in
13 * all copies or substantial portions of the Software.
14 *
15 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21 * SOFTWARE.
22 */
23
24#include <sys/socket.h>
25#include <sys/types.h>
26#include <netinet/in.h>
27#include <netdb.h>
28#include <stdio.h>
29#include <string.h>
30#include <stdlib.h>
31#include <unistd.h>
32#include <errno.h>
33#include <inttypes.h>
34#include <fcntl.h>
35#include <sys/mman.h>
36
37#include <babeltrace/ctf/ctf-index.h>
38
39#include <babeltrace/babeltrace.h>
40#include <babeltrace/ctf/events.h>
41#include <babeltrace/ctf/callbacks.h>
42#include <babeltrace/ctf/iterator.h>
43
44/* for packet_index */
45#include <babeltrace/ctf/types.h>
46
47#include <babeltrace/ctf/metadata.h>
48#include <babeltrace/ctf-text/types.h>
49#include <babeltrace/ctf/events-internal.h>
50#include <formats/ctf/events-private.h>
51
52#include "lttng-live-functions.h"
53#include "lttng-viewer.h"
54
55/*
56 * Memory allocation zeroed
57 */
58#define zmalloc(x) calloc(1, x)
59
60#ifndef max_t
61#define max_t(type, a, b) \
62 ((type) (a) > (type) (b) ? (type) (a) : (type) (b))
63#endif
64
65int lttng_live_connect_viewer(struct lttng_live_ctx *ctx, char *hostname,
66 int port)
67{
68 struct hostent *host;
69 struct sockaddr_in server_addr;
70 int ret;
71
72 host = gethostbyname(hostname);
73 if (!host) {
74 ret = -1;
75 goto end;
76 }
77
78 if ((ctx->control_sock = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
79 perror("Socket");
80 ret = -1;
81 goto end;
82 }
83
84 server_addr.sin_family = AF_INET;
85 server_addr.sin_port = htons(port);
86 server_addr.sin_addr = *((struct in_addr *) host->h_addr);
87 bzero(&(server_addr.sin_zero), 8);
88
89 if (connect(ctx->control_sock, (struct sockaddr *) &server_addr,
90 sizeof(struct sockaddr)) == -1) {
91 perror("Connect");
92 ret = -1;
93 goto end;
94 }
95
96 ret = 0;
97
98end:
99 return ret;
100}
101
102int lttng_live_establish_connection(struct lttng_live_ctx *ctx)
103{
104 struct lttng_viewer_cmd cmd;
105 struct lttng_viewer_connect connect;
106 int ret;
107 ssize_t ret_len;
108
109 cmd.cmd = htobe32(LTTNG_VIEWER_CONNECT);
110 cmd.data_size = sizeof(connect);
111 cmd.cmd_version = 0;
112
0c3cd7e1 113 connect.viewer_session_id = -1ULL; /* will be set on recv */
4a744367
JD
114 connect.major = htobe32(LTTNG_LIVE_MAJOR);
115 connect.minor = htobe32(LTTNG_LIVE_MINOR);
116 connect.type = htobe32(LTTNG_VIEWER_CLIENT_COMMAND);
117
118 do {
119 ret_len = send(ctx->control_sock, &cmd, sizeof(cmd), 0);
120 } while (ret_len < 0 && errno == EINTR);
121 if (ret_len < 0) {
122 fprintf(stderr, "[error] Error sending cmd\n");
123 ret = ret_len;
124 goto error;
125 }
126 assert(ret_len == sizeof(cmd));
127
128 do {
129 ret_len = send(ctx->control_sock, &connect, sizeof(connect), 0);
130 } while (ret_len < 0 && errno == EINTR);
131 if (ret_len < 0) {
132 fprintf(stderr, "[error] Error sending version\n");
133 ret = ret_len;
134 goto error;
135 }
136 assert(ret_len == sizeof(connect));
137
138 do {
139 ret_len = recv(ctx->control_sock, &connect, sizeof(connect), 0);
140 } while (ret_len < 0 && errno == EINTR);
141 if (ret_len < 0) {
142 fprintf(stderr, "[error] Error receiving version\n");
143 ret = ret_len;
144 goto error;
145 }
146 assert(ret_len == sizeof(connect));
147
148 printf_verbose("Received viewer session ID : %" PRIu64 "\n",
149 be64toh(connect.viewer_session_id));
150 printf_verbose("Relayd version : %u.%u\n", be32toh(connect.major),
151 be32toh(connect.minor));
152
153 ret = 0;
154
155error:
156 return ret;
157}
158
159int lttng_live_list_sessions(struct lttng_live_ctx *ctx, const char *path)
160{
161 struct lttng_viewer_cmd cmd;
162 struct lttng_viewer_list_sessions list;
163 struct lttng_viewer_session lsession;
164 int i, ret;
165 ssize_t ret_len;
166 int sessions_count;
167
168 cmd.cmd = htobe32(LTTNG_VIEWER_LIST_SESSIONS);
169 cmd.data_size = 0;
170 cmd.cmd_version = 0;
171
172 do {
173 ret_len = send(ctx->control_sock, &cmd, sizeof(cmd), 0);
174 } while (ret_len < 0 && errno == EINTR);
175 if (ret_len < 0) {
176 fprintf(stderr, "[error] Error sending cmd\n");
177 ret = ret_len;
178 goto error;
179 }
180 assert(ret_len == sizeof(cmd));
181
182 do {
183 ret_len = recv(ctx->control_sock, &list, sizeof(list), 0);
184 } while (ret_len < 0 && errno == EINTR);
185 if (ret_len < 0) {
186 fprintf(stderr, "[error] Error receiving session list\n");
187 ret = ret_len;
188 goto error;
189 }
190 assert(ret_len == sizeof(list));
191
192 sessions_count = be32toh(list.sessions_count);
193 fprintf(stdout, "%u active session(s)%c\n", sessions_count,
194 sessions_count > 0 ? ':' : ' ');
195 for (i = 0; i < sessions_count; i++) {
196 do {
197 ret_len = recv(ctx->control_sock, &lsession, sizeof(lsession), 0);
198 } while (ret_len < 0 && errno == EINTR);
199 if (ret_len < 0) {
200 fprintf(stderr, "[error] Error receiving session\n");
201 ret = ret_len;
202 goto error;
203 }
204 assert(ret_len == sizeof(lsession));
9ed31e53
MD
205 lsession.hostname[LTTNG_VIEWER_HOST_NAME_MAX - 1] = '\0';
206 lsession.session_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0';
4a744367
JD
207
208 fprintf(stdout, "%s/%" PRIu64 " : %s on host %s (timer = %u, "
209 "%u stream(s), %u client(s) connected)\n",
210 path, be64toh(lsession.id),
211 lsession.session_name, lsession.hostname,
212 be32toh(lsession.live_timer),
213 be32toh(lsession.streams),
214 be32toh(lsession.clients));
215 }
216
217 ret = 0;
218
219error:
220 return ret;
221}
222
223int lttng_live_ctf_trace_assign(struct lttng_live_viewer_stream *stream,
224 uint64_t ctf_trace_id)
225{
226 struct lttng_live_ctf_trace *trace;
227 int ret = 0;
228
229 trace = g_hash_table_lookup(stream->session->ctf_traces,
230 (gpointer) ctf_trace_id);
231 if (!trace) {
232 trace = g_new0(struct lttng_live_ctf_trace, 1);
4a744367
JD
233 trace->ctf_trace_id = ctf_trace_id;
234 trace->streams = g_ptr_array_new();
235 g_hash_table_insert(stream->session->ctf_traces,
236 (gpointer) ctf_trace_id,
237 trace);
238 }
239 if (stream->metadata_flag)
240 trace->metadata_stream = stream;
241
242 stream->ctf_trace = trace;
243 g_ptr_array_add(trace->streams, stream);
244
4a744367
JD
245 return ret;
246}
247
248int lttng_live_attach_session(struct lttng_live_ctx *ctx, uint64_t id)
249{
250 struct lttng_viewer_cmd cmd;
251 struct lttng_viewer_attach_session_request rq;
252 struct lttng_viewer_attach_session_response rp;
253 struct lttng_viewer_stream stream;
254 int ret, i;
255 ssize_t ret_len;
256
257 cmd.cmd = htobe32(LTTNG_VIEWER_ATTACH_SESSION);
258 cmd.data_size = sizeof(rq);
259 cmd.cmd_version = 0;
260
261 memset(&rq, 0, sizeof(rq));
262 rq.session_id = htobe64(id);
263 // TODO: add cmd line parameter to select seek beginning
264 // rq.seek = htobe32(LTTNG_VIEWER_SEEK_BEGINNING);
265 rq.seek = htobe32(LTTNG_VIEWER_SEEK_LAST);
266
267 do {
268 ret_len = send(ctx->control_sock, &cmd, sizeof(cmd), 0);
269 } while (ret_len < 0 && errno == EINTR);
270 if (ret_len < 0) {
271 fprintf(stderr, "[error] Error sending cmd\n");
272 ret = ret_len;
273 goto error;
274 }
275 assert(ret_len == sizeof(cmd));
276
277 do {
278 ret_len = send(ctx->control_sock, &rq, sizeof(rq), 0);
279 } while (ret_len < 0 && errno == EINTR);
280 if (ret_len < 0) {
281 fprintf(stderr, "[error] Error sending attach request\n");
282 ret = ret_len;
283 goto error;
284 }
285 assert(ret_len == sizeof(rq));
286
287 do {
288 ret_len = recv(ctx->control_sock, &rp, sizeof(rp), 0);
289 } while (ret_len < 0 && errno == EINTR);
290 if (ret_len < 0) {
291 fprintf(stderr, "[error] Error receiving attach response\n");
292 ret = ret_len;
293 goto error;
294 }
295 assert(ret_len == sizeof(rp));
296
297 switch(be32toh(rp.status)) {
298 case LTTNG_VIEWER_ATTACH_OK:
299 break;
300 case LTTNG_VIEWER_ATTACH_UNK:
301 ret = -LTTNG_VIEWER_ATTACH_UNK;
302 goto end;
303 case LTTNG_VIEWER_ATTACH_ALREADY:
304 fprintf(stderr, "[error] Already a viewer attached\n");
305 ret = -1;
306 goto end;
307 case LTTNG_VIEWER_ATTACH_NOT_LIVE:
308 fprintf(stderr, "[error] Not a live session\n");
309 ret = -1;
310 goto end;
311 case LTTNG_VIEWER_ATTACH_SEEK_ERR:
312 fprintf(stderr, "[error] Wrong seek parameter\n");
313 ret = -1;
314 goto end;
315 default:
316 fprintf(stderr, "[error] Unknown attach return code %u\n",
317 be32toh(rp.status));
318 ret = -1;
319 goto end;
320 }
321 if (be32toh(rp.status) != LTTNG_VIEWER_ATTACH_OK) {
322 ret = -1;
323 goto end;
324 }
325
326 ctx->session->stream_count = be32toh(rp.streams_count);
327 /*
328 * When the session is created but not started, we do an active wait
329 * until it starts. It allows the viewer to start processing the trace
330 * as soon as the session starts.
331 */
332 if (ctx->session->stream_count == 0) {
333 ret = 0;
334 goto end;
335 }
336 printf_verbose("Waiting for %" PRIu64 " streams:\n",
337 ctx->session->stream_count);
338 ctx->session->streams = g_new0(struct lttng_live_viewer_stream,
339 ctx->session->stream_count);
4a744367
JD
340 for (i = 0; i < be32toh(rp.streams_count); i++) {
341 do {
342 ret_len = recv(ctx->control_sock, &stream, sizeof(stream), 0);
343 } while (ret_len < 0 && errno == EINTR);
344 if (ret_len < 0) {
345 fprintf(stderr, "[error] Error receiving stream\n");
346 ret = ret_len;
347 goto error;
348 }
349 assert(ret_len == sizeof(stream));
9ed31e53
MD
350 stream.path_name[LTTNG_VIEWER_PATH_MAX - 1] = '\0';
351 stream.channel_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0';
4a744367
JD
352
353 printf_verbose(" stream %" PRIu64 " : %s/%s\n",
354 be64toh(stream.id), stream.path_name,
355 stream.channel_name);
356 ctx->session->streams[i].id = be64toh(stream.id);
357 ctx->session->streams[i].session = ctx->session;
358
359 ctx->session->streams[i].first_read = 1;
360 ctx->session->streams[i].mmap_size = 0;
361
362 if (be32toh(stream.metadata_flag)) {
363 char *path;
364
365 path = strdup(LTTNG_METADATA_PATH_TEMPLATE);
6a4d2b5a
MD
366 if (!path) {
367 perror("strdup");
368 ret = -1;
369 goto error;
370 }
371 if (!mkdtemp(path)) {
372 perror("mkdtemp");
373 free(path);
374 ret = -1;
375 goto error;
376 }
4a744367 377 ctx->session->streams[i].metadata_flag = 1;
4a744367
JD
378 snprintf(ctx->session->streams[i].path,
379 sizeof(ctx->session->streams[i].path),
380 "%s/%s", path,
381 stream.channel_name);
382 ret = open(ctx->session->streams[i].path,
383 O_WRONLY | O_CREAT | O_TRUNC,
384 S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP);
385 if (ret < 0) {
ab9d07f0 386 perror("open");
6a4d2b5a 387 free(path);
4a744367
JD
388 goto error;
389 }
390 ctx->session->streams[i].fd = ret;
6a4d2b5a 391 free(path);
4a744367
JD
392 }
393 ret = lttng_live_ctf_trace_assign(&ctx->session->streams[i],
394 be64toh(stream.ctf_trace_id));
395 if (ret < 0) {
396 goto error;
397 }
398
399 }
400 ret = 0;
401
402end:
403error:
404 return ret;
405}
406
407static
408int get_data_packet(struct lttng_live_ctx *ctx,
409 struct ctf_stream_pos *pos,
410 struct lttng_live_viewer_stream *stream, uint64_t offset,
411 uint64_t len)
412{
413 struct lttng_viewer_cmd cmd;
414 struct lttng_viewer_get_packet rq;
415 struct lttng_viewer_trace_packet rp;
416 ssize_t ret_len;
417 int ret;
418
419 cmd.cmd = htobe32(LTTNG_VIEWER_GET_PACKET);
420 cmd.data_size = sizeof(rq);
421 cmd.cmd_version = 0;
422
423 memset(&rq, 0, sizeof(rq));
424 rq.stream_id = htobe64(stream->id);
425 /* Already in big endian. */
426 rq.offset = offset;
427 rq.len = htobe32(len);
428
429 do {
430 ret_len = send(ctx->control_sock, &cmd, sizeof(cmd), 0);
431 } while (ret_len < 0 && errno == EINTR);
432 if (ret_len < 0) {
433 fprintf(stderr, "[error] Error sending cmd\n");
434 ret = ret_len;
435 goto error;
436 }
437 assert(ret_len == sizeof(cmd));
438
439 do {
440 ret_len = send(ctx->control_sock, &rq, sizeof(rq), 0);
441 } while (ret_len < 0 && errno == EINTR);
442 if (ret_len < 0) {
443 fprintf(stderr, "[error] Error sending get_data_packet request\n");
444 ret = ret_len;
445 goto error;
446 }
447 assert(ret_len == sizeof(rq));
448
449 do {
450 ret_len = recv(ctx->control_sock, &rp, sizeof(rp), 0);
451 } while (ret_len < 0 && errno == EINTR);
452 if (ret_len < 0) {
453 fprintf(stderr, "[error] Error receiving data response\n");
454 ret = ret_len;
455 goto error;
456 }
457 assert(ret_len == sizeof(rp));
458
459 rp.flags = be32toh(rp.flags);
460
461 switch (be32toh(rp.status)) {
462 case LTTNG_VIEWER_GET_PACKET_OK:
463 len = be32toh(rp.len);
464 printf_verbose("get_data_packet: Ok, packet size : %" PRIu64
465 "\n", len);
466 break;
467 case LTTNG_VIEWER_GET_PACKET_RETRY:
468 printf_verbose("get_data_packet: retry\n");
469 ret = -1;
470 goto end;
471 case LTTNG_VIEWER_GET_PACKET_ERR:
472 if (rp.flags & LTTNG_VIEWER_FLAG_NEW_METADATA) {
473 printf_verbose("get_data_packet: new metadata needed\n");
474 ret = 0;
475 goto end;
476 }
477 fprintf(stderr, "[error] get_data_packet: error\n");
478 ret = -1;
479 goto end;
480 case LTTNG_VIEWER_GET_PACKET_EOF:
481 ret = -2;
482 goto error;
483 default:
484 printf_verbose("get_data_packet: unknown\n");
485 assert(0);
486 ret = -1;
487 goto end;
488 }
489
490 if (len <= 0) {
491 ret = -1;
492 goto end;
493 }
494
495 if (len > stream->mmap_size) {
496 uint64_t new_size;
497
498 new_size = max_t(uint64_t, len, stream->mmap_size << 1);
499 if (pos->base_mma) {
500 /* unmap old base */
501 ret = munmap_align(pos->base_mma);
502 if (ret) {
503 fprintf(stderr, "[error] Unable to unmap old base: %s.\n",
504 strerror(errno));
505 ret = -1;
506 goto error;
507 }
508 pos->base_mma = NULL;
509 }
510 pos->base_mma = mmap_align(new_size,
511 PROT_READ | PROT_WRITE,
512 MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
513 if (pos->base_mma == MAP_FAILED) {
514 fprintf(stderr, "[error] mmap error %s.\n",
515 strerror(errno));
516 pos->base_mma = NULL;
517 ret = -1;
518 goto error;
519 }
520
521 stream->mmap_size = new_size;
522 printf_verbose("Expanding stream mmap size to %" PRIu64 " bytes\n",
523 stream->mmap_size);
524 }
525
526 do {
527 ret_len = recv(ctx->control_sock,
528 mmap_align_addr(pos->base_mma), len,
529 MSG_WAITALL);
530 } while (ret_len < 0 && errno == EINTR);
531 if (ret_len < 0) {
532 fprintf(stderr, "[error] Error receiving trace packet\n");
533 ret = ret_len;
534 goto error;
535 }
536 assert(ret_len == len);
537
538end:
539error:
540 return ret;
541}
542
543/*
544 * Return number of metadata bytes written or a negative value on error.
545 */
546static
547int get_new_metadata(struct lttng_live_ctx *ctx,
548 struct lttng_live_viewer_stream *viewer_stream,
549 uint64_t *metadata_len)
550{
551 uint64_t len = 0;
552 int ret;
553 struct lttng_viewer_cmd cmd;
554 struct lttng_viewer_get_metadata rq;
555 struct lttng_viewer_metadata_packet rp;
556 struct lttng_live_viewer_stream *metadata_stream;
557 char *data = NULL;
558 ssize_t ret_len;
559
560 cmd.cmd = htobe32(LTTNG_VIEWER_GET_METADATA);
561 cmd.data_size = sizeof(rq);
562 cmd.cmd_version = 0;
563
564 metadata_stream = viewer_stream->ctf_trace->metadata_stream;
565 rq.stream_id = htobe64(metadata_stream->id);
566
567 do {
568 ret_len = send(ctx->control_sock, &cmd, sizeof(cmd), 0);
569 } while (ret_len < 0 && errno == EINTR);
570 if (ret_len < 0) {
571 fprintf(stderr, "[error] Error sending cmd\n");
572 ret = ret_len;
573 goto error;
574 }
575 assert(ret_len == sizeof(cmd));
576
577 do {
578 ret_len = send(ctx->control_sock, &rq, sizeof(rq), 0);
579 } while (ret_len < 0 && errno == EINTR);
580 if (ret_len < 0) {
581 fprintf(stderr, "[error] Error sending get_metadata request\n");
582 ret = ret_len;
583 goto error;
584 }
585 assert(ret_len == sizeof(rq));
586
587 do {
588 ret_len = recv(ctx->control_sock, &rp, sizeof(rp), 0);
589 } while (ret_len < 0 && errno == EINTR);
590 if (ret_len < 0) {
591 fprintf(stderr, "[error] Error receiving metadata response\n");
592 ret = ret_len;
593 goto error;
594 }
595 assert(ret_len == sizeof(rp));
596
597 switch (be32toh(rp.status)) {
598 case LTTNG_VIEWER_METADATA_OK:
599 printf_verbose("get_metadata : OK\n");
600 break;
601 case LTTNG_VIEWER_NO_NEW_METADATA:
602 printf_verbose("get_metadata : NO NEW\n");
603 ret = -1;
604 goto end;
605 case LTTNG_VIEWER_METADATA_ERR:
606 printf_verbose("get_metadata : ERR\n");
607 ret = -1;
608 goto end;
609 default:
610 printf_verbose("get_metadata : UNKNOWN\n");
611 ret = -1;
612 goto end;
613 }
614
615 len = be64toh(rp.len);
616 printf_verbose("Writing %" PRIu64" bytes to metadata\n", len);
617 if (len <= 0) {
618 ret = -1;
619 goto end;
620 }
621
622 data = zmalloc(len);
623 if (!data) {
624 perror("relay data zmalloc");
625 ret = -1;
626 goto error;
627 }
628 do {
629 ret_len = recv(ctx->control_sock, data, len, MSG_WAITALL);
630 } while (ret_len < 0 && errno == EINTR);
631 if (ret_len < 0) {
632 fprintf(stderr, "[error] Error receiving trace packet\n");
633 ret = ret_len;
634 free(data);
635 goto error;
636 }
637 assert(ret_len == len);
638
639 do {
640 ret_len = write(metadata_stream->fd, data, len);
641 } while (ret_len < 0 && errno == EINTR);
642 if (ret_len < 0) {
643 free(data);
644 ret = ret_len;
645 goto error;
646 }
647 assert(ret_len == len);
648
649 free(data);
650
651 *metadata_len = len;
652 ret = 0;
653end:
654error:
655 return ret;
656}
657
658/*
659 * Get one index for a stream.
660 *
661 * Returns 0 on success or a negative value on error.
662 */
663static
664int get_next_index(struct lttng_live_ctx *ctx,
665 struct lttng_live_viewer_stream *viewer_stream,
666 struct packet_index *index)
667{
668 struct lttng_viewer_cmd cmd;
669 struct lttng_viewer_get_next_index rq;
670 struct lttng_viewer_index rp;
671 int ret;
672 uint64_t metadata_len;
673 ssize_t ret_len;
674
675 cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEXT_INDEX);
676 cmd.data_size = sizeof(rq);
677 cmd.cmd_version = 0;
678
679 memset(&rq, 0, sizeof(rq));
680 rq.stream_id = htobe64(viewer_stream->id);
681
682retry:
683 do {
684 ret_len = send(ctx->control_sock, &cmd, sizeof(cmd), 0);
685 } while (ret_len < 0 && errno == EINTR);
686 if (ret_len < 0) {
687 fprintf(stderr, "[error] Error sending cmd\n");
688 ret = ret_len;
689 goto error;
690 }
691 assert(ret_len == sizeof(cmd));
692
693 do {
694 ret_len = send(ctx->control_sock, &rq, sizeof(rq), 0);
695 } while (ret_len < 0 && errno == EINTR);
696 if (ret_len < 0) {
697 fprintf(stderr, "[error] Error sending get_next_index request\n");
698 ret = ret_len;
699 goto error;
700 }
701 assert(ret_len == sizeof(rq));
702
703 do {
704 ret_len = recv(ctx->control_sock, &rp, sizeof(rp), 0);
705 } while (ret_len < 0 && errno == EINTR);
706 if (ret_len < 0) {
707 fprintf(stderr, "[error] Error receiving index response\n");
708 ret = ret_len;
709 goto error;
710 }
711 assert(ret_len == sizeof(rp));
712
713 rp.flags = be32toh(rp.flags);
714
715 switch (be32toh(rp.status)) {
716 case LTTNG_VIEWER_INDEX_INACTIVE:
717 printf_verbose("get_next_index: inactive\n");
718 memset(index, 0, sizeof(struct packet_index));
719 index->timestamp_end = be64toh(rp.timestamp_end);
720 break;
721 case LTTNG_VIEWER_INDEX_OK:
722 printf_verbose("get_next_index: Ok, need metadata update : %u\n",
723 rp.flags & LTTNG_VIEWER_FLAG_NEW_METADATA);
724 index->offset = be64toh(rp.offset);
725 index->packet_size = be64toh(rp.packet_size);
726 index->content_size = be64toh(rp.content_size);
727 index->timestamp_begin = be64toh(rp.timestamp_begin);
728 index->timestamp_end = be64toh(rp.timestamp_end);
729 index->events_discarded = be64toh(rp.events_discarded);
730
731 if (rp.flags & LTTNG_VIEWER_FLAG_NEW_METADATA) {
732 printf_verbose("get_next_index: new metadata needed\n");
733 ret = get_new_metadata(ctx, viewer_stream,
734 &metadata_len);
735 if (ret < 0) {
736 goto error;
737 }
738 }
739 break;
740 case LTTNG_VIEWER_INDEX_RETRY:
741 printf_verbose("get_next_index: retry\n");
742 sleep(1);
743 goto retry;
744 case LTTNG_VIEWER_INDEX_HUP:
745 printf_verbose("get_next_index: stream hung up\n");
746 viewer_stream->id = -1ULL;
747 viewer_stream->fd = -1;
748 index->offset = EOF;
749 break;
750 case LTTNG_VIEWER_INDEX_ERR:
751 fprintf(stderr, "[error] get_next_index: error\n");
752 ret = -1;
753 goto error;
754 default:
755 fprintf(stderr, "[error] get_next_index: unkwown value\n");
756 ret = -1;
757 goto error;
758 }
759
760 ret = 0;
761
762error:
763 return ret;
764}
765
766void ctf_live_packet_seek(struct bt_stream_pos *stream_pos, size_t index,
767 int whence)
768{
769 struct ctf_stream_pos *pos;
770 struct ctf_file_stream *file_stream;
771 struct packet_index packet_index;
772 struct lttng_live_viewer_stream *viewer_stream;
773 struct lttng_live_session *session;
774 int ret;
775
776retry:
777 pos = ctf_pos(stream_pos);
778 file_stream = container_of(pos, struct ctf_file_stream, pos);
779 viewer_stream = (struct lttng_live_viewer_stream *) pos->priv;
780 session = viewer_stream->session;
781
782 printf_verbose("get_next_index for stream %" PRIu64 "\n", viewer_stream->id);
783 ret = get_next_index(session->ctx, viewer_stream, &packet_index);
784 if (ret < 0) {
785 pos->offset = EOF;
786 fprintf(stderr, "[error] get_next_index failed\n");
787 return;
788 }
789
790 pos->packet_size = packet_index.packet_size;
791 pos->content_size = packet_index.content_size;
792 pos->mmap_base_offset = 0;
793 if (packet_index.offset == EOF) {
794 pos->offset = EOF;
795 } else {
796 pos->offset = 0;
797 }
798
799 if (packet_index.content_size == 0) {
800 file_stream->parent.cycles_timestamp = packet_index.timestamp_end;
801 file_stream->parent.real_timestamp = ctf_get_real_timestamp(
802 &file_stream->parent, packet_index.timestamp_end);
803 } else {
804 file_stream->parent.cycles_timestamp = packet_index.timestamp_begin;
805 file_stream->parent.real_timestamp = ctf_get_real_timestamp(
806 &file_stream->parent, packet_index.timestamp_begin);
807 }
808
809 if (pos->packet_size == 0 || pos->offset == EOF) {
810 goto end;
811 }
812
813 printf_verbose("get_data_packet for stream %" PRIu64 "\n",
814 viewer_stream->id);
815 ret = get_data_packet(session->ctx, pos, viewer_stream,
816 be64toh(packet_index.offset),
817 packet_index.packet_size / CHAR_BIT);
818 if (ret == -2) {
819 goto retry;
820 } else if (ret < 0) {
821 pos->offset = EOF;
822 fprintf(stderr, "[error] get_data_packet failed\n");
823 return;
824 }
825
826 printf_verbose("Index received : packet_size : %" PRIu64
827 ", offset %" PRIu64 ", content_size %" PRIu64
828 ", timestamp_end : %" PRIu64 "\n",
829 packet_index.packet_size, packet_index.offset,
830 packet_index.content_size, packet_index.timestamp_end);
831
832 /* update trace_packet_header and stream_packet_context */
833 if (pos->prot != PROT_WRITE && file_stream->parent.trace_packet_header) {
834 /* Read packet header */
835 ret = generic_rw(&pos->parent, &file_stream->parent.trace_packet_header->p);
836 if (ret) {
837 pos->offset = EOF;
838 fprintf(stderr, "[error] trace packet header read failed\n");
839 goto end;
840 }
841 }
842 if (pos->prot != PROT_WRITE && file_stream->parent.stream_packet_context) {
843 /* Read packet context */
844 ret = generic_rw(&pos->parent, &file_stream->parent.stream_packet_context->p);
845 if (ret) {
846 pos->offset = EOF;
847 fprintf(stderr, "[error] stream packet context read failed\n");
848 goto end;
849 }
850 }
851 pos->data_offset = pos->offset;
852
853end:
854 return;
855}
856
857static int del_traces(gpointer key, gpointer value, gpointer user_data)
858{
859 struct bt_context *bt_ctx = user_data;
860 struct lttng_live_ctf_trace *trace = value;
861 int ret;
862
863 ret = bt_context_remove_trace(bt_ctx, trace->trace_id);
864 if (ret < 0)
865 fprintf(stderr, "[error] removing trace from context\n");
866
867 /* remove the key/value pair from the HT. */
868 return 1;
869}
870
871static void add_traces(gpointer key, gpointer value, gpointer user_data)
872{
873 int i, ret, total_metadata = 0;
874 uint64_t metadata_len;
875 struct bt_context *bt_ctx = user_data;
876 struct lttng_live_ctf_trace *trace = value;
877 struct lttng_live_viewer_stream *stream;
878 struct bt_mmap_stream *new_mmap_stream;
879 struct bt_mmap_stream_list mmap_list;
880 struct lttng_live_ctx *ctx = NULL;
881
882 BT_INIT_LIST_HEAD(&mmap_list.head);
883
884 for (i = 0; i < trace->streams->len; i++) {
885 stream = g_ptr_array_index(trace->streams, i);
886 ctx = stream->session->ctx;
887
888 if (!stream->metadata_flag) {
889 new_mmap_stream = zmalloc(sizeof(struct bt_mmap_stream));
890 new_mmap_stream->priv = (void *) stream;
891 new_mmap_stream->fd = -1;
892 bt_list_add(&new_mmap_stream->list, &mmap_list.head);
893 } else {
894 /* Get all possible metadata before starting */
895 do {
896 ret = get_new_metadata(ctx, stream,
897 &metadata_len);
898 if (ret == 0) {
899 total_metadata += metadata_len;
900 }
901 } while (ret == 0 || total_metadata == 0);
902 trace->metadata_fp = fopen(stream->path, "r");
903 }
904 }
905
906 if (!trace->metadata_fp) {
907 fprintf(stderr, "[error] No metadata stream opened\n");
908 goto end_free;
909 }
910
911 ret = bt_context_add_trace(bt_ctx, NULL, "ctf",
912 ctf_live_packet_seek, &mmap_list, trace->metadata_fp);
913 if (ret < 0) {
914 fprintf(stderr, "[error] Error adding trace\n");
915 goto end_free;
916 }
917 trace->trace_id = ret;
918
919 goto end;
920
921end_free:
922 bt_context_put(bt_ctx);
923end:
924 return;
925}
926
927void lttng_live_read(struct lttng_live_ctx *ctx, uint64_t session_id)
928{
929 int ret, active_session = 0;
930 struct bt_context *bt_ctx;
931 struct bt_ctf_iter *iter;
932 const struct bt_ctf_event *event;
933 struct bt_iter_pos begin_pos;
934 struct bt_trace_descriptor *td_write;
935 struct bt_format *fmt_write;
936 struct ctf_text_stream_pos *sout;
937
938 bt_ctx = bt_context_create();
939 if (!bt_ctx) {
940 fprintf(stderr, "[error] bt_context_create allocation\n");
941 goto end;
942 }
943
944 fmt_write = bt_lookup_format(g_quark_from_static_string("text"));
945 if (!fmt_write) {
946 fprintf(stderr, "[error] ctf-text error\n");
947 goto end;
948 }
949
950 td_write = fmt_write->open_trace(NULL, O_RDWR, NULL, NULL);
951 if (!td_write) {
952 fprintf(stderr, "[error] Error opening output trace\n");
953 goto end_free;
954 }
955
956 sout = container_of(td_write, struct ctf_text_stream_pos,
957 trace_descriptor);
958 if (!sout->parent.event_cb)
959 goto end_free;
960
961 /*
962 * As long as the session is active, we try to reattach to it,
963 * even if all the streams get closed.
964 */
965 do {
966 int flags;
967
968 do {
969 ret = lttng_live_attach_session(ctx, session_id);
970 printf_verbose("Attaching session returns %d\n", ret);
971 if (ret < 0) {
972 if (ret == -LTTNG_VIEWER_ATTACH_UNK) {
973 if (active_session)
974 goto end_free;
975 fprintf(stderr, "[error] Unknown "
976 "session ID\n");
977 }
978 goto end_free;
979 } else {
980 active_session = 1;
981 }
982 } while (ctx->session->stream_count == 0);
983
984 g_hash_table_foreach(ctx->session->ctf_traces, add_traces, bt_ctx);
985
986 begin_pos.type = BT_SEEK_BEGIN;
987 iter = bt_ctf_iter_create(bt_ctx, &begin_pos, NULL);
988 if (!iter) {
989 fprintf(stderr, "[error] Iterator creation error\n");
990 goto end;
991 }
992 for (;;) {
993 event = bt_ctf_iter_read_event_flags(iter, &flags);
994 if (!(flags & BT_ITER_FLAG_RETRY)) {
995 if (!event) {
996 /* End of trace */
997 break;
998 }
999 ret = sout->parent.event_cb(&sout->parent,
1000 event->parent->stream);
1001 if (ret) {
1002 fprintf(stderr, "[error] Writing "
1003 "event failed.\n");
1004 goto end_free;
1005 }
1006 }
1007 ret = bt_iter_next(bt_ctf_get_iter(iter));
1008 if (ret < 0) {
1009 goto end_free;
1010 }
1011 }
1012 bt_ctf_iter_destroy(iter);
1013 g_hash_table_foreach_remove(ctx->session->ctf_traces, del_traces, bt_ctx);
1014 } while (active_session);
1015
1016end_free:
1017 bt_context_put(bt_ctx);
1018end:
1019 return;
1020}
This page took 0.059633 seconds and 4 git commands to generate.