Fix: lttng-live uninitialized scalar variable
[babeltrace.git] / formats / lttng-live / lttng-live-functions.c
CommitLineData
4a744367
JD
1/*
2 * Copyright (C) 2013 - Julien Desfossez <jdesfossez@efficios.com>
3 * Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
4 *
5 * Permission is hereby granted, free of charge, to any person obtaining a copy
6 * of this software and associated documentation files (the "Software"), to deal
7 * in the Software without restriction, including without limitation the rights
8 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9 * copies of the Software, and to permit persons to whom the Software is
10 * furnished to do so, subject to the following conditions:
11 *
12 * The above copyright notice and this permission notice shall be included in
13 * all copies or substantial portions of the Software.
14 *
15 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21 * SOFTWARE.
22 */
23
24#include <sys/socket.h>
25#include <sys/types.h>
26#include <netinet/in.h>
27#include <netdb.h>
28#include <stdio.h>
29#include <string.h>
30#include <stdlib.h>
31#include <unistd.h>
32#include <errno.h>
33#include <inttypes.h>
34#include <fcntl.h>
35#include <sys/mman.h>
36
37#include <babeltrace/ctf/ctf-index.h>
38
39#include <babeltrace/babeltrace.h>
40#include <babeltrace/ctf/events.h>
41#include <babeltrace/ctf/callbacks.h>
42#include <babeltrace/ctf/iterator.h>
43
44/* for packet_index */
45#include <babeltrace/ctf/types.h>
46
47#include <babeltrace/ctf/metadata.h>
48#include <babeltrace/ctf-text/types.h>
49#include <babeltrace/ctf/events-internal.h>
50#include <formats/ctf/events-private.h>
51
52#include "lttng-live-functions.h"
53#include "lttng-viewer.h"
54
/*
 * Allocate x bytes of zero-filled memory (thin wrapper over calloc()).
 */
#define zmalloc(x) calloc(1, x)

/*
 * Largest of a and b once both have been cast to `type`. The arguments
 * may be evaluated more than once, so avoid side effects in them.
 */
#ifndef max_t
#define max_t(type, a, b)					\
	((type) (a) > (type) (b) ? (type) (a) : (type) (b))
