Fix: don't update the trace collection if no new streams were received
[babeltrace.git] / formats / lttng-live / lttng-live-comm.c
1 /*
2 * Copyright (C) 2013 - Julien Desfossez <jdesfossez@efficios.com>
3 * Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
4 *
5 * Permission is hereby granted, free of charge, to any person obtaining a copy
6 * of this software and associated documentation files (the "Software"), to deal
7 * in the Software without restriction, including without limitation the rights
8 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9 * copies of the Software, and to permit persons to whom the Software is
10 * furnished to do so, subject to the following conditions:
11 *
12 * The above copyright notice and this permission notice shall be included in
13 * all copies or substantial portions of the Software.
14 *
15 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21 * SOFTWARE.
22 */
23
24 #include <sys/socket.h>
25 #include <sys/types.h>
26 #include <netinet/in.h>
27 #include <netdb.h>
28 #include <stdio.h>
29 #include <string.h>
30 #include <stdlib.h>
31 #include <unistd.h>
32 #include <errno.h>
33 #include <inttypes.h>
34 #include <fcntl.h>
35 #include <sys/mman.h>
36 #include <poll.h>
37
38 #include <babeltrace/ctf/ctf-index.h>
39
40 #include <babeltrace/babeltrace.h>
41 #include <babeltrace/ctf/events.h>
42 #include <babeltrace/ctf/callbacks.h>
43 #include <babeltrace/ctf/iterator.h>
44
45 /* for packet_index */
46 #include <babeltrace/ctf/types.h>
47
48 #include <babeltrace/ctf/metadata.h>
49 #include <babeltrace/ctf-text/types.h>
50 #include <babeltrace/ctf/events-internal.h>
51 #include <formats/ctf/events-private.h>
52
53 #include <babeltrace/endian.h>
54 #include <babeltrace/compat/memstream.h>
55
56 #include "lttng-live.h"
57 #include "lttng-viewer-abi.h"
58
/* Delay, in milliseconds, passed to poll() between two active-wait attempts. */
#define ACTIVE_POLL_DELAY	100

/*
 * Allocate x bytes of zero-initialized memory.
 */
#define zmalloc(x)	calloc(1, (x))

/*
 * Largest of a and b after both are converted to `type`.
 * Each argument is evaluated more than once.
 */
#ifndef max_t
#define max_t(type, a, b)	\
	(((type) (a) > (type) (b)) ? (type) (a) : (type) (b))
#endif
70
71 static void ctf_live_packet_seek(struct bt_stream_pos *stream_pos,
72 size_t index, int whence);
73 static void add_traces(gpointer key, gpointer value, gpointer user_data);
74 static int del_traces(gpointer key, gpointer value, gpointer user_data);
75 static int get_new_metadata(struct lttng_live_ctx *ctx,
76 struct lttng_live_viewer_stream *viewer_stream,
77 char **metadata_buf);
78
/*
 * Receive exactly len bytes from fd into buf.
 *
 * recv() is called again after a signal interruption (EINTR) and after a
 * partial read, until the whole request is satisfied.
 *
 * Returns len once every byte has been received, 0 when the peer performed
 * an orderly shutdown before that, or a negative value on error (errno is
 * left as set by recv()).
 */
static
ssize_t lttng_live_recv(int fd, void *buf, size_t len)
{
	/* Byte pointer: arithmetic on void * is not valid ISO C. */
	char *dst = buf;
	size_t copied = 0, to_copy = len;
	ssize_t ret;

	do {
		ret = recv(fd, dst + copied, to_copy, 0);
		if (ret > 0) {
			assert((size_t) ret <= to_copy);
			copied += (size_t) ret;
			to_copy -= (size_t) ret;
		}
	} while ((ret > 0 && to_copy > 0)
			|| (ret < 0 && errno == EINTR));
	if (ret > 0)
		ret = (ssize_t) copied;
	/* ret = 0 means orderly shutdown, ret < 0 is error. */
	return ret;
}
99
/*
 * Send exactly len bytes from buf on fd.
 *
 * send() is restarted when interrupted by a signal (EINTR) and called
 * again after a partial write, so that a short send is never reported to
 * the caller as a success. MSG_NOSIGNAL is passed to send().
 *
 * Returns len on success, or a negative value on error (errno is left as
 * set by send()). Should send() ever accept 0 bytes, the number of bytes
 * sent so far is returned.
 */