#endif
64
65int lttng_live_connect_viewer(struct lttng_live_ctx *ctx, char *hostname,
66 int port)
67{
68 struct hostent *host;
69 struct sockaddr_in server_addr;
70 int ret;
71
72 host = gethostbyname(hostname);
73 if (!host) {
74 ret = -1;
75 goto end;
76 }
77
78 if ((ctx->control_sock = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
79 perror("Socket");
80 ret = -1;
81 goto end;
82 }
83
84 server_addr.sin_family = AF_INET;
85 server_addr.sin_port = htons(port);
86 server_addr.sin_addr = *((struct in_addr *) host->h_addr);
87 bzero(&(server_addr.sin_zero), 8);
88
89 if (connect(ctx->control_sock, (struct sockaddr *) &server_addr,
90 sizeof(struct sockaddr)) == -1) {
91 perror("Connect");
92 ret = -1;
93 goto end;
94 }
95
96 ret = 0;
97
98end:
99 return ret;
100}
101
102int lttng_live_establish_connection(struct lttng_live_ctx *ctx)
103{
104 struct lttng_viewer_cmd cmd;
105 struct lttng_viewer_connect connect;
106 int ret;
107 ssize_t ret_len;
108
109 cmd.cmd = htobe32(LTTNG_VIEWER_CONNECT);
110 cmd.data_size = sizeof(connect);
111 cmd.cmd_version = 0;
112
0c3cd7e1 113 connect.viewer_session_id = -1ULL; /* will be set on recv */
4a744367
JD
114 connect.major = htobe32(LTTNG_LIVE_MAJOR);
115 connect.minor = htobe32(LTTNG_LIVE_MINOR);
116 connect.type = htobe32(LTTNG_VIEWER_CLIENT_COMMAND);
117
118 do {
119 ret_len = send(ctx->control_sock, &cmd, sizeof(cmd), 0);
120 } while (ret_len < 0 && errno == EINTR);
121 if (ret_len < 0) {
122 fprintf(stderr, "[error] Error sending cmd\n");
123 ret = ret_len;
124 goto error;
125 }
126 assert(ret_len == sizeof(cmd));
127
128 do {
129 ret_len = send(ctx->control_sock, &connect, sizeof(connect), 0);
130 } while (ret_len < 0 && errno == EINTR);
131 if (ret_len < 0) {
132 fprintf(stderr, "[error] Error sending version\n");
133 ret = ret_len;
134 goto error;
135 }
136 assert(ret_len == sizeof(connect));
137
138 do {
139 ret_len = recv(ctx->control_sock, &connect, sizeof(connect), 0);
140 } while (ret_len < 0 && errno == EINTR);
141 if (ret_len < 0) {
142 fprintf(stderr, "[error] Error receiving version\n");
143 ret = ret_len;
144 goto error;
145 }
146 assert(ret_len == sizeof(connect));
147
148 printf_verbose("Received viewer session ID : %" PRIu64 "\n",
149 be64toh(connect.viewer_session_id));
150 printf_verbose("Relayd version : %u.%u\n", be32toh(connect.major),
151 be32toh(connect.minor));
152
153 ret = 0;
154
155error:
156 return ret;
157}
158
159int lttng_live_list_sessions(struct lttng_live_ctx *ctx, const char *path)
160{
161 struct lttng_viewer_cmd cmd;
162 struct lttng_viewer_list_sessions list;
163 struct lttng_viewer_session lsession;
164 int i, ret;
165 ssize_t ret_len;
166 int sessions_count;
167
168 cmd.cmd = htobe32(LTTNG_VIEWER_LIST_SESSIONS);
169 cmd.data_size = 0;
170 cmd.cmd_version = 0;
171
172 do {
173 ret_len = send(ctx->control_sock, &cmd, sizeof(cmd), 0);
174 } while (ret_len < 0 && errno == EINTR);
175 if (ret_len < 0) {
176 fprintf(stderr, "[error] Error sending cmd\n");
177 ret = ret_len;
178 goto error;
179 }
180 assert(ret_len == sizeof(cmd));
181
182 do {
183 ret_len = recv(ctx->control_sock, &list, sizeof(list), 0);
184 } while (ret_len < 0 && errno == EINTR);
185 if (ret_len < 0) {
186 fprintf(stderr, "[error] Error receiving session list\n");
187 ret = ret_len;
188 goto error;
189 }
190 assert(ret_len == sizeof(list));
191
192 sessions_count = be32toh(list.sessions_count);
193 fprintf(stdout, "%u active session(s)%c\n", sessions_count,
194 sessions_count > 0 ? ':' : ' ');
195 for (i = 0; i < sessions_count; i++) {
196 do {
197 ret_len = recv(ctx->control_sock, &lsession, sizeof(lsession), 0);
198 } while (ret_len < 0 && errno == EINTR);
199 if (ret_len < 0) {
200 fprintf(stderr, "[error] Error receiving session\n");
201 ret = ret_len;
202 goto error;
203 }
204 assert(ret_len == sizeof(lsession));
9ed31e53
MD
205 lsession.hostname[LTTNG_VIEWER_HOST_NAME_MAX - 1] = '\0';
206 lsession.session_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0';
4a744367
JD
207
208 fprintf(stdout, "%s/%" PRIu64 " : %s on host %s (timer = %u, "
209 "%u stream(s), %u client(s) connected)\n",
210 path, be64toh(lsession.id),
211 lsession.session_name, lsession.hostname,
212 be32toh(lsession.live_timer),
213 be32toh(lsession.streams),
214 be32toh(lsession.clients));
215 }
216
217 ret = 0;
218
219error:
220 return ret;
221}
222
223int lttng_live_ctf_trace_assign(struct lttng_live_viewer_stream *stream,
224 uint64_t ctf_trace_id)
225{
226 struct lttng_live_ctf_trace *trace;
227 int ret = 0;
228
229 trace = g_hash_table_lookup(stream->session->ctf_traces,
230 (gpointer) ctf_trace_id);
231 if (!trace) {
232 trace = g_new0(struct lttng_live_ctf_trace, 1);
233 if (!trace) {
234 ret = -1;
235 fprintf(stderr, "[error] ctf_trace allocation\n");
236 goto error;
237 }
238 trace->ctf_trace_id = ctf_trace_id;
239 trace->streams = g_ptr_array_new();
240 g_hash_table_insert(stream->session->ctf_traces,
241 (gpointer) ctf_trace_id,
242 trace);
243 }
244 if (stream->metadata_flag)
245 trace->metadata_stream = stream;
246
247 stream->ctf_trace = trace;
248 g_ptr_array_add(trace->streams, stream);
249
250error:
251 return ret;
252}
253
254int lttng_live_attach_session(struct lttng_live_ctx *ctx, uint64_t id)
255{
256 struct lttng_viewer_cmd cmd;
257 struct lttng_viewer_attach_session_request rq;
258 struct lttng_viewer_attach_session_response rp;
259 struct lttng_viewer_stream stream;
260 int ret, i;
261 ssize_t ret_len;
262
263 cmd.cmd = htobe32(LTTNG_VIEWER_ATTACH_SESSION);
264 cmd.data_size = sizeof(rq);
265 cmd.cmd_version = 0;
266
267 memset(&rq, 0, sizeof(rq));
268 rq.session_id = htobe64(id);
269 // TODO: add cmd line parameter to select seek beginning
270 // rq.seek = htobe32(LTTNG_VIEWER_SEEK_BEGINNING);
271 rq.seek = htobe32(LTTNG_VIEWER_SEEK_LAST);
272
273 do {
274 ret_len = send(ctx->control_sock, &cmd, sizeof(cmd), 0);
275 } while (ret_len < 0 && errno == EINTR);
276 if (ret_len < 0) {
277 fprintf(stderr, "[error] Error sending cmd\n");
278 ret = ret_len;
279 goto error;
280 }
281 assert(ret_len == sizeof(cmd));
282
283 do {
284 ret_len = send(ctx->control_sock, &rq, sizeof(rq), 0);
285 } while (ret_len < 0 && errno == EINTR);
286 if (ret_len < 0) {
287 fprintf(stderr, "[error] Error sending attach request\n");
288 ret = ret_len;
289 goto error;
290 }
291 assert(ret_len == sizeof(rq));
292
293 do {
294 ret_len = recv(ctx->control_sock, &rp, sizeof(rp), 0);
295 } while (ret_len < 0 && errno == EINTR);
296 if (ret_len < 0) {
297 fprintf(stderr, "[error] Error receiving attach response\n");
298 ret = ret_len;
299 goto error;
300 }
301 assert(ret_len == sizeof(rp));
302
303 switch(be32toh(rp.status)) {
304 case LTTNG_VIEWER_ATTACH_OK:
305 break;
306 case LTTNG_VIEWER_ATTACH_UNK:
307 ret = -LTTNG_VIEWER_ATTACH_UNK;
308 goto end;
309 case LTTNG_VIEWER_ATTACH_ALREADY:
310 fprintf(stderr, "[error] Already a viewer attached\n");
311 ret = -1;
312 goto end;
313 case LTTNG_VIEWER_ATTACH_NOT_LIVE:
314 fprintf(stderr, "[error] Not a live session\n");
315 ret = -1;
316 goto end;
317 case LTTNG_VIEWER_ATTACH_SEEK_ERR:
318 fprintf(stderr, "[error] Wrong seek parameter\n");
319 ret = -1;
320 goto end;
321 default:
322 fprintf(stderr, "[error] Unknown attach return code %u\n",
323 be32toh(rp.status));
324 ret = -1;
325 goto end;
326 }
327 if (be32toh(rp.status) != LTTNG_VIEWER_ATTACH_OK) {
328 ret = -1;
329 goto end;
330 }
331
332 ctx->session->stream_count = be32toh(rp.streams_count);
333 /*
334 * When the session is created but not started, we do an active wait
335 * until it starts. It allows the viewer to start processing the trace
336 * as soon as the session starts.
337 */
338 if (ctx->session->stream_count == 0) {
339 ret = 0;
340 goto end;
341 }
342 printf_verbose("Waiting for %" PRIu64 " streams:\n",
343 ctx->session->stream_count);
344 ctx->session->streams = g_new0(struct lttng_live_viewer_stream,
345 ctx->session->stream_count);
346 if (!ctx->session->streams) {
347 ret = -1;
348 goto error;
349 }
350
351 for (i = 0; i < be32toh(rp.streams_count); i++) {
352 do {
353 ret_len = recv(ctx->control_sock, &stream, sizeof(stream), 0);
354 } while (ret_len < 0 && errno == EINTR);
355 if (ret_len < 0) {
356 fprintf(stderr, "[error] Error receiving stream\n");
357 ret = ret_len;
358 goto error;
359 }
360 assert(ret_len == sizeof(stream));
9ed31e53
MD
361 stream.path_name[LTTNG_VIEWER_PATH_MAX - 1] = '\0';
362 stream.channel_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0';
4a744367
JD
363
364 printf_verbose(" stream %" PRIu64 " : %s/%s\n",
365 be64toh(stream.id), stream.path_name,
366 stream.channel_name);
367 ctx->session->streams[i].id = be64toh(stream.id);
368 ctx->session->streams[i].session = ctx->session;
369
370 ctx->session->streams[i].first_read = 1;
371 ctx->session->streams[i].mmap_size = 0;
372
373 if (be32toh(stream.metadata_flag)) {
374 char *path;
375
376 path = strdup(LTTNG_METADATA_PATH_TEMPLATE);
377 path = mkdtemp(path);
378 ctx->session->streams[i].metadata_flag = 1;
4a744367
JD
379 snprintf(ctx->session->streams[i].path,
380 sizeof(ctx->session->streams[i].path),
381 "%s/%s", path,
382 stream.channel_name);
383 ret = open(ctx->session->streams[i].path,
384 O_WRONLY | O_CREAT | O_TRUNC,
385 S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP);
386 if (ret < 0) {
ab9d07f0 387 perror("open");
4a744367
JD
388 goto error;
389 }
390 ctx->session->streams[i].fd = ret;
391 }
392 ret = lttng_live_ctf_trace_assign(&ctx->session->streams[i],
393 be64toh(stream.ctf_trace_id));
394 if (ret < 0) {
395 goto error;
396 }
397
398 }
399 ret = 0;
400
401end:
402error:
403 return ret;
404}
405
406static
407int get_data_packet(struct lttng_live_ctx *ctx,
408 struct ctf_stream_pos *pos,
409 struct lttng_live_viewer_stream *stream, uint64_t offset,
410 uint64_t len)
411{
412 struct lttng_viewer_cmd cmd;
413 struct lttng_viewer_get_packet rq;
414 struct lttng_viewer_trace_packet rp;
415 ssize_t ret_len;
416 int ret;
417
418 cmd.cmd = htobe32(LTTNG_VIEWER_GET_PACKET);
419 cmd.data_size = sizeof(rq);
420 cmd.cmd_version = 0;
421
422 memset(&rq, 0, sizeof(rq));
423 rq.stream_id = htobe64(stream->id);
424 /* Already in big endian. */
425 rq.offset = offset;
426 rq.len = htobe32(len);
427
428 do {
429 ret_len = send(ctx->control_sock, &cmd, sizeof(cmd), 0);
430 } while (ret_len < 0 && errno == EINTR);
431 if (ret_len < 0) {
432 fprintf(stderr, "[error] Error sending cmd\n");
433 ret = ret_len;
434 goto error;
435 }
436 assert(ret_len == sizeof(cmd));
437
438 do {
439 ret_len = send(ctx->control_sock, &rq, sizeof(rq), 0);
440 } while (ret_len < 0 && errno == EINTR);
441 if (ret_len < 0) {
442 fprintf(stderr, "[error] Error sending get_data_packet request\n");
443 ret = ret_len;
444 goto error;
445 }
446 assert(ret_len == sizeof(rq));
447
448 do {
449 ret_len = recv(ctx->control_sock, &rp, sizeof(rp), 0);
450 } while (ret_len < 0 && errno == EINTR);
451 if (ret_len < 0) {
452 fprintf(stderr, "[error] Error receiving data response\n");
453 ret = ret_len;
454 goto error;
455 }
456 assert(ret_len == sizeof(rp));
457
458 rp.flags = be32toh(rp.flags);
459
460 switch (be32toh(rp.status)) {
461 case LTTNG_VIEWER_GET_PACKET_OK:
462 len = be32toh(rp.len);
463 printf_verbose("get_data_packet: Ok, packet size : %" PRIu64
464 "\n", len);
465 break;
466 case LTTNG_VIEWER_GET_PACKET_RETRY:
467 printf_verbose("get_data_packet: retry\n");
468 ret = -1;
469 goto end;
470 case LTTNG_VIEWER_GET_PACKET_ERR:
471 if (rp.flags & LTTNG_VIEWER_FLAG_NEW_METADATA) {
472 printf_verbose("get_data_packet: new metadata needed\n");
473 ret = 0;
474 goto end;
475 }
476 fprintf(stderr, "[error] get_data_packet: error\n");
477 ret = -1;
478 goto end;
479 case LTTNG_VIEWER_GET_PACKET_EOF:
480 ret = -2;
481 goto error;
482 default:
483 printf_verbose("get_data_packet: unknown\n");
484 assert(0);
485 ret = -1;
486 goto end;
487 }
488
489 if (len <= 0) {
490 ret = -1;
491 goto end;
492 }
493
494 if (len > stream->mmap_size) {
495 uint64_t new_size;
496
497 new_size = max_t(uint64_t, len, stream->mmap_size << 1);
498 if (pos->base_mma) {
499 /* unmap old base */
500 ret = munmap_align(pos->base_mma);
501 if (ret) {
502 fprintf(stderr, "[error] Unable to unmap old base: %s.\n",
503 strerror(errno));
504 ret = -1;
505 goto error;
506 }
507 pos->base_mma = NULL;
508 }
509 pos->base_mma = mmap_align(new_size,
510 PROT_READ | PROT_WRITE,
511 MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
512 if (pos->base_mma == MAP_FAILED) {
513 fprintf(stderr, "[error] mmap error %s.\n",
514 strerror(errno));
515 pos->base_mma = NULL;
516 ret = -1;
517 goto error;
518 }
519
520 stream->mmap_size = new_size;
521 printf_verbose("Expanding stream mmap size to %" PRIu64 " bytes\n",
522 stream->mmap_size);
523 }
524
525 do {
526 ret_len = recv(ctx->control_sock,
527 mmap_align_addr(pos->base_mma), len,
528 MSG_WAITALL);
529 } while (ret_len < 0 && errno == EINTR);
530 if (ret_len < 0) {
531 fprintf(stderr, "[error] Error receiving trace packet\n");
532 ret = ret_len;
533 goto error;
534 }
535 assert(ret_len == len);
536
537end:
538error:
539 return ret;
540}
541
542/*
543 * Return number of metadata bytes written or a negative value on error.
544 */
545static
546int get_new_metadata(struct lttng_live_ctx *ctx,
547 struct lttng_live_viewer_stream *viewer_stream,
548 uint64_t *metadata_len)
549{
550 uint64_t len = 0;
551 int ret;
552 struct lttng_viewer_cmd cmd;
553 struct lttng_viewer_get_metadata rq;
554 struct lttng_viewer_metadata_packet rp;
555 struct lttng_live_viewer_stream *metadata_stream;
556 char *data = NULL;
557 ssize_t ret_len;
558
559 cmd.cmd = htobe32(LTTNG_VIEWER_GET_METADATA);
560 cmd.data_size = sizeof(rq);
561 cmd.cmd_version = 0;
562
563 metadata_stream = viewer_stream->ctf_trace->metadata_stream;
564 rq.stream_id = htobe64(metadata_stream->id);
565
566 do {
567 ret_len = send(ctx->control_sock, &cmd, sizeof(cmd), 0);
568 } while (ret_len < 0 && errno == EINTR);
569 if (ret_len < 0) {
570 fprintf(stderr, "[error] Error sending cmd\n");
571 ret = ret_len;
572 goto error;
573 }
574 assert(ret_len == sizeof(cmd));
575
576 do {
577 ret_len = send(ctx->control_sock, &rq, sizeof(rq), 0);
578 } while (ret_len < 0 && errno == EINTR);
579 if (ret_len < 0) {
580 fprintf(stderr, "[error] Error sending get_metadata request\n");
581 ret = ret_len;
582 goto error;
583 }
584 assert(ret_len == sizeof(rq));
585
586 do {
587 ret_len = recv(ctx->control_sock, &rp, sizeof(rp), 0);
588 } while (ret_len < 0 && errno == EINTR);
589 if (ret_len < 0) {
590 fprintf(stderr, "[error] Error receiving metadata response\n");
591 ret = ret_len;
592 goto error;
593 }
594 assert(ret_len == sizeof(rp));
595
596 switch (be32toh(rp.status)) {
597 case LTTNG_VIEWER_METADATA_OK:
598 printf_verbose("get_metadata : OK\n");
599 break;
600 case LTTNG_VIEWER_NO_NEW_METADATA:
601 printf_verbose("get_metadata : NO NEW\n");
602 ret = -1;
603 goto end;
604 case LTTNG_VIEWER_METADATA_ERR:
605 printf_verbose("get_metadata : ERR\n");
606 ret = -1;
607 goto end;
608 default:
609 printf_verbose("get_metadata : UNKNOWN\n");
610 ret = -1;
611 goto end;
612 }
613
614 len = be64toh(rp.len);
615 printf_verbose("Writing %" PRIu64" bytes to metadata\n", len);
616 if (len <= 0) {
617 ret = -1;
618 goto end;
619 }
620
621 data = zmalloc(len);
622 if (!data) {
623 perror("relay data zmalloc");
624 ret = -1;
625 goto error;
626 }
627 do {
628 ret_len = recv(ctx->control_sock, data, len, MSG_WAITALL);
629 } while (ret_len < 0 && errno == EINTR);
630 if (ret_len < 0) {
631 fprintf(stderr, "[error] Error receiving trace packet\n");
632 ret = ret_len;
633 free(data);
634 goto error;
635 }
636 assert(ret_len == len);
637
638 do {
639 ret_len = write(metadata_stream->fd, data, len);
640 } while (ret_len < 0 && errno == EINTR);
641 if (ret_len < 0) {
642 free(data);
643 ret = ret_len;
644 goto error;
645 }
646 assert(ret_len == len);
647
648 free(data);
649
650 *metadata_len = len;
651 ret = 0;
652end:
653error:
654 return ret;
655}
656
657/*
658 * Get one index for a stream.
659 *
660 * Returns 0 on success or a negative value on error.
661 */
662static
663int get_next_index(struct lttng_live_ctx *ctx,
664 struct lttng_live_viewer_stream *viewer_stream,
665 struct packet_index *index)
666{
667 struct lttng_viewer_cmd cmd;
668 struct lttng_viewer_get_next_index rq;
669 struct lttng_viewer_index rp;
670 int ret;
671 uint64_t metadata_len;
672 ssize_t ret_len;
673
674 cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEXT_INDEX);
675 cmd.data_size = sizeof(rq);
676 cmd.cmd_version = 0;
677
678 memset(&rq, 0, sizeof(rq));
679 rq.stream_id = htobe64(viewer_stream->id);
680
681retry:
682 do {
683 ret_len = send(ctx->control_sock, &cmd, sizeof(cmd), 0);
684 } while (ret_len < 0 && errno == EINTR);
685 if (ret_len < 0) {
686 fprintf(stderr, "[error] Error sending cmd\n");
687 ret = ret_len;
688 goto error;
689 }
690 assert(ret_len == sizeof(cmd));
691
692 do {
693 ret_len = send(ctx->control_sock, &rq, sizeof(rq), 0);
694 } while (ret_len < 0 && errno == EINTR);
695 if (ret_len < 0) {
696 fprintf(stderr, "[error] Error sending get_next_index request\n");
697 ret = ret_len;
698 goto error;
699 }
700 assert(ret_len == sizeof(rq));
701
702 do {
703 ret_len = recv(ctx->control_sock, &rp, sizeof(rp), 0);
704 } while (ret_len < 0 && errno == EINTR);
705 if (ret_len < 0) {
706 fprintf(stderr, "[error] Error receiving index response\n");
707 ret = ret_len;
708 goto error;
709 }
710 assert(ret_len == sizeof(rp));
711
712 rp.flags = be32toh(rp.flags);
713
714 switch (be32toh(rp.status)) {
715 case LTTNG_VIEWER_INDEX_INACTIVE:
716 printf_verbose("get_next_index: inactive\n");
717 memset(index, 0, sizeof(struct packet_index));
718 index->timestamp_end = be64toh(rp.timestamp_end);
719 break;
720 case LTTNG_VIEWER_INDEX_OK:
721 printf_verbose("get_next_index: Ok, need metadata update : %u\n",
722 rp.flags & LTTNG_VIEWER_FLAG_NEW_METADATA);
723 index->offset = be64toh(rp.offset);
724 index->packet_size = be64toh(rp.packet_size);
725 index->content_size = be64toh(rp.content_size);
726 index->timestamp_begin = be64toh(rp.timestamp_begin);
727 index->timestamp_end = be64toh(rp.timestamp_end);
728 index->events_discarded = be64toh(rp.events_discarded);
729
730 if (rp.flags & LTTNG_VIEWER_FLAG_NEW_METADATA) {
731 printf_verbose("get_next_index: new metadata needed\n");
732 ret = get_new_metadata(ctx, viewer_stream,
733 &metadata_len);
734 if (ret < 0) {
735 goto error;
736 }
737 }
738 break;
739 case LTTNG_VIEWER_INDEX_RETRY:
740 printf_verbose("get_next_index: retry\n");
741 sleep(1);
742 goto retry;
743 case LTTNG_VIEWER_INDEX_HUP:
744 printf_verbose("get_next_index: stream hung up\n");
745 viewer_stream->id = -1ULL;
746 viewer_stream->fd = -1;
747 index->offset = EOF;
748 break;
749 case LTTNG_VIEWER_INDEX_ERR:
750 fprintf(stderr, "[error] get_next_index: error\n");
751 ret = -1;
752 goto error;
753 default:
754 fprintf(stderr, "[error] get_next_index: unkwown value\n");
755 ret = -1;
756 goto error;
757 }
758
759 ret = 0;
760
761error:
762 return ret;
763}
764
765void ctf_live_packet_seek(struct bt_stream_pos *stream_pos, size_t index,
766 int whence)
767{
768 struct ctf_stream_pos *pos;
769 struct ctf_file_stream *file_stream;
770 struct packet_index packet_index;
771 struct lttng_live_viewer_stream *viewer_stream;
772 struct lttng_live_session *session;
773 int ret;
774
775retry:
776 pos = ctf_pos(stream_pos);
777 file_stream = container_of(pos, struct ctf_file_stream, pos);
778 viewer_stream = (struct lttng_live_viewer_stream *) pos->priv;
779 session = viewer_stream->session;
780
781 printf_verbose("get_next_index for stream %" PRIu64 "\n", viewer_stream->id);
782 ret = get_next_index(session->ctx, viewer_stream, &packet_index);
783 if (ret < 0) {
784 pos->offset = EOF;
785 fprintf(stderr, "[error] get_next_index failed\n");
786 return;
787 }
788
789 pos->packet_size = packet_index.packet_size;
790 pos->content_size = packet_index.content_size;
791 pos->mmap_base_offset = 0;
792 if (packet_index.offset == EOF) {
793 pos->offset = EOF;
794 } else {
795 pos->offset = 0;
796 }
797
798 if (packet_index.content_size == 0) {
799 file_stream->parent.cycles_timestamp = packet_index.timestamp_end;
800 file_stream->parent.real_timestamp = ctf_get_real_timestamp(
801 &file_stream->parent, packet_index.timestamp_end);
802 } else {
803 file_stream->parent.cycles_timestamp = packet_index.timestamp_begin;
804 file_stream->parent.real_timestamp = ctf_get_real_timestamp(
805 &file_stream->parent, packet_index.timestamp_begin);
806 }
807
808 if (pos->packet_size == 0 || pos->offset == EOF) {
809 goto end;
810 }
811
812 printf_verbose("get_data_packet for stream %" PRIu64 "\n",
813 viewer_stream->id);
814 ret = get_data_packet(session->ctx, pos, viewer_stream,
815 be64toh(packet_index.offset),
816 packet_index.packet_size / CHAR_BIT);
817 if (ret == -2) {
818 goto retry;
819 } else if (ret < 0) {
820 pos->offset = EOF;
821 fprintf(stderr, "[error] get_data_packet failed\n");
822 return;
823 }
824
825 printf_verbose("Index received : packet_size : %" PRIu64
826 ", offset %" PRIu64 ", content_size %" PRIu64
827 ", timestamp_end : %" PRIu64 "\n",
828 packet_index.packet_size, packet_index.offset,
829 packet_index.content_size, packet_index.timestamp_end);
830
831 /* update trace_packet_header and stream_packet_context */
832 if (pos->prot != PROT_WRITE && file_stream->parent.trace_packet_header) {
833 /* Read packet header */
834 ret = generic_rw(&pos->parent, &file_stream->parent.trace_packet_header->p);
835 if (ret) {
836 pos->offset = EOF;
837 fprintf(stderr, "[error] trace packet header read failed\n");
838 goto end;
839 }
840 }
841 if (pos->prot != PROT_WRITE && file_stream->parent.stream_packet_context) {
842 /* Read packet context */
843 ret = generic_rw(&pos->parent, &file_stream->parent.stream_packet_context->p);
844 if (ret) {
845 pos->offset = EOF;
846 fprintf(stderr, "[error] stream packet context read failed\n");
847 goto end;
848 }
849 }
850 pos->data_offset = pos->offset;
851
852end:
853 return;
854}
855
856static int del_traces(gpointer key, gpointer value, gpointer user_data)
857{
858 struct bt_context *bt_ctx = user_data;
859 struct lttng_live_ctf_trace *trace = value;
860 int ret;
861
862 ret = bt_context_remove_trace(bt_ctx, trace->trace_id);
863 if (ret < 0)
864 fprintf(stderr, "[error] removing trace from context\n");
865
866 /* remove the key/value pair from the HT. */
867 return 1;
868}
869
870static void add_traces(gpointer key, gpointer value, gpointer user_data)
871{
872 int i, ret, total_metadata = 0;
873 uint64_t metadata_len;
874 struct bt_context *bt_ctx = user_data;
875 struct lttng_live_ctf_trace *trace = value;
876 struct lttng_live_viewer_stream *stream;
877 struct bt_mmap_stream *new_mmap_stream;
878 struct bt_mmap_stream_list mmap_list;
879 struct lttng_live_ctx *ctx = NULL;
880
881 BT_INIT_LIST_HEAD(&mmap_list.head);
882
883 for (i = 0; i < trace->streams->len; i++) {
884 stream = g_ptr_array_index(trace->streams, i);
885 ctx = stream->session->ctx;
886
887 if (!stream->metadata_flag) {
888 new_mmap_stream = zmalloc(sizeof(struct bt_mmap_stream));
889 new_mmap_stream->priv = (void *) stream;
890 new_mmap_stream->fd = -1;
891 bt_list_add(&new_mmap_stream->list, &mmap_list.head);
892 } else {
893 /* Get all possible metadata before starting */
894 do {
895 ret = get_new_metadata(ctx, stream,
896 &metadata_len);
897 if (ret == 0) {
898 total_metadata += metadata_len;
899 }
900 } while (ret == 0 || total_metadata == 0);
901 trace->metadata_fp = fopen(stream->path, "r");
902 }
903 }
904
905 if (!trace->metadata_fp) {
906 fprintf(stderr, "[error] No metadata stream opened\n");
907 goto end_free;
908 }
909
910 ret = bt_context_add_trace(bt_ctx, NULL, "ctf",
911 ctf_live_packet_seek, &mmap_list, trace->metadata_fp);
912 if (ret < 0) {
913 fprintf(stderr, "[error] Error adding trace\n");
914 goto end_free;
915 }
916 trace->trace_id = ret;
917
918 goto end;
919
920end_free:
921 bt_context_put(bt_ctx);
922end:
923 return;
924}
925
926void lttng_live_read(struct lttng_live_ctx *ctx, uint64_t session_id)
927{
928 int ret, active_session = 0;
929 struct bt_context *bt_ctx;
930 struct bt_ctf_iter *iter;
931 const struct bt_ctf_event *event;
932 struct bt_iter_pos begin_pos;
933 struct bt_trace_descriptor *td_write;
934 struct bt_format *fmt_write;
935 struct ctf_text_stream_pos *sout;
936
937 bt_ctx = bt_context_create();
938 if (!bt_ctx) {
939 fprintf(stderr, "[error] bt_context_create allocation\n");
940 goto end;
941 }
942
943 fmt_write = bt_lookup_format(g_quark_from_static_string("text"));
944 if (!fmt_write) {
945 fprintf(stderr, "[error] ctf-text error\n");
946 goto end;
947 }
948
949 td_write = fmt_write->open_trace(NULL, O_RDWR, NULL, NULL);
950 if (!td_write) {
951 fprintf(stderr, "[error] Error opening output trace\n");
952 goto end_free;
953 }
954
955 sout = container_of(td_write, struct ctf_text_stream_pos,
956 trace_descriptor);
957 if (!sout->parent.event_cb)
958 goto end_free;
959
960 /*
961 * As long as the session is active, we try to reattach to it,
962 * even if all the streams get closed.
963 */
964 do {
965 int flags;
966
967 do {
968 ret = lttng_live_attach_session(ctx, session_id);
969 printf_verbose("Attaching session returns %d\n", ret);
970 if (ret < 0) {
971 if (ret == -LTTNG_VIEWER_ATTACH_UNK) {
972 if (active_session)
973 goto end_free;
974 fprintf(stderr, "[error] Unknown "
975 "session ID\n");
976 }
977 goto end_free;
978 } else {
979 active_session = 1;
980 }
981 } while (ctx->session->stream_count == 0);
982
983 g_hash_table_foreach(ctx->session->ctf_traces, add_traces, bt_ctx);
984
985 begin_pos.type = BT_SEEK_BEGIN;
986 iter = bt_ctf_iter_create(bt_ctx, &begin_pos, NULL);
987 if (!iter) {
988 fprintf(stderr, "[error] Iterator creation error\n");
989 goto end;
990 }
991 for (;;) {
992 event = bt_ctf_iter_read_event_flags(iter, &flags);
993 if (!(flags & BT_ITER_FLAG_RETRY)) {
994 if (!event) {
995 /* End of trace */
996 break;
997 }
998 ret = sout->parent.event_cb(&sout->parent,
999 event->parent->stream);
1000 if (ret) {
1001 fprintf(stderr, "[error] Writing "
1002 "event failed.\n");
1003 goto end_free;
1004 }
1005 }
1006 ret = bt_iter_next(bt_ctf_get_iter(iter));
1007 if (ret < 0) {
1008 goto end_free;
1009 }
1010 }
1011 bt_ctf_iter_destroy(iter);
1012 g_hash_table_foreach_remove(ctx->session->ctf_traces, del_traces, bt_ctx);
1013 } while (active_session);
1014
1015end_free:
1016 bt_context_put(bt_ctx);
1017end:
1018 return;
1019}
This page took 0.058789 seconds and 4 git commands to generate.