static
ssize_t lttng_live_send(int fd, const void *buf, size_t len)
{
	const char *src = buf;
	size_t sent = 0;
	ssize_t ret;

	while (sent < len) {
		ret = send(fd, src + sent, len - sent, MSG_NOSIGNAL);
		if (ret < 0) {
			if (errno == EINTR)
				continue;
			return ret;
		}
		if (ret == 0) {
			/* No progress: stop rather than spin forever. */
			break;
		}
		sent += (size_t) ret;
	}
	return (ssize_t) sent;
}
110
111 int lttng_live_connect_viewer(struct lttng_live_ctx *ctx)
112 {
113 struct hostent *host;
114 struct sockaddr_in server_addr;
115 int ret;
116
117 if (lttng_live_should_quit()) {
118 ret = -1;
119 goto end;
120 }
121
122 host = gethostbyname(ctx->relay_hostname);
123 if (!host) {
124 fprintf(stderr, "[error] Cannot lookup hostname %s\n",
125 ctx->relay_hostname);
126 goto error;
127 }
128
129 if ((ctx->control_sock = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
130 perror("Socket");
131 goto error;
132 }
133
134 server_addr.sin_family = AF_INET;
135 server_addr.sin_port = htons(ctx->port);
136 server_addr.sin_addr = *((struct in_addr *) host->h_addr);
137 bzero(&(server_addr.sin_zero), 8);
138
139 if (connect(ctx->control_sock, (struct sockaddr *) &server_addr,
140 sizeof(struct sockaddr)) == -1) {
141 perror("Connect");
142 goto error;
143 }
144
145 ret = 0;
146
147 end:
148 return ret;
149
150 error:
151 fprintf(stderr, "[error] Connection failed\n");
152 return -1;
153 }
154
155 int lttng_live_establish_connection(struct lttng_live_ctx *ctx)
156 {
157 struct lttng_viewer_cmd cmd;
158 struct lttng_viewer_connect connect;
159 int ret;
160 ssize_t ret_len;
161
162 if (lttng_live_should_quit()) {
163 ret = -1;
164 goto end;
165 }
166
167 cmd.cmd = htobe32(LTTNG_VIEWER_CONNECT);
168 cmd.data_size = sizeof(connect);
169 cmd.cmd_version = 0;
170
171 connect.viewer_session_id = -1ULL; /* will be set on recv */
172 connect.major = htobe32(LTTNG_LIVE_MAJOR);
173 connect.minor = htobe32(LTTNG_LIVE_MINOR);
174 connect.type = htobe32(LTTNG_VIEWER_CLIENT_COMMAND);
175
176 ret_len = lttng_live_send(ctx->control_sock, &cmd, sizeof(cmd));
177 if (ret_len < 0) {
178 perror("[error] Error sending cmd");
179 goto error;
180 }
181 assert(ret_len == sizeof(cmd));
182
183 ret_len = lttng_live_send(ctx->control_sock, &connect, sizeof(connect));
184 if (ret_len < 0) {
185 perror("[error] Error sending version");
186 goto error;
187 }
188 assert(ret_len == sizeof(connect));
189
190 ret_len = lttng_live_recv(ctx->control_sock, &connect, sizeof(connect));
191 if (ret_len == 0) {
192 fprintf(stderr, "[error] Remote side has closed connection\n");
193 goto error;
194 }
195 if (ret_len < 0) {
196 perror("[error] Error receiving version");
197 goto error;
198 }
199 assert(ret_len == sizeof(connect));
200
201 printf_verbose("Received viewer session ID : %" PRIu64 "\n",
202 be64toh(connect.viewer_session_id));
203 printf_verbose("Relayd version : %u.%u\n", be32toh(connect.major),
204 be32toh(connect.minor));
205 ret = 0;
206 end:
207 return ret;
208
209 error:
210 fprintf(stderr, "[error] Unable to establish connection\n");
211 return -1;
212 }
213
214 static
215 void free_session_list(GPtrArray *session_list)
216 {
217 int i;
218 struct lttng_live_relay_session *relay_session;
219
220 for (i = 0; i < session_list->len; i++) {
221 relay_session = g_ptr_array_index(session_list, i);
222 free(relay_session->name);
223 free(relay_session->hostname);
224 }
225 g_ptr_array_free(session_list, TRUE);
226 }
227
228 static
229 void print_session_list(GPtrArray *session_list, const char *path)
230 {
231 int i;
232 struct lttng_live_relay_session *relay_session;
233
234 for (i = 0; i < session_list->len; i++) {
235 relay_session = g_ptr_array_index(session_list, i);
236 fprintf(stdout, "%s/host/%s/%s (timer = %u, "
237 "%u stream(s), %u client(s) connected)\n",
238 path, relay_session->hostname,
239 relay_session->name, relay_session->timer,
240 relay_session->streams, relay_session->clients);
241 }
242 }
243
244 static
245 void update_session_list(GPtrArray *session_list, char *hostname,
246 char *session_name, uint32_t streams, uint32_t clients,
247 uint32_t timer)
248 {
249 int i, found = 0;
250 struct lttng_live_relay_session *relay_session;
251
252 for (i = 0; i < session_list->len; i++) {
253 relay_session = g_ptr_array_index(session_list, i);
254 if ((strncmp(relay_session->hostname, hostname, NAME_MAX) == 0) &&
255 strncmp(relay_session->name,
256 session_name, NAME_MAX) == 0) {
257 relay_session->streams += streams;
258 if (relay_session->clients < clients)
259 relay_session->clients = clients;
260 found = 1;
261 break;
262 }
263 }
264 if (found)
265 return;
266
267 relay_session = g_new0(struct lttng_live_relay_session, 1);
268 relay_session->hostname = strndup(hostname, NAME_MAX);
269 relay_session->name = strndup(session_name, NAME_MAX);
270 relay_session->clients = clients;
271 relay_session->streams = streams;
272 relay_session->timer = timer;
273 g_ptr_array_add(session_list, relay_session);
274 }
275
276 int lttng_live_list_sessions(struct lttng_live_ctx *ctx, const char *path)
277 {
278 struct lttng_viewer_cmd cmd;
279 struct lttng_viewer_list_sessions list;
280 struct lttng_viewer_session lsession;
281 int i, ret, sessions_count, print_list = 0;
282 ssize_t ret_len;
283 uint64_t session_id;
284 GPtrArray *session_list = NULL;
285
286 if (lttng_live_should_quit()) {
287 ret = -1;
288 goto end;
289 }
290
291 if (strlen(ctx->session_name) == 0) {
292 print_list = 1;
293 session_list = g_ptr_array_new();
294 }
295
296 cmd.cmd = htobe32(LTTNG_VIEWER_LIST_SESSIONS);
297 cmd.data_size = 0;
298 cmd.cmd_version = 0;
299
300 ret_len = lttng_live_send(ctx->control_sock, &cmd, sizeof(cmd));
301 if (ret_len < 0) {
302 perror("[error] Error sending cmd");
303 goto error;
304 }
305 assert(ret_len == sizeof(cmd));
306
307 ret_len = lttng_live_recv(ctx->control_sock, &list, sizeof(list));
308 if (ret_len == 0) {
309 fprintf(stderr, "[error] Remote side has closed connection\n");
310 goto error;
311 }
312 if (ret_len < 0) {
313 perror("[error] Error receiving session list");
314 goto error;
315 }
316 assert(ret_len == sizeof(list));
317
318 sessions_count = be32toh(list.sessions_count);
319 for (i = 0; i < sessions_count; i++) {
320 ret_len = lttng_live_recv(ctx->control_sock, &lsession, sizeof(lsession));
321 if (ret_len == 0) {
322 fprintf(stderr, "[error] Remote side has closed connection\n");
323 goto error;
324 }
325 if (ret_len < 0) {
326 perror("[error] Error receiving session");
327 goto error;
328 }
329 assert(ret_len == sizeof(lsession));
330 lsession.hostname[LTTNG_VIEWER_HOST_NAME_MAX - 1] = '\0';
331 lsession.session_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0';
332 session_id = be64toh(lsession.id);
333
334 if (print_list) {
335 update_session_list(session_list,
336 lsession.hostname,
337 lsession.session_name,
338 be32toh(lsession.streams),
339 be32toh(lsession.clients),
340 be32toh(lsession.live_timer));
341 } else {
342 if ((strncmp(lsession.session_name, ctx->session_name,
343 NAME_MAX) == 0) && (strncmp(lsession.hostname,
344 ctx->traced_hostname, NAME_MAX) == 0)) {
345 printf_verbose("Reading from session %" PRIu64 "\n",
346 session_id);
347 g_array_append_val(ctx->session_ids,
348 session_id);
349 }
350 }
351 }
352
353 if (print_list) {
354 print_session_list(session_list, path);
355 free_session_list(session_list);
356 }
357 ret = 0;
358 end:
359 return ret;
360
361 error:
362 fprintf(stderr, "[error] Unable to list sessions\n");
363 return -1;
364 }
365
366 int lttng_live_ctf_trace_assign(struct lttng_live_viewer_stream *stream,
367 uint64_t ctf_trace_id)
368 {
369 struct lttng_live_ctf_trace *trace;
370 int ret = 0;
371
372 trace = g_hash_table_lookup(stream->session->ctf_traces,
373 (gpointer) ctf_trace_id);
374 if (!trace) {
375 trace = g_new0(struct lttng_live_ctf_trace, 1);
376 trace->ctf_trace_id = ctf_trace_id;
377 trace->streams = g_ptr_array_new();
378 g_hash_table_insert(stream->session->ctf_traces,
379 (gpointer) ctf_trace_id,
380 trace);
381 }
382 if (stream->metadata_flag)
383 trace->metadata_stream = stream;
384
385 stream->ctf_trace = trace;
386 g_ptr_array_add(trace->streams, stream);
387
388 return ret;
389 }
390
391 static
392 int open_metadata_fp_write(struct lttng_live_viewer_stream *stream,
393 char **metadata_buf, size_t *size)
394 {
395 int ret = 0;
396
397 stream->metadata_fp_write =
398 babeltrace_open_memstream(metadata_buf, size);
399 if (!stream->metadata_fp_write) {
400 perror("Metadata open_memstream");
401 ret = -1;
402 }
403
404 return ret;
405 }
406
407 int lttng_live_attach_session(struct lttng_live_ctx *ctx, uint64_t id)
408 {
409 struct lttng_viewer_cmd cmd;
410 struct lttng_viewer_attach_session_request rq;
411 struct lttng_viewer_attach_session_response rp;
412 struct lttng_viewer_stream stream;
413 int ret, i;
414 ssize_t ret_len;
415
416 if (lttng_live_should_quit()) {
417 ret = -1;
418 goto end;
419 }
420
421 cmd.cmd = htobe32(LTTNG_VIEWER_ATTACH_SESSION);
422 cmd.data_size = sizeof(rq);
423 cmd.cmd_version = 0;
424
425 memset(&rq, 0, sizeof(rq));
426 rq.session_id = htobe64(id);
427 // TODO: add cmd line parameter to select seek beginning
428 // rq.seek = htobe32(LTTNG_VIEWER_SEEK_BEGINNING);
429 rq.seek = htobe32(LTTNG_VIEWER_SEEK_LAST);
430
431 ret_len = lttng_live_send(ctx->control_sock, &cmd, sizeof(cmd));
432 if (ret_len < 0) {
433 perror("[error] Error sending cmd");
434 goto error;
435 }
436 assert(ret_len == sizeof(cmd));
437
438 ret_len = lttng_live_send(ctx->control_sock, &rq, sizeof(rq));
439 if (ret_len < 0) {
440 perror("[error] Error sending attach request");
441 goto error;
442 }
443 assert(ret_len == sizeof(rq));
444
445 ret_len = lttng_live_recv(ctx->control_sock, &rp, sizeof(rp));
446 if (ret_len == 0) {
447 fprintf(stderr, "[error] Remote side has closed connection\n");
448 goto error;
449 }
450 if (ret_len < 0) {
451 perror("[error] Error receiving attach response");
452 goto error;
453 }
454 assert(ret_len == sizeof(rp));
455
456 switch(be32toh(rp.status)) {
457 case LTTNG_VIEWER_ATTACH_OK:
458 break;
459 case LTTNG_VIEWER_ATTACH_UNK:
460 ret = -LTTNG_VIEWER_ATTACH_UNK;
461 goto end;
462 case LTTNG_VIEWER_ATTACH_ALREADY:
463 fprintf(stderr, "[error] There is already a viewer attached to this session\n");
464 goto error;
465 case LTTNG_VIEWER_ATTACH_NOT_LIVE:
466 fprintf(stderr, "[error] Not a live session\n");
467 goto error;
468 case LTTNG_VIEWER_ATTACH_SEEK_ERR:
469 fprintf(stderr, "[error] Wrong seek parameter\n");
470 goto error;
471 default:
472 fprintf(stderr, "[error] Unknown attach return code %u\n",
473 be32toh(rp.status));
474 goto error;
475 }
476 if (be32toh(rp.status) != LTTNG_VIEWER_ATTACH_OK) {
477 goto error;
478 }
479
480 ctx->session->stream_count += be32toh(rp.streams_count);
481 /*
482 * When the session is created but not started, we do an active wait
483 * until it starts. It allows the viewer to start processing the trace
484 * as soon as the session starts.
485 */
486 if (ctx->session->stream_count == 0) {
487 ret = 0;
488 goto end;
489 }
490 printf_verbose("Waiting for %" PRIu64 " streams:\n",
491 ctx->session->stream_count);
492 ctx->session->streams = g_new0(struct lttng_live_viewer_stream,
493 ctx->session->stream_count);
494 for (i = 0; i < be32toh(rp.streams_count); i++) {
495 ret_len = lttng_live_recv(ctx->control_sock, &stream, sizeof(stream));
496 if (ret_len == 0) {
497 fprintf(stderr, "[error] Remote side has closed connection\n");
498 goto error;
499 }
500 if (ret_len < 0) {
501 perror("[error] Error receiving stream");
502 goto error;
503 }
504 assert(ret_len == sizeof(stream));
505 stream.path_name[LTTNG_VIEWER_PATH_MAX - 1] = '\0';
506 stream.channel_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0';
507
508 printf_verbose(" stream %" PRIu64 " : %s/%s\n",
509 be64toh(stream.id), stream.path_name,
510 stream.channel_name);
511 ctx->session->streams[i].id = be64toh(stream.id);
512 ctx->session->streams[i].session = ctx->session;
513
514 ctx->session->streams[i].first_read = 1;
515 ctx->session->streams[i].mmap_size = 0;
516
517 if (be32toh(stream.metadata_flag)) {
518 ctx->session->streams[i].metadata_flag = 1;
519 }
520 ret = lttng_live_ctf_trace_assign(&ctx->session->streams[i],
521 be64toh(stream.ctf_trace_id));
522 if (ret < 0) {
523 goto error;
524 }
525
526 }
527 ret = 0;
528 end:
529 return ret;
530
531 error:
532 return -1;
533 }
534
535 /*
536 * Ask the relay for new streams.
537 *
538 * Returns the number of new streams received or a negative value on error.
539 */
540 static
541 int ask_new_streams(struct lttng_live_ctx *ctx)
542 {
543 int i, ret = 0, nb_streams = 0;
544 uint64_t id;
545
546 restart:
547 for (i = 0; i < ctx->session_ids->len; i++) {
548 id = g_array_index(ctx->session_ids, uint64_t, i);
549 ret = lttng_live_get_new_streams(ctx, id);
550 printf_verbose("Asking for new streams returns %d\n", ret);
551 if (ret < 0) {
552 if (lttng_live_should_quit()) {
553 goto end;
554 }
555 if (ret == -LTTNG_VIEWER_NEW_STREAMS_HUP) {
556 printf_verbose("Session %" PRIu64 " closed\n",
557 id);
558 /*
559 * The streams have already been closed during
560 * the reading, so we only need to get rid of
561 * the trace in our internal table of sessions.
562 */
563 g_array_remove_index(ctx->session_ids, i);
564 /*
565 * We can't continue iterating on the g_array
566 * after a remove, we have to start again.
567 */
568 goto restart;
569 } else {
570 ret = -1;
571 goto end;
572 }
573 } else {
574 nb_streams += ret;
575 }
576 }
577 ret = nb_streams;
578
579 end:
580 return ret;
581 }
582
583 static
584 int append_metadata(struct lttng_live_ctx *ctx,
585 struct lttng_live_viewer_stream *viewer_stream)
586 {
587 int ret;
588 struct lttng_live_viewer_stream *metadata;
589 char *metadata_buf = NULL;
590
591 printf_verbose("get_next_index: new metadata needed\n");
592 ret = get_new_metadata(ctx, viewer_stream, &metadata_buf);
593 if (ret < 0) {
594 free(metadata_buf);
595 goto error;
596 }
597
598 metadata = viewer_stream->ctf_trace->metadata_stream;
599 metadata->ctf_trace->metadata_fp =
600 babeltrace_fmemopen(metadata_buf,
601 metadata->metadata_len, "rb");
602 if (!metadata->ctf_trace->metadata_fp) {
603 perror("Metadata fmemopen");
604 free(metadata_buf);
605 ret = -1;
606 goto error;
607 }
608 ret = ctf_append_trace_metadata(
609 viewer_stream->ctf_trace->handle->td,
610 metadata->ctf_trace->metadata_fp);
611 /* We accept empty metadata packets */
612 if (ret != 0 && ret != -ENOENT) {
613 fprintf(stderr, "[error] Appending metadata\n");
614 goto error;
615 }
616 ret = 0;
617
618 error:
619 return ret;
620 }
621
622 static
623 int get_data_packet(struct lttng_live_ctx *ctx,
624 struct ctf_stream_pos *pos,
625 struct lttng_live_viewer_stream *stream, uint64_t offset,
626 uint64_t len)
627 {
628 struct lttng_viewer_cmd cmd;
629 struct lttng_viewer_get_packet rq;
630 struct lttng_viewer_trace_packet rp;
631 ssize_t ret_len;
632 int ret;
633
634 retry:
635 if (lttng_live_should_quit()) {
636 ret = -1;
637 goto end;
638 }
639 cmd.cmd = htobe32(LTTNG_VIEWER_GET_PACKET);
640 cmd.data_size = sizeof(rq);
641 cmd.cmd_version = 0;
642
643 memset(&rq, 0, sizeof(rq));
644 rq.stream_id = htobe64(stream->id);
645 rq.offset = htobe64(offset);
646 rq.len = htobe32(len);
647
648 ret_len = lttng_live_send(ctx->control_sock, &cmd, sizeof(cmd));
649 if (ret_len < 0) {
650 perror("[error] Error sending cmd");
651 goto error;
652 }
653 assert(ret_len == sizeof(cmd));
654
655 ret_len = lttng_live_send(ctx->control_sock, &rq, sizeof(rq));
656 if (ret_len < 0) {
657 perror("[error] Error sending get_data_packet request");
658 goto error;
659 }
660 assert(ret_len == sizeof(rq));
661
662 ret_len = lttng_live_recv(ctx->control_sock, &rp, sizeof(rp));
663 if (ret_len == 0) {
664 fprintf(stderr, "[error] Remote side has closed connection\n");
665 goto error;
666 }
667 if (ret_len < 0) {
668 perror("[error] Error receiving data response");
669 goto error;
670 }
671 if (ret_len != sizeof(rp)) {
672 fprintf(stderr, "[error] get_data_packet: expected %" PRId64
673 ", received %" PRId64 "\n", sizeof(rp),
674 ret_len);
675 goto error;
676 }
677
678 rp.flags = be32toh(rp.flags);
679
680 switch (be32toh(rp.status)) {
681 case LTTNG_VIEWER_GET_PACKET_OK:
682 len = be32toh(rp.len);
683 printf_verbose("get_data_packet: Ok, packet size : %" PRIu64
684 "\n", len);
685 break;
686 case LTTNG_VIEWER_GET_PACKET_RETRY:
687 /* Unimplemented by relay daemon */
688 printf_verbose("get_data_packet: retry\n");
689 goto error;
690 case LTTNG_VIEWER_GET_PACKET_ERR:
691 if (rp.flags & LTTNG_VIEWER_FLAG_NEW_METADATA) {
692 printf_verbose("get_data_packet: new metadata needed\n");
693 ret = append_metadata(ctx, stream);
694 if (ret)
695 goto error;
696 }
697 if (rp.flags & LTTNG_VIEWER_FLAG_NEW_STREAM) {
698 printf_verbose("get_data_packet: new streams needed\n");
699 ret = ask_new_streams(ctx);
700 if (ret < 0)
701 goto error;
702 else if (ret > 0)
703 g_hash_table_foreach(ctx->session->ctf_traces,
704 add_traces, ctx->bt_ctx);
705 }
706 if (rp.flags & (LTTNG_VIEWER_FLAG_NEW_METADATA
707 | LTTNG_VIEWER_FLAG_NEW_STREAM)) {
708 goto retry;
709 }
710 fprintf(stderr, "[error] get_data_packet: error\n");
711 goto error;
712 case LTTNG_VIEWER_GET_PACKET_EOF:
713 ret = -2;
714 goto end;
715 default:
716 printf_verbose("get_data_packet: unknown\n");
717 goto error;
718 }
719
720 if (len == 0) {
721 goto error;
722 }
723
724 if (len > stream->mmap_size) {
725 uint64_t new_size;
726
727 new_size = max_t(uint64_t, len, stream->mmap_size << 1);
728 if (pos->base_mma) {
729 /* unmap old base */
730 ret = munmap_align(pos->base_mma);
731 if (ret) {
732 perror("[error] Unable to unmap old base");
733 goto error;
734 }
735 pos->base_mma = NULL;
736 }
737 pos->base_mma = mmap_align(new_size,
738 PROT_READ | PROT_WRITE,
739 MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
740 if (pos->base_mma == MAP_FAILED) {
741 perror("[error] mmap error");
742 pos->base_mma = NULL;
743 goto error;
744 }
745
746 stream->mmap_size = new_size;
747 printf_verbose("Expanding stream mmap size to %" PRIu64 " bytes\n",
748 stream->mmap_size);
749 }
750
751 ret_len = lttng_live_recv(ctx->control_sock,
752 mmap_align_addr(pos->base_mma), len);
753 if (ret_len == 0) {
754 fprintf(stderr, "[error] Remote side has closed connection\n");
755 goto error;
756 }
757 if (ret_len < 0) {
758 perror("[error] Error receiving trace packet");
759 goto error;
760 }
761 assert(ret_len == len);
762 ret = 0;
763 end:
764 return ret;
765
766 error:
767 return -1;
768 }
769
770 static
771 int get_one_metadata_packet(struct lttng_live_ctx *ctx,
772 struct lttng_live_viewer_stream *metadata_stream)
773 {
774 uint64_t len = 0;
775 int ret;
776 struct lttng_viewer_cmd cmd;
777 struct lttng_viewer_get_metadata rq;
778 struct lttng_viewer_metadata_packet rp;
779 char *data = NULL;
780 ssize_t ret_len;
781
782 if (lttng_live_should_quit()) {
783 ret = -1;
784 goto end;
785 }
786
787 rq.stream_id = htobe64(metadata_stream->id);
788 cmd.cmd = htobe32(LTTNG_VIEWER_GET_METADATA);
789 cmd.data_size = sizeof(rq);
790 cmd.cmd_version = 0;
791
792 ret_len = lttng_live_send(ctx->control_sock, &cmd, sizeof(cmd));
793 if (ret_len < 0) {
794 perror("[error] Error sending cmd");
795 goto error;
796 }
797 assert(ret_len == sizeof(cmd));
798
799 ret_len = lttng_live_send(ctx->control_sock, &rq, sizeof(rq));
800 if (ret_len < 0) {
801 perror("[error] Error sending get_metadata request");
802 goto error;
803 }
804 assert(ret_len == sizeof(rq));
805
806 ret_len = lttng_live_recv(ctx->control_sock, &rp, sizeof(rp));
807 if (ret_len == 0) {
808 fprintf(stderr, "[error] Remote side has closed connection\n");
809 goto error;
810 }
811 if (ret_len < 0) {
812 perror("[error] Error receiving metadata response");
813 goto error;
814 }
815 assert(ret_len == sizeof(rp));
816
817 switch (be32toh(rp.status)) {
818 case LTTNG_VIEWER_METADATA_OK:
819 printf_verbose("get_metadata : OK\n");
820 break;
821 case LTTNG_VIEWER_NO_NEW_METADATA:
822 printf_verbose("get_metadata : NO NEW\n");
823 ret = 0;
824 goto end;
825 case LTTNG_VIEWER_METADATA_ERR:
826 printf_verbose("get_metadata : ERR\n");
827 goto error;
828 default:
829 printf_verbose("get_metadata : UNKNOWN\n");
830 goto error;
831 }
832
833 len = be64toh(rp.len);
834 printf_verbose("Writing %" PRIu64" bytes to metadata\n", len);
835 if (len <= 0) {
836 goto error;
837 }
838
839 data = zmalloc(len);
840 if (!data) {
841 perror("relay data zmalloc");
842 goto error;
843 }
844 ret_len = lttng_live_recv(ctx->control_sock, data, len);
845 if (ret_len == 0) {
846 fprintf(stderr, "[error] Remote side has closed connection\n");
847 goto error_free_data;
848 }
849 if (ret_len < 0) {
850 perror("[error] Error receiving trace packet");
851 goto error_free_data;
852 }
853 assert(ret_len == len);
854
855 do {
856 ret_len = fwrite(data, 1, len,
857 metadata_stream->metadata_fp_write);
858 } while (ret_len < 0 && errno == EINTR);
859 if (ret_len < 0) {
860 fprintf(stderr, "[error] Writing in the metadata fp\n");
861 goto error_free_data;
862 }
863 assert(ret_len == len);
864 metadata_stream->metadata_len += len;
865 free(data);
866 ret = len;
867 end:
868 return ret;
869
870 error_free_data:
871 free(data);
872 error:
873 return -1;
874 }
875
876 /*
877 * Return 0 on success, a negative value on error.
878 */
879 static
880 int get_new_metadata(struct lttng_live_ctx *ctx,
881 struct lttng_live_viewer_stream *viewer_stream,
882 char **metadata_buf)
883 {
884 int ret = 0;
885 struct lttng_live_viewer_stream *metadata_stream;
886 size_t size, len_read = 0;
887
888 metadata_stream = viewer_stream->ctf_trace->metadata_stream;
889 if (!metadata_stream) {
890 fprintf(stderr, "[error] No metadata stream\n");
891 ret = -1;
892 goto error;
893 }
894 metadata_stream->metadata_len = 0;
895 ret = open_metadata_fp_write(metadata_stream, metadata_buf, &size);
896 if (ret < 0) {
897 goto error;
898 }
899
900 do {
901 /*
902 * get_one_metadata_packet returns the number of bytes
903 * received, 0 when we have received everything, a
904 * negative value on error.
905 */
906 ret = get_one_metadata_packet(ctx, metadata_stream);
907 if (ret > 0) {
908 len_read += ret;
909 }
910 if (!len_read) {
911 (void) poll(NULL, 0, ACTIVE_POLL_DELAY);
912 }
913 } while (ret > 0 || !len_read);
914
915 if (fclose(metadata_stream->metadata_fp_write))
916 perror("fclose");
917 metadata_stream->metadata_fp_write = NULL;
918
919 error:
920 return ret;
921 }
922
923 /*
924 * Get one index for a stream.
925 *
926 * Returns 0 on success or a negative value on error.
927 */
928 static
929 int get_next_index(struct lttng_live_ctx *ctx,
930 struct lttng_live_viewer_stream *viewer_stream,
931 struct packet_index *index)
932 {
933 struct lttng_viewer_cmd cmd;
934 struct lttng_viewer_get_next_index rq;
935 struct lttng_viewer_index rp;
936 int ret;
937 ssize_t ret_len;
938
939 cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEXT_INDEX);
940 cmd.data_size = sizeof(rq);
941 cmd.cmd_version = 0;
942
943 memset(&rq, 0, sizeof(rq));
944 rq.stream_id = htobe64(viewer_stream->id);
945
946 retry:
947 if (lttng_live_should_quit()) {
948 ret = -1;
949 goto end;
950 }
951 ret_len = lttng_live_send(ctx->control_sock, &cmd, sizeof(cmd));
952 if (ret_len < 0) {
953 perror("[error] Error sending cmd");
954 goto error;
955 }
956 assert(ret_len == sizeof(cmd));
957
958 ret_len = lttng_live_send(ctx->control_sock, &rq, sizeof(rq));
959 if (ret_len < 0) {
960 perror("[error] Error sending get_next_index request");
961 goto error;
962 }
963 assert(ret_len == sizeof(rq));
964
965 ret_len = lttng_live_recv(ctx->control_sock, &rp, sizeof(rp));
966 if (ret_len == 0) {
967 fprintf(stderr, "[error] Remote side has closed connection\n");
968 goto error;
969 }
970 if (ret_len < 0) {
971 perror("[error] Error receiving index response");
972 goto error;
973 }
974 assert(ret_len == sizeof(rp));
975
976 rp.flags = be32toh(rp.flags);
977
978 switch (be32toh(rp.status)) {
979 case LTTNG_VIEWER_INDEX_INACTIVE:
980 printf_verbose("get_next_index: inactive\n");
981 memset(index, 0, sizeof(struct packet_index));
982 index->ts_cycles.timestamp_end = be64toh(rp.timestamp_end);
983 break;
984 case LTTNG_VIEWER_INDEX_OK:
985 printf_verbose("get_next_index: Ok, need metadata update : %u\n",
986 rp.flags & LTTNG_VIEWER_FLAG_NEW_METADATA);
987 index->offset = be64toh(rp.offset);
988 index->packet_size = be64toh(rp.packet_size);
989 index->content_size = be64toh(rp.content_size);
990 index->ts_cycles.timestamp_begin = be64toh(rp.timestamp_begin);
991 index->ts_cycles.timestamp_end = be64toh(rp.timestamp_end);
992 index->events_discarded = be64toh(rp.events_discarded);
993
994 if (rp.flags & LTTNG_VIEWER_FLAG_NEW_METADATA) {
995 ret = append_metadata(ctx, viewer_stream);
996 if (ret)
997 goto error;
998 }
999 if (rp.flags & LTTNG_VIEWER_FLAG_NEW_STREAM) {
1000 printf_verbose("get_next_index: need new streams\n");
1001 ret = ask_new_streams(ctx);
1002 if (ret < 0)
1003 goto error;
1004 else if (ret > 0)
1005 g_hash_table_foreach(ctx->session->ctf_traces,
1006 add_traces, ctx->bt_ctx);
1007 }
1008 break;
1009 case LTTNG_VIEWER_INDEX_RETRY:
1010 printf_verbose("get_next_index: retry\n");
1011 (void) poll(NULL, 0, ACTIVE_POLL_DELAY);
1012 goto retry;
1013 case LTTNG_VIEWER_INDEX_HUP:
1014 printf_verbose("get_next_index: stream hung up\n");
1015 viewer_stream->id = -1ULL;
1016 index->offset = EOF;
1017 ctx->session->stream_count--;
1018 break;
1019 case LTTNG_VIEWER_INDEX_ERR:
1020 fprintf(stderr, "[error] get_next_index: error\n");
1021 goto error;
1022 default:
1023 fprintf(stderr, "[error] get_next_index: unkwown value\n");
1024 goto error;
1025 }
1026 ret = 0;
1027 end:
1028 return ret;
1029
1030 error:
1031 return -1;
1032 }
1033
1034 static
1035 void ctf_live_packet_seek(struct bt_stream_pos *stream_pos, size_t index,
1036 int whence)
1037 {
1038 struct ctf_stream_pos *pos;
1039 struct ctf_file_stream *file_stream;
1040 struct packet_index *prev_index = NULL, *cur_index;
1041 struct lttng_live_viewer_stream *viewer_stream;
1042 struct lttng_live_session *session;
1043 int ret;
1044
1045 retry:
1046 pos = ctf_pos(stream_pos);
1047 file_stream = container_of(pos, struct ctf_file_stream, pos);
1048 viewer_stream = (struct lttng_live_viewer_stream *) pos->priv;
1049 session = viewer_stream->session;
1050
1051 switch (pos->packet_index->len) {
1052 case 0:
1053 g_array_set_size(pos->packet_index, 1);
1054 cur_index = &g_array_index(pos->packet_index,
1055 struct packet_index, 0);
1056 break;
1057 case 1:
1058 g_array_set_size(pos->packet_index, 2);
1059 prev_index = &g_array_index(pos->packet_index,
1060 struct packet_index, 0);
1061 cur_index = &g_array_index(pos->packet_index,
1062 struct packet_index, 1);
1063 break;
1064 case 2:
1065 g_array_index(pos->packet_index,
1066 struct packet_index, 0) =
1067 g_array_index(pos->packet_index,
1068 struct packet_index, 1);
1069 prev_index = &g_array_index(pos->packet_index,
1070 struct packet_index, 0);
1071 cur_index = &g_array_index(pos->packet_index,
1072 struct packet_index, 1);
1073 break;
1074 default:
1075 abort();
1076 break;
1077 }
1078 printf_verbose("get_next_index for stream %" PRIu64 "\n", viewer_stream->id);
1079 ret = get_next_index(session->ctx, viewer_stream, cur_index);
1080 if (ret < 0) {
1081 pos->offset = EOF;
1082 if (!lttng_live_should_quit()) {
1083 fprintf(stderr, "[error] get_next_index failed\n");
1084 }
1085 return;
1086 }
1087
1088 pos->packet_size = cur_index->packet_size;
1089 pos->content_size = cur_index->content_size;
1090 pos->mmap_base_offset = 0;
1091 if (cur_index->offset == EOF) {
1092 pos->offset = EOF;
1093 } else {
1094 pos->offset = 0;
1095 }
1096
1097 if (cur_index->content_size == 0) {
1098 file_stream->parent.cycles_timestamp =
1099 cur_index->ts_cycles.timestamp_end;
1100 file_stream->parent.real_timestamp = ctf_get_real_timestamp(
1101 &file_stream->parent,
1102 cur_index->ts_cycles.timestamp_end);
1103 } else {
1104 /* Convert the timestamps and append to the real_index. */
1105 cur_index->ts_real.timestamp_begin = ctf_get_real_timestamp(
1106 &file_stream->parent,
1107 cur_index->ts_cycles.timestamp_begin);
1108 cur_index->ts_real.timestamp_end = ctf_get_real_timestamp(
1109 &file_stream->parent,
1110 cur_index->ts_cycles.timestamp_end);
1111
1112 ctf_update_current_packet_index(&file_stream->parent,
1113 prev_index, cur_index);
1114
1115 file_stream->parent.cycles_timestamp =
1116 cur_index->ts_cycles.timestamp_begin;
1117 file_stream->parent.real_timestamp =
1118 cur_index->ts_real.timestamp_begin;
1119 }
1120
1121 if (pos->packet_size == 0 || pos->offset == EOF) {
1122 goto end;
1123 }
1124
1125 printf_verbose("get_data_packet for stream %" PRIu64 "\n",
1126 viewer_stream->id);
1127 ret = get_data_packet(session->ctx, pos, viewer_stream,
1128 cur_index->offset,
1129 cur_index->packet_size / CHAR_BIT);
1130 if (ret == -2) {
1131 goto retry;
1132 } else if (ret < 0) {
1133 pos->offset = EOF;
1134 if (!lttng_live_should_quit()) {
1135 fprintf(stderr, "[error] get_data_packet failed\n");
1136 }
1137 return;
1138 }
1139
1140 printf_verbose("Index received : packet_size : %" PRIu64
1141 ", offset %" PRIu64 ", content_size %" PRIu64
1142 ", timestamp_end : %" PRIu64 "\n",
1143 cur_index->packet_size, cur_index->offset,
1144 cur_index->content_size,
1145 cur_index->ts_cycles.timestamp_end);
1146
1147 /* update trace_packet_header and stream_packet_context */
1148 if (pos->prot != PROT_WRITE && file_stream->parent.trace_packet_header) {
1149 /* Read packet header */
1150 ret = generic_rw(&pos->parent, &file_stream->parent.trace_packet_header->p);
1151 if (ret) {
1152 pos->offset = EOF;
1153 fprintf(stderr, "[error] trace packet header read failed\n");
1154 goto end;
1155 }
1156 }
1157 if (pos->prot != PROT_WRITE && file_stream->parent.stream_packet_context) {
1158 /* Read packet context */
1159 ret = generic_rw(&pos->parent, &file_stream->parent.stream_packet_context->p);
1160 if (ret) {
1161 pos->offset = EOF;
1162 fprintf(stderr, "[error] stream packet context read failed\n");
1163 goto end;
1164 }
1165 }
1166 pos->data_offset = pos->offset;
1167
1168 end:
1169 return;
1170 }
1171
1172 int lttng_live_create_viewer_session(struct lttng_live_ctx *ctx)
1173 {
1174 struct lttng_viewer_cmd cmd;
1175 struct lttng_viewer_create_session_response resp;
1176 int ret;
1177 ssize_t ret_len;
1178
1179 if (lttng_live_should_quit()) {
1180 ret = -1;
1181 goto end;
1182 }
1183
1184 cmd.cmd = htobe32(LTTNG_VIEWER_CREATE_SESSION);
1185 cmd.data_size = 0;
1186 cmd.cmd_version = 0;
1187
1188 ret_len = lttng_live_send(ctx->control_sock, &cmd, sizeof(cmd));
1189 if (ret_len < 0) {
1190 perror("[error] Error sending cmd");
1191 goto error;
1192 }
1193 assert(ret_len == sizeof(cmd));
1194
1195 ret_len = lttng_live_recv(ctx->control_sock, &resp, sizeof(resp));
1196 if (ret_len == 0) {
1197 fprintf(stderr, "[error] Remote side has closed connection\n");
1198 goto error;
1199 }
1200 if (ret_len < 0) {
1201 perror("[error] Error receiving create session reply");
1202 goto error;
1203 }
1204 assert(ret_len == sizeof(resp));
1205
1206 if (be32toh(resp.status) != LTTNG_VIEWER_CREATE_SESSION_OK) {
1207 fprintf(stderr, "[error] Error creating viewer session\n");
1208 goto error;
1209 }
1210 ret = 0;
1211 end:
1212 return ret;
1213
1214 error:
1215 return -1;
1216 }
1217
1218 static
1219 int del_traces(gpointer key, gpointer value, gpointer user_data)
1220 {
1221 struct bt_context *bt_ctx = user_data;
1222 struct lttng_live_ctf_trace *trace = value;
1223 int ret;
1224
1225 ret = bt_context_remove_trace(bt_ctx, trace->trace_id);
1226 if (ret < 0)
1227 fprintf(stderr, "[error] removing trace from context\n");
1228
1229 /* remove the key/value pair from the HT. */
1230 return 1;
1231 }
1232
1233 static
1234 void add_traces(gpointer key, gpointer value, gpointer user_data)
1235 {
1236 int i, ret;
1237 struct bt_context *bt_ctx = user_data;
1238 struct lttng_live_ctf_trace *trace = value;
1239 struct lttng_live_viewer_stream *stream;
1240 struct bt_mmap_stream *new_mmap_stream;
1241 struct bt_mmap_stream_list mmap_list;
1242 struct lttng_live_ctx *ctx = NULL;
1243 struct bt_trace_descriptor *td;
1244 struct bt_trace_handle *handle;
1245
1246 /*
1247 * We don't know how many streams we will receive for a trace, so
1248 * once we are done receiving the traces, we add all the traces
1249 * received to the bt_context.
1250 * We can receive streams during the attach command or the
1251 * get_new_streams, so we have to make sure not to add multiple
1252 * times the same traces.
1253 * If a trace is already in the context, we just skip this function.
1254 */
1255 if (trace->in_use)
1256 return;
1257
1258 BT_INIT_LIST_HEAD(&mmap_list.head);
1259
1260 for (i = 0; i < trace->streams->len; i++) {
1261 stream = g_ptr_array_index(trace->streams, i);
1262 ctx = stream->session->ctx;
1263
1264 if (!stream->metadata_flag) {
1265 new_mmap_stream = zmalloc(sizeof(struct bt_mmap_stream));
1266 new_mmap_stream->priv = (void *) stream;
1267 new_mmap_stream->fd = -1;
1268 bt_list_add(&new_mmap_stream->list, &mmap_list.head);
1269 } else {
1270 char *metadata_buf = NULL;
1271
1272 /* Get all possible metadata before starting */
1273 ret = get_new_metadata(ctx, stream, &metadata_buf);
1274 if (ret) {
1275 free(metadata_buf);
1276 goto end_free;
1277 }
1278 if (!stream->metadata_len) {
1279 fprintf(stderr, "[error] empty metadata\n");
1280 ret = -1;
1281 free(metadata_buf);
1282 goto end_free;
1283 }
1284
1285 trace->metadata_fp = babeltrace_fmemopen(metadata_buf,
1286 stream->metadata_len, "rb");
1287 if (!trace->metadata_fp) {
1288 perror("Metadata fmemopen");
1289 ret = -1;
1290 free(metadata_buf);
1291 goto end_free;
1292 }
1293 }
1294 }
1295
1296 if (!trace->metadata_fp) {
1297 fprintf(stderr, "[error] No metadata stream opened\n");
1298 goto end_free;
1299 }
1300
1301 ret = bt_context_add_trace(bt_ctx, NULL, "ctf",
1302 ctf_live_packet_seek, &mmap_list, trace->metadata_fp);
1303 if (ret < 0) {
1304 fprintf(stderr, "[error] Error adding trace\n");
1305 goto end_free;
1306 }
1307 trace->metadata_stream->metadata_len = 0;
1308
1309 handle = (struct bt_trace_handle *) g_hash_table_lookup(
1310 bt_ctx->trace_handles,
1311 (gpointer) (unsigned long) ret);
1312 td = handle->td;
1313 trace->handle = handle;
1314 if (bt_ctx->current_iterator) {
1315 bt_iter_add_trace(bt_ctx->current_iterator, td);
1316 }
1317
1318 trace->trace_id = ret;
1319 trace->in_use = 1;
1320
1321 goto end;
1322
1323 end_free:
1324 bt_context_put(bt_ctx);
1325 end:
1326 return;
1327 }
1328
1329 /*
1330 * Request new streams for a session.
1331 * Returns the number of streams received or a negative value on error.
1332 */
1333 int lttng_live_get_new_streams(struct lttng_live_ctx *ctx, uint64_t id)
1334 {
1335 struct lttng_viewer_cmd cmd;
1336 struct lttng_viewer_new_streams_request rq;
1337 struct lttng_viewer_new_streams_response rp;
1338 struct lttng_viewer_stream stream;
1339 int ret, i, nb_streams = 0;
1340 ssize_t ret_len;
1341 uint32_t stream_count;
1342
1343 if (lttng_live_should_quit()) {
1344 ret = -1;
1345 goto end;
1346 }
1347
1348 cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEW_STREAMS);
1349 cmd.data_size = sizeof(rq);
1350 cmd.cmd_version = 0;
1351
1352 memset(&rq, 0, sizeof(rq));
1353 rq.session_id = htobe64(id);
1354
1355 ret_len = lttng_live_send(ctx->control_sock, &cmd, sizeof(cmd));
1356 if (ret_len < 0) {
1357 perror("[error] Error sending cmd");
1358 goto error;
1359 }
1360 assert(ret_len == sizeof(cmd));
1361
1362 ret_len = lttng_live_send(ctx->control_sock, &rq, sizeof(rq));
1363 if (ret_len < 0) {
1364 perror("[error] Error sending get_new_streams request");
1365 goto error;
1366 }
1367 assert(ret_len == sizeof(rq));
1368
1369 ret_len = lttng_live_recv(ctx->control_sock, &rp, sizeof(rp));
1370 if (ret_len == 0) {
1371 fprintf(stderr, "[error] Remote side has closed connection\n");
1372 goto error;
1373 }
1374 if (ret_len < 0) {
1375 perror("[error] Error receiving get_new_streams response");
1376 goto error;
1377 }
1378 assert(ret_len == sizeof(rp));
1379
1380 switch(be32toh(rp.status)) {
1381 case LTTNG_VIEWER_NEW_STREAMS_OK:
1382 break;
1383 case LTTNG_VIEWER_NEW_STREAMS_NO_NEW:
1384 ret = 0;
1385 goto end;
1386 case LTTNG_VIEWER_NEW_STREAMS_HUP:
1387 ret = -LTTNG_VIEWER_NEW_STREAMS_HUP;
1388 goto end;
1389 case LTTNG_VIEWER_NEW_STREAMS_ERR:
1390 fprintf(stderr, "[error] get_new_streams error\n");
1391 goto error;
1392 default:
1393 fprintf(stderr, "[error] Unknown return code %u\n",
1394 be32toh(rp.status));
1395 goto error;
1396 }
1397
1398 stream_count = be32toh(rp.streams_count);
1399 ctx->session->stream_count += stream_count;
1400 /*
1401 * When the session is created but not started, we do an active wait
1402 * until it starts. It allows the viewer to start processing the trace
1403 * as soon as the session starts.
1404 */
1405 if (ctx->session->stream_count == 0) {
1406 ret = 0;
1407 goto end;
1408 }
1409 printf_verbose("Waiting for %" PRIu64 " streams:\n",
1410 ctx->session->stream_count);
1411 ctx->session->streams = g_new0(struct lttng_live_viewer_stream,
1412 ctx->session->stream_count);
1413 for (i = 0; i < stream_count; i++) {
1414 ret_len = lttng_live_recv(ctx->control_sock, &stream, sizeof(stream));
1415 if (ret_len == 0) {
1416 fprintf(stderr, "[error] Remote side has closed connection\n");
1417 goto error;
1418 }
1419 if (ret_len < 0) {
1420 perror("[error] Error receiving stream");
1421 goto error;
1422 }
1423 assert(ret_len == sizeof(stream));
1424 stream.path_name[LTTNG_VIEWER_PATH_MAX - 1] = '\0';
1425 stream.channel_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0';
1426
1427 printf_verbose(" stream %" PRIu64 " : %s/%s\n",
1428 be64toh(stream.id), stream.path_name,
1429 stream.channel_name);
1430 ctx->session->streams[i].id = be64toh(stream.id);
1431 ctx->session->streams[i].session = ctx->session;
1432
1433 ctx->session->streams[i].first_read = 1;
1434 ctx->session->streams[i].mmap_size = 0;
1435
1436 if (be32toh(stream.metadata_flag)) {
1437 ctx->session->streams[i].metadata_flag = 1;
1438 }
1439 ret = lttng_live_ctf_trace_assign(&ctx->session->streams[i],
1440 be64toh(stream.ctf_trace_id));
1441 if (ret < 0) {
1442 goto error;
1443 }
1444 nb_streams++;
1445
1446 }
1447 ret = nb_streams;
1448 end:
1449 return ret;
1450
1451 error:
1452 return -1;
1453 }
1454
1455 int lttng_live_read(struct lttng_live_ctx *ctx)
1456 {
1457 int ret = -1;
1458 int i;
1459 struct bt_ctf_iter *iter;
1460 const struct bt_ctf_event *event;
1461 struct bt_iter_pos begin_pos;
1462 struct bt_trace_descriptor *td_write;
1463 struct bt_format *fmt_write;
1464 struct ctf_text_stream_pos *sout;
1465 uint64_t id;
1466
1467 ctx->bt_ctx = bt_context_create();
1468 if (!ctx->bt_ctx) {
1469 fprintf(stderr, "[error] bt_context_create allocation\n");
1470 goto end;
1471 }
1472
1473 fmt_write = bt_lookup_format(g_quark_from_static_string("text"));
1474 if (!fmt_write) {
1475 fprintf(stderr, "[error] ctf-text error\n");
1476 goto end;
1477 }
1478
1479 td_write = fmt_write->open_trace(NULL, O_RDWR, NULL, NULL);
1480 if (!td_write) {
1481 fprintf(stderr, "[error] Error opening output trace\n");
1482 goto end_free;
1483 }
1484
1485 sout = container_of(td_write, struct ctf_text_stream_pos,
1486 trace_descriptor);
1487 if (!sout->parent.event_cb) {
1488 goto end_free;
1489 }
1490
1491 ret = lttng_live_create_viewer_session(ctx);
1492 if (ret < 0) {
1493 goto end_free;
1494 }
1495
1496 for (i = 0; i < ctx->session_ids->len; i++) {
1497 id = g_array_index(ctx->session_ids, uint64_t, i);
1498 printf_verbose("Attaching to session %lu\n", id);
1499 ret = lttng_live_attach_session(ctx, id);
1500 printf_verbose("Attaching session returns %d\n", ret);
1501 if (ret < 0) {
1502 if (ret == -LTTNG_VIEWER_ATTACH_UNK) {
1503 fprintf(stderr, "[error] Unknown session ID\n");
1504 }
1505 goto end_free;
1506 }
1507 }
1508
1509 /*
1510 * As long as the session is active, we try to get new streams.
1511 */
1512 for (;;) {
1513 int flags;
1514
1515 if (lttng_live_should_quit()) {
1516 ret = 0;
1517 goto end_free;
1518 }
1519
1520 while (!ctx->session->stream_count) {
1521 if (lttng_live_should_quit()
1522 || ctx->session_ids->len == 0) {
1523 ret = 0;
1524 goto end_free;
1525 }
1526 ret = ask_new_streams(ctx);
1527 if (ret < 0) {
1528 goto end_free;
1529 }
1530 if (!ctx->session->stream_count) {
1531 (void) poll(NULL, 0, ACTIVE_POLL_DELAY);
1532 }
1533 }
1534
1535 g_hash_table_foreach(ctx->session->ctf_traces, add_traces,
1536 ctx->bt_ctx);
1537
1538 begin_pos.type = BT_SEEK_BEGIN;
1539 iter = bt_ctf_iter_create(ctx->bt_ctx, &begin_pos, NULL);
1540 if (!iter) {
1541 if (lttng_live_should_quit()) {
1542 ret = 0;
1543 goto end;
1544 }
1545 fprintf(stderr, "[error] Iterator creation error\n");
1546 goto end;
1547 }
1548 for (;;) {
1549 if (lttng_live_should_quit()) {
1550 ret = 0;
1551 goto end_free;
1552 }
1553 event = bt_ctf_iter_read_event_flags(iter, &flags);
1554 if (!(flags & BT_ITER_FLAG_RETRY)) {
1555 if (!event) {
1556 /* End of trace */
1557 break;
1558 }
1559 ret = sout->parent.event_cb(&sout->parent,
1560 event->parent->stream);
1561 if (ret) {
1562 fprintf(stderr, "[error] Writing "
1563 "event failed.\n");
1564 goto end_free;
1565 }
1566 }
1567 ret = bt_iter_next(bt_ctf_get_iter(iter));
1568 if (ret < 0) {
1569 goto end_free;
1570 }
1571 }
1572 bt_ctf_iter_destroy(iter);
1573 g_hash_table_foreach_remove(ctx->session->ctf_traces,
1574 del_traces, ctx->bt_ctx);
1575 ctx->session->stream_count = 0;
1576 }
1577
1578 end_free:
1579 bt_context_put(ctx->bt_ctx);
1580 end:
1581 if (lttng_live_should_quit()) {
1582 ret = 0;
1583 }
1584 return ret;
1585 }
This page took 0.08435 seconds and 4 git commands to generate.