Fix: lttng-live recv() and send() flags, partial recv()
[babeltrace.git] / formats / lttng-live / lttng-live-comm.c
1 /*
2 * Copyright (C) 2013 - Julien Desfossez <jdesfossez@efficios.com>
3 * Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
4 *
5 * Permission is hereby granted, free of charge, to any person obtaining a copy
6 * of this software and associated documentation files (the "Software"), to deal
7 * in the Software without restriction, including without limitation the rights
8 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9 * copies of the Software, and to permit persons to whom the Software is
10 * furnished to do so, subject to the following conditions:
11 *
12 * The above copyright notice and this permission notice shall be included in
13 * all copies or substantial portions of the Software.
14 *
15 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21 * SOFTWARE.
22 */
23
24 #include <sys/socket.h>
25 #include <sys/types.h>
26 #include <netinet/in.h>
27 #include <netdb.h>
28 #include <stdio.h>
29 #include <string.h>
30 #include <stdlib.h>
31 #include <unistd.h>
32 #include <errno.h>
33 #include <inttypes.h>
34 #include <fcntl.h>
35 #include <sys/mman.h>
36
37 #include <babeltrace/ctf/ctf-index.h>
38
39 #include <babeltrace/babeltrace.h>
40 #include <babeltrace/ctf/events.h>
41 #include <babeltrace/ctf/callbacks.h>
42 #include <babeltrace/ctf/iterator.h>
43
44 /* for packet_index */
45 #include <babeltrace/ctf/types.h>
46
47 #include <babeltrace/ctf/metadata.h>
48 #include <babeltrace/ctf-text/types.h>
49 #include <babeltrace/ctf/events-internal.h>
50 #include <formats/ctf/events-private.h>
51
52 #include <babeltrace/compat/memstream.h>
53
54 #include "lttng-live.h"
55 #include "lttng-viewer-abi.h"
56
/*
 * zmalloc: allocate `size` bytes through calloc(), so the returned
 * memory is zero-filled. Yields NULL when calloc() fails.
 */
#define zmalloc(size)	calloc(1, (size))

/*
 * max_t: larger of `lhs` and `rhs` once both have been cast to `type`.
 * Each argument may be evaluated more than once. Only defined here when
 * no earlier header provided it.
 */
#ifndef max_t
#define max_t(type, lhs, rhs)					\
	((type) (lhs) < (type) (rhs) ? (type) (rhs) : (type) (lhs))
#endif
66
67 static void ctf_live_packet_seek(struct bt_stream_pos *stream_pos,
68 size_t index, int whence);
69 static void add_traces(gpointer key, gpointer value, gpointer user_data);
70 static int del_traces(gpointer key, gpointer value, gpointer user_data);
71 static int get_new_metadata(struct lttng_live_ctx *ctx,
72 struct lttng_live_viewer_stream *viewer_stream,
73 char **metadata_buf);
74
/*
 * Receive exactly len bytes from socket fd into buf.
 *
 * recv() is restarted when it fails with EINTR and repeated until the
 * whole buffer has been filled, so a positive return value always
 * equals len.
 *
 * Returns len on success, 0 when recv() reports an orderly shutdown of
 * the peer (bytes received before the shutdown are not reported), and a
 * negative value on error, with errno as left by recv().
 */
static
ssize_t lttng_live_recv(int fd, void *buf, size_t len)
{
	/* Byte-typed cursor: arithmetic on void * is not standard C. */
	char *dst = buf;
	size_t copied = 0, to_copy = len;
	ssize_t ret;

	do {
		ret = recv(fd, dst + copied, to_copy, 0);
		if (ret > 0) {
			assert((size_t) ret <= to_copy);
			copied += (size_t) ret;
			to_copy -= (size_t) ret;
		}
	} while ((ret > 0 && to_copy > 0)
		|| (ret < 0 && errno == EINTR));
	if (ret > 0)
		ret = (ssize_t) copied;
	/* ret = 0 means orderly shutdown, ret < 0 is error. */
	return ret;
}
95
/*
 * Send len bytes from buf on socket fd.
 *
 * send() may accept fewer bytes than requested; the call is repeated on
 * the remaining bytes until everything has been handed to the socket.
 * Calls failing with EINTR are restarted. MSG_NOSIGNAL is passed so a
 * closed peer is reported through the return value rather than SIGPIPE.
 *
 * Returns the number of bytes sent (len on success) or a negative value
 * on error, with errno as left by send().
 */
static
ssize_t lttng_live_send(int fd, const void *buf, size_t len)
{
	const char *src = buf;
	size_t sent = 0;
	ssize_t ret;

	while (sent < len) {
		ret = send(fd, src + sent, len - sent, MSG_NOSIGNAL);
		if (ret < 0) {
			if (errno == EINTR)
				continue;
			return ret;
		}
		if (ret == 0) {
			/* No progress: report the short count, do not spin. */
			break;
		}
		sent += (size_t) ret;
	}
	return (ssize_t) sent;
}
106
107 int lttng_live_connect_viewer(struct lttng_live_ctx *ctx)
108 {
109 struct hostent *host;
110 struct sockaddr_in server_addr;
111 int ret;
112
113 host = gethostbyname(ctx->relay_hostname);
114 if (!host) {
115 ret = -1;
116 goto end;
117 }
118
119 if ((ctx->control_sock = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
120 perror("Socket");
121 ret = -1;
122 goto end;
123 }
124
125 server_addr.sin_family = AF_INET;
126 server_addr.sin_port = htons(ctx->port);
127 server_addr.sin_addr = *((struct in_addr *) host->h_addr);
128 bzero(&(server_addr.sin_zero), 8);
129
130 if (connect(ctx->control_sock, (struct sockaddr *) &server_addr,
131 sizeof(struct sockaddr)) == -1) {
132 perror("Connect");
133 ret = -1;
134 goto end;
135 }
136
137 ret = 0;
138
139 end:
140 return ret;
141 }
142
143 int lttng_live_establish_connection(struct lttng_live_ctx *ctx)
144 {
145 struct lttng_viewer_cmd cmd;
146 struct lttng_viewer_connect connect;
147 int ret;
148 ssize_t ret_len;
149
150 cmd.cmd = htobe32(LTTNG_VIEWER_CONNECT);
151 cmd.data_size = sizeof(connect);
152 cmd.cmd_version = 0;
153
154 connect.viewer_session_id = -1ULL; /* will be set on recv */
155 connect.major = htobe32(LTTNG_LIVE_MAJOR);
156 connect.minor = htobe32(LTTNG_LIVE_MINOR);
157 connect.type = htobe32(LTTNG_VIEWER_CLIENT_COMMAND);
158
159 ret_len = lttng_live_send(ctx->control_sock, &cmd, sizeof(cmd));
160 if (ret_len < 0) {
161 perror("[error] Error sending cmd");
162 ret = ret_len;
163 goto error;
164 }
165 assert(ret_len == sizeof(cmd));
166
167 ret_len = lttng_live_send(ctx->control_sock, &connect, sizeof(connect));
168 if (ret_len < 0) {
169 perror("[error] Error sending version");
170 ret = ret_len;
171 goto error;
172 }
173 assert(ret_len == sizeof(connect));
174
175 ret_len = lttng_live_recv(ctx->control_sock, &connect, sizeof(connect));
176 if (ret_len == 0) {
177 fprintf(stderr, "[error] Remote side has closed connection\n");
178 ret = -1;
179 goto error;
180 }
181 if (ret_len < 0) {
182 perror("[error] Error receiving version");
183 ret = ret_len;
184 goto error;
185 }
186 assert(ret_len == sizeof(connect));
187
188 printf_verbose("Received viewer session ID : %" PRIu64 "\n",
189 be64toh(connect.viewer_session_id));
190 printf_verbose("Relayd version : %u.%u\n", be32toh(connect.major),
191 be32toh(connect.minor));
192
193 ret = 0;
194
195 error:
196 return ret;
197 }
198
199 static
200 void free_session_list(GPtrArray *session_list)
201 {
202 int i;
203 struct lttng_live_relay_session *relay_session;
204
205 for (i = 0; i < session_list->len; i++) {
206 relay_session = g_ptr_array_index(session_list, i);
207 free(relay_session->name);
208 free(relay_session->hostname);
209 }
210 g_ptr_array_free(session_list, TRUE);
211 }
212
213 static
214 void print_session_list(GPtrArray *session_list, const char *path)
215 {
216 int i;
217 struct lttng_live_relay_session *relay_session;
218
219 for (i = 0; i < session_list->len; i++) {
220 relay_session = g_ptr_array_index(session_list, i);
221 fprintf(stdout, "%s/host/%s/%s (timer = %u, "
222 "%u stream(s), %u client(s) connected)\n",
223 path, relay_session->hostname,
224 relay_session->name, relay_session->timer,
225 relay_session->streams, relay_session->clients);
226 }
227 }
228
229 static
230 void update_session_list(GPtrArray *session_list, char *hostname,
231 char *session_name, uint32_t streams, uint32_t clients,
232 uint32_t timer)
233 {
234 int i, found = 0;
235 struct lttng_live_relay_session *relay_session;
236
237 for (i = 0; i < session_list->len; i++) {
238 relay_session = g_ptr_array_index(session_list, i);
239 if ((strncmp(relay_session->hostname, hostname, NAME_MAX) == 0) &&
240 strncmp(relay_session->name,
241 session_name, NAME_MAX) == 0) {
242 relay_session->streams += streams;
243 if (relay_session->clients < clients)
244 relay_session->clients = clients;
245 found = 1;
246 break;
247 }
248 }
249 if (found)
250 return;
251
252 relay_session = g_new0(struct lttng_live_relay_session, 1);
253 relay_session->hostname = strndup(hostname, NAME_MAX);
254 relay_session->name = strndup(session_name, NAME_MAX);
255 relay_session->clients = clients;
256 relay_session->streams = streams;
257 relay_session->timer = timer;
258 g_ptr_array_add(session_list, relay_session);
259 }
260
261 int lttng_live_list_sessions(struct lttng_live_ctx *ctx, const char *path)
262 {
263 struct lttng_viewer_cmd cmd;
264 struct lttng_viewer_list_sessions list;
265 struct lttng_viewer_session lsession;
266 int i, ret, sessions_count, print_list = 0;
267 ssize_t ret_len;
268 uint64_t session_id;
269 GPtrArray *session_list = NULL;
270
271 if (strlen(ctx->session_name) == 0) {
272 print_list = 1;
273 session_list = g_ptr_array_new();
274 }
275
276 cmd.cmd = htobe32(LTTNG_VIEWER_LIST_SESSIONS);
277 cmd.data_size = 0;
278 cmd.cmd_version = 0;
279
280 ret_len = lttng_live_send(ctx->control_sock, &cmd, sizeof(cmd));
281 if (ret_len < 0) {
282 perror("[error] Error sending cmd");
283 ret = ret_len;
284 goto error;
285 }
286 assert(ret_len == sizeof(cmd));
287
288 ret_len = lttng_live_recv(ctx->control_sock, &list, sizeof(list));
289 if (ret_len == 0) {
290 fprintf(stderr, "[error] Remote side has closed connection\n");
291 ret = -1;
292 goto error;
293 }
294 if (ret_len < 0) {
295 perror("[error] Error receiving session list");
296 ret = ret_len;
297 goto error;
298 }
299 assert(ret_len == sizeof(list));
300
301 sessions_count = be32toh(list.sessions_count);
302 for (i = 0; i < sessions_count; i++) {
303 ret_len = lttng_live_recv(ctx->control_sock, &lsession, sizeof(lsession));
304 if (ret_len == 0) {
305 fprintf(stderr, "[error] Remote side has closed connection\n");
306 ret = -1;
307 goto error;
308 }
309 if (ret_len < 0) {
310 perror("[error] Error receiving session");
311 ret = ret_len;
312 goto error;
313 }
314 assert(ret_len == sizeof(lsession));
315 lsession.hostname[LTTNG_VIEWER_HOST_NAME_MAX - 1] = '\0';
316 lsession.session_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0';
317 session_id = be64toh(lsession.id);
318
319 if (print_list) {
320 update_session_list(session_list,
321 lsession.hostname,
322 lsession.session_name,
323 be32toh(lsession.streams),
324 be32toh(lsession.clients),
325 be32toh(lsession.live_timer));
326 } else {
327 if ((strncmp(lsession.session_name, ctx->session_name,
328 NAME_MAX) == 0) && (strncmp(lsession.hostname,
329 ctx->traced_hostname, NAME_MAX) == 0)) {
330 printf_verbose("Reading from session %" PRIu64 "\n",
331 session_id);
332 g_array_append_val(ctx->session_ids,
333 session_id);
334 }
335 }
336 }
337
338 if (print_list) {
339 print_session_list(session_list, path);
340 free_session_list(session_list);
341 }
342
343 ret = 0;
344
345 error:
346 return ret;
347 }
348
349 int lttng_live_ctf_trace_assign(struct lttng_live_viewer_stream *stream,
350 uint64_t ctf_trace_id)
351 {
352 struct lttng_live_ctf_trace *trace;
353 int ret = 0;
354
355 trace = g_hash_table_lookup(stream->session->ctf_traces,
356 (gpointer) ctf_trace_id);
357 if (!trace) {
358 trace = g_new0(struct lttng_live_ctf_trace, 1);
359 trace->ctf_trace_id = ctf_trace_id;
360 trace->streams = g_ptr_array_new();
361 g_hash_table_insert(stream->session->ctf_traces,
362 (gpointer) ctf_trace_id,
363 trace);
364 }
365 if (stream->metadata_flag)
366 trace->metadata_stream = stream;
367
368 stream->ctf_trace = trace;
369 g_ptr_array_add(trace->streams, stream);
370
371 return ret;
372 }
373
374 static
375 int open_metadata_fp_write(struct lttng_live_viewer_stream *stream,
376 char **metadata_buf, size_t *size)
377 {
378 int ret = 0;
379
380 stream->metadata_fp_write =
381 babeltrace_open_memstream(metadata_buf, size);
382 if (!stream->metadata_fp_write) {
383 perror("Metadata open_memstream");
384 ret = -1;
385 }
386
387 return ret;
388 }
389
390 int lttng_live_attach_session(struct lttng_live_ctx *ctx, uint64_t id)
391 {
392 struct lttng_viewer_cmd cmd;
393 struct lttng_viewer_attach_session_request rq;
394 struct lttng_viewer_attach_session_response rp;
395 struct lttng_viewer_stream stream;
396 int ret, i;
397 ssize_t ret_len;
398
399 cmd.cmd = htobe32(LTTNG_VIEWER_ATTACH_SESSION);
400 cmd.data_size = sizeof(rq);
401 cmd.cmd_version = 0;
402
403 memset(&rq, 0, sizeof(rq));
404 rq.session_id = htobe64(id);
405 // TODO: add cmd line parameter to select seek beginning
406 // rq.seek = htobe32(LTTNG_VIEWER_SEEK_BEGINNING);
407 rq.seek = htobe32(LTTNG_VIEWER_SEEK_LAST);
408
409 ret_len = lttng_live_send(ctx->control_sock, &cmd, sizeof(cmd));
410 if (ret_len < 0) {
411 perror("[error] Error sending cmd");
412 ret = ret_len;
413 goto error;
414 }
415 assert(ret_len == sizeof(cmd));
416
417 ret_len = lttng_live_send(ctx->control_sock, &rq, sizeof(rq));
418 if (ret_len < 0) {
419 perror("[error] Error sending attach request");
420 ret = ret_len;
421 goto error;
422 }
423 assert(ret_len == sizeof(rq));
424
425 ret_len = lttng_live_recv(ctx->control_sock, &rp, sizeof(rp));
426 if (ret_len == 0) {
427 fprintf(stderr, "[error] Remote side has closed connection\n");
428 ret = -1;
429 goto error;
430 }
431 if (ret_len < 0) {
432 perror("[error] Error receiving attach response");
433 ret = ret_len;
434 goto error;
435 }
436 assert(ret_len == sizeof(rp));
437
438 switch(be32toh(rp.status)) {
439 case LTTNG_VIEWER_ATTACH_OK:
440 break;
441 case LTTNG_VIEWER_ATTACH_UNK:
442 ret = -LTTNG_VIEWER_ATTACH_UNK;
443 goto end;
444 case LTTNG_VIEWER_ATTACH_ALREADY:
445 fprintf(stderr, "[error] There is already a viewer attached to this session\n");
446 ret = -1;
447 goto end;
448 case LTTNG_VIEWER_ATTACH_NOT_LIVE:
449 fprintf(stderr, "[error] Not a live session\n");
450 ret = -1;
451 goto end;
452 case LTTNG_VIEWER_ATTACH_SEEK_ERR:
453 fprintf(stderr, "[error] Wrong seek parameter\n");
454 ret = -1;
455 goto end;
456 default:
457 fprintf(stderr, "[error] Unknown attach return code %u\n",
458 be32toh(rp.status));
459 ret = -1;
460 goto end;
461 }
462 if (be32toh(rp.status) != LTTNG_VIEWER_ATTACH_OK) {
463 ret = -1;
464 goto end;
465 }
466
467 ctx->session->stream_count += be32toh(rp.streams_count);
468 /*
469 * When the session is created but not started, we do an active wait
470 * until it starts. It allows the viewer to start processing the trace
471 * as soon as the session starts.
472 */
473 if (ctx->session->stream_count == 0) {
474 ret = 0;
475 goto end;
476 }
477 printf_verbose("Waiting for %" PRIu64 " streams:\n",
478 ctx->session->stream_count);
479 ctx->session->streams = g_new0(struct lttng_live_viewer_stream,
480 ctx->session->stream_count);
481 for (i = 0; i < be32toh(rp.streams_count); i++) {
482 ret_len = lttng_live_recv(ctx->control_sock, &stream, sizeof(stream));
483 if (ret_len == 0) {
484 fprintf(stderr, "[error] Remote side has closed connection\n");
485 ret = -1;
486 goto error;
487 }
488 if (ret_len < 0) {
489 perror("[error] Error receiving stream");
490 ret = ret_len;
491 goto error;
492 }
493 assert(ret_len == sizeof(stream));
494 stream.path_name[LTTNG_VIEWER_PATH_MAX - 1] = '\0';
495 stream.channel_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0';
496
497 printf_verbose(" stream %" PRIu64 " : %s/%s\n",
498 be64toh(stream.id), stream.path_name,
499 stream.channel_name);
500 ctx->session->streams[i].id = be64toh(stream.id);
501 ctx->session->streams[i].session = ctx->session;
502
503 ctx->session->streams[i].first_read = 1;
504 ctx->session->streams[i].mmap_size = 0;
505
506 if (be32toh(stream.metadata_flag)) {
507 ctx->session->streams[i].metadata_flag = 1;
508 }
509 ret = lttng_live_ctf_trace_assign(&ctx->session->streams[i],
510 be64toh(stream.ctf_trace_id));
511 if (ret < 0) {
512 goto error;
513 }
514
515 }
516 ret = 0;
517
518 end:
519 error:
520 return ret;
521 }
522
523 static
524 int ask_new_streams(struct lttng_live_ctx *ctx)
525 {
526 int i, ret = 0;
527 uint64_t id;
528
529 restart:
530 for (i = 0; i < ctx->session_ids->len; i++) {
531 id = g_array_index(ctx->session_ids, uint64_t, i);
532 ret = lttng_live_get_new_streams(ctx, id);
533 printf_verbose("Asking for new streams returns %d\n",
534 ret);
535 if (ret < 0) {
536 if (ret == -LTTNG_VIEWER_NEW_STREAMS_HUP) {
537 printf_verbose("Session %" PRIu64 " closed\n",
538 id);
539 /*
540 * The streams have already been closed during
541 * the reading, so we only need to get rid of
542 * the trace in our internal table of sessions.
543 */
544 g_array_remove_index(ctx->session_ids, i);
545 /*
546 * We can't continue iterating on the g_array
547 * after a remove, we have to start again.
548 */
549 goto restart;
550 } else {
551 ret = -1;
552 goto end;
553 }
554 }
555 }
556
557 end:
558 return ret;
559 }
560
561 static
562 int append_metadata(struct lttng_live_ctx *ctx,
563 struct lttng_live_viewer_stream *viewer_stream)
564 {
565 int ret;
566 struct lttng_live_viewer_stream *metadata;
567 char *metadata_buf = NULL;
568
569 printf_verbose("get_next_index: new metadata needed\n");
570 ret = get_new_metadata(ctx, viewer_stream, &metadata_buf);
571 if (ret < 0) {
572 free(metadata_buf);
573 goto error;
574 }
575
576 metadata = viewer_stream->ctf_trace->metadata_stream;
577 metadata->ctf_trace->metadata_fp =
578 babeltrace_fmemopen(metadata_buf,
579 metadata->metadata_len, "rb");
580 if (!metadata->ctf_trace->metadata_fp) {
581 perror("Metadata fmemopen");
582 free(metadata_buf);
583 ret = -1;
584 goto error;
585 }
586 ret = ctf_append_trace_metadata(
587 viewer_stream->ctf_trace->handle->td,
588 metadata->ctf_trace->metadata_fp);
589 if (ret != 0) {
590 fprintf(stderr, "[error] Appending metadata\n");
591 goto error;
592 }
593
594 error:
595 return ret;
596 }
597
598 static
599 int get_data_packet(struct lttng_live_ctx *ctx,
600 struct ctf_stream_pos *pos,
601 struct lttng_live_viewer_stream *stream, uint64_t offset,
602 uint64_t len)
603 {
604 struct lttng_viewer_cmd cmd;
605 struct lttng_viewer_get_packet rq;
606 struct lttng_viewer_trace_packet rp;
607 ssize_t ret_len;
608 int ret;
609
610 retry:
611 cmd.cmd = htobe32(LTTNG_VIEWER_GET_PACKET);
612 cmd.data_size = sizeof(rq);
613 cmd.cmd_version = 0;
614
615 memset(&rq, 0, sizeof(rq));
616 rq.stream_id = htobe64(stream->id);
617 /* Already in big endian. */
618 rq.offset = offset;
619 rq.len = htobe32(len);
620
621 ret_len = lttng_live_send(ctx->control_sock, &cmd, sizeof(cmd));
622 if (ret_len < 0) {
623 perror("[error] Error sending cmd");
624 ret = ret_len;
625 goto error;
626 }
627 assert(ret_len == sizeof(cmd));
628
629 ret_len = lttng_live_send(ctx->control_sock, &rq, sizeof(rq));
630 if (ret_len < 0) {
631 perror("[error] Error sending get_data_packet request");
632 ret = ret_len;
633 goto error;
634 }
635 assert(ret_len == sizeof(rq));
636
637 ret_len = lttng_live_recv(ctx->control_sock, &rp, sizeof(rp));
638 if (ret_len == 0) {
639 fprintf(stderr, "[error] Remote side has closed connection\n");
640 ret = -1;
641 goto error;
642 }
643 if (ret_len < 0) {
644 perror("[error] Error receiving data response");
645 ret = ret_len;
646 goto error;
647 }
648 if (ret_len != sizeof(rp)) {
649 fprintf(stderr, "[error] get_data_packet: expected %" PRId64
650 ", received %" PRId64 "\n", ret_len,
651 sizeof(rp));
652 ret = -1;
653 goto error;
654 }
655
656 rp.flags = be32toh(rp.flags);
657
658 switch (be32toh(rp.status)) {
659 case LTTNG_VIEWER_GET_PACKET_OK:
660 len = be32toh(rp.len);
661 printf_verbose("get_data_packet: Ok, packet size : %" PRIu64
662 "\n", len);
663 break;
664 case LTTNG_VIEWER_GET_PACKET_RETRY:
665 printf_verbose("get_data_packet: retry\n");
666 ret = -1;
667 goto end;
668 case LTTNG_VIEWER_GET_PACKET_ERR:
669 if (rp.flags & LTTNG_VIEWER_FLAG_NEW_METADATA) {
670 printf_verbose("get_data_packet: new metadata needed\n");
671 ret = append_metadata(ctx, stream);
672 if (ret)
673 goto error;
674 goto retry;
675 }
676 if (rp.flags & LTTNG_VIEWER_FLAG_NEW_STREAM) {
677 ret = ask_new_streams(ctx);
678 if (ret < 0)
679 goto error;
680 g_hash_table_foreach(ctx->session->ctf_traces, add_traces,
681 ctx->bt_ctx);
682 }
683 fprintf(stderr, "[error] get_data_packet: error\n");
684 ret = -1;
685 goto end;
686 case LTTNG_VIEWER_GET_PACKET_EOF:
687 ret = -2;
688 goto error;
689 default:
690 printf_verbose("get_data_packet: unknown\n");
691 assert(0);
692 ret = -1;
693 goto end;
694 }
695
696 if (len <= 0) {
697 ret = -1;
698 goto end;
699 }
700
701 if (len > stream->mmap_size) {
702 uint64_t new_size;
703
704 new_size = max_t(uint64_t, len, stream->mmap_size << 1);
705 if (pos->base_mma) {
706 /* unmap old base */
707 ret = munmap_align(pos->base_mma);
708 if (ret) {
709 perror("[error] Unable to unmap old base");
710 ret = -1;
711 goto error;
712 }
713 pos->base_mma = NULL;
714 }
715 pos->base_mma = mmap_align(new_size,
716 PROT_READ | PROT_WRITE,
717 MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
718 if (pos->base_mma == MAP_FAILED) {
719 perror("[error] mmap error");
720 pos->base_mma = NULL;
721 ret = -1;
722 goto error;
723 }
724
725 stream->mmap_size = new_size;
726 printf_verbose("Expanding stream mmap size to %" PRIu64 " bytes\n",
727 stream->mmap_size);
728 }
729
730 ret_len = lttng_live_recv(ctx->control_sock,
731 mmap_align_addr(pos->base_mma), len);
732 if (ret_len == 0) {
733 fprintf(stderr, "[error] Remote side has closed connection\n");
734 ret = -1;
735 goto error;
736 }
737 if (ret_len < 0) {
738 perror("[error] Error receiving trace packet");
739 ret = ret_len;
740 goto error;
741 }
742 assert(ret_len == len);
743
744 end:
745 error:
746 return ret;
747 }
748
749 static
750 int get_one_metadata_packet(struct lttng_live_ctx *ctx,
751 struct lttng_live_viewer_stream *metadata_stream)
752 {
753 uint64_t len = 0;
754 int ret;
755 struct lttng_viewer_cmd cmd;
756 struct lttng_viewer_get_metadata rq;
757 struct lttng_viewer_metadata_packet rp;
758 char *data = NULL;
759 ssize_t ret_len;
760
761 rq.stream_id = htobe64(metadata_stream->id);
762 cmd.cmd = htobe32(LTTNG_VIEWER_GET_METADATA);
763 cmd.data_size = sizeof(rq);
764 cmd.cmd_version = 0;
765
766 ret_len = lttng_live_send(ctx->control_sock, &cmd, sizeof(cmd));
767 if (ret_len < 0) {
768 perror("[error] Error sending cmd");
769 ret = ret_len;
770 goto error;
771 }
772 assert(ret_len == sizeof(cmd));
773
774 ret_len = lttng_live_send(ctx->control_sock, &rq, sizeof(rq));
775 if (ret_len < 0) {
776 perror("[error] Error sending get_metadata request");
777 ret = ret_len;
778 goto error;
779 }
780 assert(ret_len == sizeof(rq));
781
782 ret_len = lttng_live_recv(ctx->control_sock, &rp, sizeof(rp));
783 if (ret_len == 0) {
784 fprintf(stderr, "[error] Remote side has closed connection\n");
785 ret = -1;
786 goto error;
787 }
788 if (ret_len < 0) {
789 perror("[error] Error receiving metadata response");
790 ret = ret_len;
791 goto error;
792 }
793 assert(ret_len == sizeof(rp));
794
795 switch (be32toh(rp.status)) {
796 case LTTNG_VIEWER_METADATA_OK:
797 printf_verbose("get_metadata : OK\n");
798 break;
799 case LTTNG_VIEWER_NO_NEW_METADATA:
800 printf_verbose("get_metadata : NO NEW\n");
801 ret = 0;
802 goto end;
803 case LTTNG_VIEWER_METADATA_ERR:
804 printf_verbose("get_metadata : ERR\n");
805 ret = -1;
806 goto end;
807 default:
808 printf_verbose("get_metadata : UNKNOWN\n");
809 ret = -1;
810 goto end;
811 }
812
813 len = be64toh(rp.len);
814 printf_verbose("Writing %" PRIu64" bytes to metadata\n", len);
815 if (len <= 0) {
816 ret = -1;
817 goto end;
818 }
819
820 data = zmalloc(len);
821 if (!data) {
822 perror("relay data zmalloc");
823 ret = -1;
824 goto error;
825 }
826 ret_len = lttng_live_recv(ctx->control_sock, data, len);
827 if (ret_len == 0) {
828 fprintf(stderr, "[error] Remote side has closed connection\n");
829 ret = -1;
830 free(data);
831 goto error;
832 }
833 if (ret_len < 0) {
834 perror("[error] Error receiving trace packet");
835 ret = ret_len;
836 free(data);
837 goto error;
838 }
839 assert(ret_len == len);
840
841 do {
842 ret_len = fwrite(data, 1, len,
843 metadata_stream->metadata_fp_write);
844 } while (ret_len < 0 && errno == EINTR);
845 if (ret_len < 0) {
846 fprintf(stderr, "[error] Writing in the metadata fp\n");
847 free(data);
848 ret = ret_len;
849 goto error;
850 }
851 assert(ret_len == len);
852 metadata_stream->metadata_len += len;
853 ret = len;
854
855 free(data);
856
857 end:
858 error:
859 return ret;
860 }
861
862 /*
863 * Return 0 on success, a negative value on error.
864 */
865 static
866 int get_new_metadata(struct lttng_live_ctx *ctx,
867 struct lttng_live_viewer_stream *viewer_stream,
868 char **metadata_buf)
869 {
870 int ret = 0;
871 struct lttng_live_viewer_stream *metadata_stream;
872 size_t size;
873
874 metadata_stream = viewer_stream->ctf_trace->metadata_stream;
875 if (!metadata_stream) {
876 fprintf(stderr, "[error] No metadata stream\n");
877 ret = -1;
878 goto error;
879 }
880 metadata_stream->metadata_len = 0;
881 ret = open_metadata_fp_write(metadata_stream, metadata_buf, &size);
882 if (ret < 0) {
883 goto error;
884 }
885
886 do {
887 /*
888 * get_one_metadata_packet returns the number of bytes
889 * received, 0 when we have received everything, a
890 * negative value on error.
891 */
892 ret = get_one_metadata_packet(ctx, metadata_stream);
893 } while (ret > 0);
894
895 if (fclose(metadata_stream->metadata_fp_write))
896 perror("fclose");
897 metadata_stream->metadata_fp_write = NULL;
898
899 error:
900 return ret;
901 }
902
903 /*
904 * Get one index for a stream.
905 *
906 * Returns 0 on success or a negative value on error.
907 */
908 static
909 int get_next_index(struct lttng_live_ctx *ctx,
910 struct lttng_live_viewer_stream *viewer_stream,
911 struct packet_index *index)
912 {
913 struct lttng_viewer_cmd cmd;
914 struct lttng_viewer_get_next_index rq;
915 struct lttng_viewer_index rp;
916 int ret;
917 ssize_t ret_len;
918
919 cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEXT_INDEX);
920 cmd.data_size = sizeof(rq);
921 cmd.cmd_version = 0;
922
923 memset(&rq, 0, sizeof(rq));
924 rq.stream_id = htobe64(viewer_stream->id);
925
926 retry:
927 ret_len = lttng_live_send(ctx->control_sock, &cmd, sizeof(cmd));
928 if (ret_len < 0) {
929 perror("[error] Error sending cmd");
930 ret = ret_len;
931 goto error;
932 }
933 assert(ret_len == sizeof(cmd));
934
935 ret_len = lttng_live_send(ctx->control_sock, &rq, sizeof(rq));
936 if (ret_len < 0) {
937 perror("[error] Error sending get_next_index request");
938 ret = ret_len;
939 goto error;
940 }
941 assert(ret_len == sizeof(rq));
942
943 ret_len = lttng_live_recv(ctx->control_sock, &rp, sizeof(rp));
944 if (ret_len == 0) {
945 fprintf(stderr, "[error] Remote side has closed connection\n");
946 ret = -1;
947 goto error;
948 }
949 if (ret_len < 0) {
950 perror("[error] Error receiving index response");
951 ret = ret_len;
952 goto error;
953 }
954 assert(ret_len == sizeof(rp));
955
956 rp.flags = be32toh(rp.flags);
957
958 switch (be32toh(rp.status)) {
959 case LTTNG_VIEWER_INDEX_INACTIVE:
960 printf_verbose("get_next_index: inactive\n");
961 memset(index, 0, sizeof(struct packet_index));
962 index->ts_cycles.timestamp_end = be64toh(rp.timestamp_end);
963 break;
964 case LTTNG_VIEWER_INDEX_OK:
965 printf_verbose("get_next_index: Ok, need metadata update : %u\n",
966 rp.flags & LTTNG_VIEWER_FLAG_NEW_METADATA);
967 index->offset = be64toh(rp.offset);
968 index->packet_size = be64toh(rp.packet_size);
969 index->content_size = be64toh(rp.content_size);
970 index->ts_cycles.timestamp_begin = be64toh(rp.timestamp_begin);
971 index->ts_cycles.timestamp_end = be64toh(rp.timestamp_end);
972 index->events_discarded = be64toh(rp.events_discarded);
973
974 if (rp.flags & LTTNG_VIEWER_FLAG_NEW_METADATA) {
975 ret = append_metadata(ctx, viewer_stream);
976 if (ret)
977 goto error;
978 }
979 if (rp.flags & LTTNG_VIEWER_FLAG_NEW_STREAM) {
980 ret = ask_new_streams(ctx);
981 if (ret < 0)
982 goto error;
983 g_hash_table_foreach(ctx->session->ctf_traces, add_traces,
984 ctx->bt_ctx);
985 }
986 break;
987 case LTTNG_VIEWER_INDEX_RETRY:
988 printf_verbose("get_next_index: retry\n");
989 sleep(1);
990 goto retry;
991 case LTTNG_VIEWER_INDEX_HUP:
992 printf_verbose("get_next_index: stream hung up\n");
993 viewer_stream->id = -1ULL;
994 index->offset = EOF;
995 ctx->session->stream_count--;
996 break;
997 case LTTNG_VIEWER_INDEX_ERR:
998 fprintf(stderr, "[error] get_next_index: error\n");
999 ret = -1;
1000 goto error;
1001 default:
1002 fprintf(stderr, "[error] get_next_index: unkwown value\n");
1003 ret = -1;
1004 goto error;
1005 }
1006
1007 ret = 0;
1008
1009 error:
1010 return ret;
1011 }
1012
1013 static
1014 void ctf_live_packet_seek(struct bt_stream_pos *stream_pos, size_t index,
1015 int whence)
1016 {
1017 struct ctf_stream_pos *pos;
1018 struct ctf_file_stream *file_stream;
1019 struct packet_index *prev_index = NULL, *cur_index;
1020 struct lttng_live_viewer_stream *viewer_stream;
1021 struct lttng_live_session *session;
1022 int ret;
1023
1024 retry:
1025 pos = ctf_pos(stream_pos);
1026 file_stream = container_of(pos, struct ctf_file_stream, pos);
1027 viewer_stream = (struct lttng_live_viewer_stream *) pos->priv;
1028 session = viewer_stream->session;
1029
1030 switch (pos->packet_index->len) {
1031 case 0:
1032 g_array_set_size(pos->packet_index, 1);
1033 cur_index = &g_array_index(pos->packet_index,
1034 struct packet_index, 0);
1035 break;
1036 case 1:
1037 g_array_set_size(pos->packet_index, 2);
1038 prev_index = &g_array_index(pos->packet_index,
1039 struct packet_index, 0);
1040 cur_index = &g_array_index(pos->packet_index,
1041 struct packet_index, 1);
1042 break;
1043 case 2:
1044 g_array_index(pos->packet_index,
1045 struct packet_index, 0) =
1046 g_array_index(pos->packet_index,
1047 struct packet_index, 1);
1048 prev_index = &g_array_index(pos->packet_index,
1049 struct packet_index, 0);
1050 cur_index = &g_array_index(pos->packet_index,
1051 struct packet_index, 1);
1052 break;
1053 default:
1054 abort();
1055 break;
1056 }
1057 printf_verbose("get_next_index for stream %" PRIu64 "\n", viewer_stream->id);
1058 ret = get_next_index(session->ctx, viewer_stream, cur_index);
1059 if (ret < 0) {
1060 pos->offset = EOF;
1061 fprintf(stderr, "[error] get_next_index failed\n");
1062 return;
1063 }
1064
1065 pos->packet_size = cur_index->packet_size;
1066 pos->content_size = cur_index->content_size;
1067 pos->mmap_base_offset = 0;
1068 if (cur_index->offset == EOF) {
1069 pos->offset = EOF;
1070 } else {
1071 pos->offset = 0;
1072 }
1073
1074 if (cur_index->content_size == 0) {
1075 file_stream->parent.cycles_timestamp =
1076 cur_index->ts_cycles.timestamp_end;
1077 file_stream->parent.real_timestamp = ctf_get_real_timestamp(
1078 &file_stream->parent,
1079 cur_index->ts_cycles.timestamp_end);
1080 } else {
1081 /* Convert the timestamps and append to the real_index. */
1082 cur_index->ts_real.timestamp_begin = ctf_get_real_timestamp(
1083 &file_stream->parent,
1084 cur_index->ts_cycles.timestamp_begin);
1085 cur_index->ts_real.timestamp_end = ctf_get_real_timestamp(
1086 &file_stream->parent,
1087 cur_index->ts_cycles.timestamp_end);
1088
1089 ctf_update_current_packet_index(&file_stream->parent,
1090 prev_index, cur_index);
1091
1092 file_stream->parent.cycles_timestamp =
1093 cur_index->ts_cycles.timestamp_begin;
1094 file_stream->parent.real_timestamp =
1095 cur_index->ts_real.timestamp_begin;
1096 }
1097
1098 if (pos->packet_size == 0 || pos->offset == EOF) {
1099 goto end;
1100 }
1101
1102 printf_verbose("get_data_packet for stream %" PRIu64 "\n",
1103 viewer_stream->id);
1104 ret = get_data_packet(session->ctx, pos, viewer_stream,
1105 be64toh(cur_index->offset),
1106 cur_index->packet_size / CHAR_BIT);
1107 if (ret == -2) {
1108 goto retry;
1109 } else if (ret < 0) {
1110 pos->offset = EOF;
1111 fprintf(stderr, "[error] get_data_packet failed\n");
1112 return;
1113 }
1114
1115 printf_verbose("Index received : packet_size : %" PRIu64
1116 ", offset %" PRIu64 ", content_size %" PRIu64
1117 ", timestamp_end : %" PRIu64 "\n",
1118 cur_index->packet_size, cur_index->offset,
1119 cur_index->content_size,
1120 cur_index->ts_cycles.timestamp_end);
1121
1122 /* update trace_packet_header and stream_packet_context */
1123 if (pos->prot != PROT_WRITE && file_stream->parent.trace_packet_header) {
1124 /* Read packet header */
1125 ret = generic_rw(&pos->parent, &file_stream->parent.trace_packet_header->p);
1126 if (ret) {
1127 pos->offset = EOF;
1128 fprintf(stderr, "[error] trace packet header read failed\n");
1129 goto end;
1130 }
1131 }
1132 if (pos->prot != PROT_WRITE && file_stream->parent.stream_packet_context) {
1133 /* Read packet context */
1134 ret = generic_rw(&pos->parent, &file_stream->parent.stream_packet_context->p);
1135 if (ret) {
1136 pos->offset = EOF;
1137 fprintf(stderr, "[error] stream packet context read failed\n");
1138 goto end;
1139 }
1140 }
1141 pos->data_offset = pos->offset;
1142
1143 end:
1144 return;
1145 }
1146
1147 int lttng_live_create_viewer_session(struct lttng_live_ctx *ctx)
1148 {
1149 struct lttng_viewer_cmd cmd;
1150 struct lttng_viewer_create_session_response resp;
1151 int ret;
1152 ssize_t ret_len;
1153
1154 cmd.cmd = htobe32(LTTNG_VIEWER_CREATE_SESSION);
1155 cmd.data_size = 0;
1156 cmd.cmd_version = 0;
1157
1158 ret_len = lttng_live_send(ctx->control_sock, &cmd, sizeof(cmd));
1159 if (ret_len < 0) {
1160 perror("[error] Error sending cmd");
1161 ret = ret_len;
1162 goto error;
1163 }
1164 assert(ret_len == sizeof(cmd));
1165
1166 ret_len = lttng_live_recv(ctx->control_sock, &resp, sizeof(resp));
1167 if (ret_len == 0) {
1168 fprintf(stderr, "[error] Remote side has closed connection\n");
1169 ret = -1;
1170 goto error;
1171 }
1172 if (ret_len < 0) {
1173 perror("[error] Error receiving create session reply");
1174 ret = ret_len;
1175 goto error;
1176 }
1177 assert(ret_len == sizeof(resp));
1178
1179 if (be32toh(resp.status) != LTTNG_VIEWER_CREATE_SESSION_OK) {
1180 fprintf(stderr, "[error] Error creating viewer session\n");
1181 ret = -1;
1182 goto error;
1183 }
1184 ret = 0;
1185
1186 error:
1187 return ret;
1188 }
1189
1190 static
1191 int del_traces(gpointer key, gpointer value, gpointer user_data)
1192 {
1193 struct bt_context *bt_ctx = user_data;
1194 struct lttng_live_ctf_trace *trace = value;
1195 int ret;
1196
1197 ret = bt_context_remove_trace(bt_ctx, trace->trace_id);
1198 if (ret < 0)
1199 fprintf(stderr, "[error] removing trace from context\n");
1200
1201 /* remove the key/value pair from the HT. */
1202 return 1;
1203 }
1204
1205 static
1206 void add_traces(gpointer key, gpointer value, gpointer user_data)
1207 {
1208 int i, ret;
1209 struct bt_context *bt_ctx = user_data;
1210 struct lttng_live_ctf_trace *trace = value;
1211 struct lttng_live_viewer_stream *stream;
1212 struct bt_mmap_stream *new_mmap_stream;
1213 struct bt_mmap_stream_list mmap_list;
1214 struct lttng_live_ctx *ctx = NULL;
1215 struct bt_trace_descriptor *td;
1216 struct bt_trace_handle *handle;
1217
1218 /*
1219 * We don't know how many streams we will receive for a trace, so
1220 * once we are done receiving the traces, we add all the traces
1221 * received to the bt_context.
1222 * We can receive streams during the attach command or the
1223 * get_new_streams, so we have to make sure not to add multiple
1224 * times the same traces.
1225 * If a trace is already in the context, we just skip this function.
1226 */
1227 if (trace->in_use)
1228 return;
1229
1230 BT_INIT_LIST_HEAD(&mmap_list.head);
1231
1232 for (i = 0; i < trace->streams->len; i++) {
1233 stream = g_ptr_array_index(trace->streams, i);
1234 ctx = stream->session->ctx;
1235
1236 if (!stream->metadata_flag) {
1237 new_mmap_stream = zmalloc(sizeof(struct bt_mmap_stream));
1238 new_mmap_stream->priv = (void *) stream;
1239 new_mmap_stream->fd = -1;
1240 bt_list_add(&new_mmap_stream->list, &mmap_list.head);
1241 } else {
1242 char *metadata_buf = NULL;
1243
1244 /* Get all possible metadata before starting */
1245 ret = get_new_metadata(ctx, stream, &metadata_buf);
1246 if (ret) {
1247 free(metadata_buf);
1248 goto end_free;
1249 }
1250 if (!stream->metadata_len) {
1251 fprintf(stderr, "[error] empty metadata\n");
1252 ret = -1;
1253 free(metadata_buf);
1254 goto end_free;
1255 }
1256
1257 trace->metadata_fp = babeltrace_fmemopen(metadata_buf,
1258 stream->metadata_len, "rb");
1259 if (!trace->metadata_fp) {
1260 perror("Metadata fmemopen");
1261 ret = -1;
1262 free(metadata_buf);
1263 goto end_free;
1264 }
1265 }
1266 }
1267
1268 if (!trace->metadata_fp) {
1269 fprintf(stderr, "[error] No metadata stream opened\n");
1270 goto end_free;
1271 }
1272
1273 ret = bt_context_add_trace(bt_ctx, NULL, "ctf",
1274 ctf_live_packet_seek, &mmap_list, trace->metadata_fp);
1275 if (ret < 0) {
1276 fprintf(stderr, "[error] Error adding trace\n");
1277 goto end_free;
1278 }
1279 trace->metadata_stream->metadata_len = 0;
1280
1281 handle = (struct bt_trace_handle *) g_hash_table_lookup(
1282 bt_ctx->trace_handles,
1283 (gpointer) (unsigned long) ret);
1284 td = handle->td;
1285 trace->handle = handle;
1286 if (bt_ctx->current_iterator) {
1287 bt_iter_add_trace(bt_ctx->current_iterator, td);
1288 }
1289
1290 trace->trace_id = ret;
1291 trace->in_use = 1;
1292
1293 goto end;
1294
1295 end_free:
1296 bt_context_put(bt_ctx);
1297 end:
1298 return;
1299 }
1300
1301 int lttng_live_get_new_streams(struct lttng_live_ctx *ctx, uint64_t id)
1302 {
1303 struct lttng_viewer_cmd cmd;
1304 struct lttng_viewer_new_streams_request rq;
1305 struct lttng_viewer_new_streams_response rp;
1306 struct lttng_viewer_stream stream;
1307 int ret, i;
1308 ssize_t ret_len;
1309 uint32_t stream_count;
1310
1311 cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEW_STREAMS);
1312 cmd.data_size = sizeof(rq);
1313 cmd.cmd_version = 0;
1314
1315 memset(&rq, 0, sizeof(rq));
1316 rq.session_id = htobe64(id);
1317
1318 ret_len = lttng_live_send(ctx->control_sock, &cmd, sizeof(cmd));
1319 if (ret_len < 0) {
1320 perror("[error] Error sending cmd");
1321 ret = ret_len;
1322 goto error;
1323 }
1324 assert(ret_len == sizeof(cmd));
1325
1326 ret_len = lttng_live_send(ctx->control_sock, &rq, sizeof(rq));
1327 if (ret_len < 0) {
1328 perror("[error] Error sending get_new_streams request");
1329 ret = ret_len;
1330 goto error;
1331 }
1332 assert(ret_len == sizeof(rq));
1333
1334 ret_len = lttng_live_recv(ctx->control_sock, &rp, sizeof(rp));
1335 if (ret_len == 0) {
1336 fprintf(stderr, "[error] Remote side has closed connection\n");
1337 ret = -1;
1338 goto error;
1339 }
1340 if (ret_len < 0) {
1341 perror("[error] Error receiving get_new_streams response");
1342 ret = ret_len;
1343 goto error;
1344 }
1345 assert(ret_len == sizeof(rp));
1346
1347 switch(be32toh(rp.status)) {
1348 case LTTNG_VIEWER_NEW_STREAMS_OK:
1349 break;
1350 case LTTNG_VIEWER_NEW_STREAMS_NO_NEW:
1351 ret = 0;
1352 goto end;
1353 case LTTNG_VIEWER_NEW_STREAMS_HUP:
1354 ret = -LTTNG_VIEWER_NEW_STREAMS_HUP;
1355 goto end;
1356 case LTTNG_VIEWER_NEW_STREAMS_ERR:
1357 fprintf(stderr, "[error] get_new_streams error\n");
1358 ret = -1;
1359 goto end;
1360 default:
1361 fprintf(stderr, "[error] Unknown return code %u\n",
1362 be32toh(rp.status));
1363 ret = -1;
1364 goto end;
1365 }
1366
1367 stream_count = be32toh(rp.streams_count);
1368 ctx->session->stream_count += stream_count;
1369 /*
1370 * When the session is created but not started, we do an active wait
1371 * until it starts. It allows the viewer to start processing the trace
1372 * as soon as the session starts.
1373 */
1374 if (ctx->session->stream_count == 0) {
1375 ret = 0;
1376 goto end;
1377 }
1378 printf_verbose("Waiting for %" PRIu64 " streams:\n",
1379 ctx->session->stream_count);
1380 ctx->session->streams = g_new0(struct lttng_live_viewer_stream,
1381 ctx->session->stream_count);
1382 for (i = 0; i < stream_count; i++) {
1383 ret_len = lttng_live_recv(ctx->control_sock, &stream, sizeof(stream));
1384 if (ret_len == 0) {
1385 fprintf(stderr, "[error] Remote side has closed connection\n");
1386 ret = -1;
1387 goto error;
1388 }
1389 if (ret_len < 0) {
1390 perror("[error] Error receiving stream");
1391 ret = ret_len;
1392 goto error;
1393 }
1394 assert(ret_len == sizeof(stream));
1395 stream.path_name[LTTNG_VIEWER_PATH_MAX - 1] = '\0';
1396 stream.channel_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0';
1397
1398 printf_verbose(" stream %" PRIu64 " : %s/%s\n",
1399 be64toh(stream.id), stream.path_name,
1400 stream.channel_name);
1401 ctx->session->streams[i].id = be64toh(stream.id);
1402 ctx->session->streams[i].session = ctx->session;
1403
1404 ctx->session->streams[i].first_read = 1;
1405 ctx->session->streams[i].mmap_size = 0;
1406
1407 if (be32toh(stream.metadata_flag)) {
1408 ctx->session->streams[i].metadata_flag = 1;
1409 }
1410 ret = lttng_live_ctf_trace_assign(&ctx->session->streams[i],
1411 be64toh(stream.ctf_trace_id));
1412 if (ret < 0) {
1413 goto error;
1414 }
1415
1416 }
1417 ret = 0;
1418
1419 end:
1420 error:
1421 return ret;
1422 }
1423
1424 void lttng_live_read(struct lttng_live_ctx *ctx)
1425 {
1426 int ret, i;
1427 struct bt_ctf_iter *iter;
1428 const struct bt_ctf_event *event;
1429 struct bt_iter_pos begin_pos;
1430 struct bt_trace_descriptor *td_write;
1431 struct bt_format *fmt_write;
1432 struct ctf_text_stream_pos *sout;
1433 uint64_t id;
1434
1435 ctx->bt_ctx = bt_context_create();
1436 if (!ctx->bt_ctx) {
1437 fprintf(stderr, "[error] bt_context_create allocation\n");
1438 goto end;
1439 }
1440
1441 fmt_write = bt_lookup_format(g_quark_from_static_string("text"));
1442 if (!fmt_write) {
1443 fprintf(stderr, "[error] ctf-text error\n");
1444 goto end;
1445 }
1446
1447 td_write = fmt_write->open_trace(NULL, O_RDWR, NULL, NULL);
1448 if (!td_write) {
1449 fprintf(stderr, "[error] Error opening output trace\n");
1450 goto end_free;
1451 }
1452
1453 sout = container_of(td_write, struct ctf_text_stream_pos,
1454 trace_descriptor);
1455 if (!sout->parent.event_cb)
1456 goto end_free;
1457
1458 ret = lttng_live_create_viewer_session(ctx);
1459 if (ret < 0) {
1460 goto end_free;
1461 }
1462
1463 for (i = 0; i < ctx->session_ids->len; i++) {
1464 id = g_array_index(ctx->session_ids, uint64_t, i);
1465 printf_verbose("Attaching to session %lu\n", id);
1466 ret = lttng_live_attach_session(ctx, id);
1467 printf_verbose("Attaching session returns %d\n", ret);
1468 if (ret < 0) {
1469 if (ret == -LTTNG_VIEWER_ATTACH_UNK) {
1470 fprintf(stderr, "[error] Unknown session ID\n");
1471 }
1472 goto end_free;
1473 }
1474 }
1475
1476 /*
1477 * As long as the session is active, we try to get new streams.
1478 */
1479 for (;;) {
1480 int flags;
1481
1482 while (!ctx->session->stream_count) {
1483 if (ctx->session_ids->len == 0)
1484 goto end_free;
1485 ret = ask_new_streams(ctx);
1486 if (ret < 0)
1487 goto end_free;
1488 }
1489
1490 g_hash_table_foreach(ctx->session->ctf_traces, add_traces,
1491 ctx->bt_ctx);
1492
1493 begin_pos.type = BT_SEEK_BEGIN;
1494 iter = bt_ctf_iter_create(ctx->bt_ctx, &begin_pos, NULL);
1495 if (!iter) {
1496 fprintf(stderr, "[error] Iterator creation error\n");
1497 goto end;
1498 }
1499 for (;;) {
1500 event = bt_ctf_iter_read_event_flags(iter, &flags);
1501 if (!(flags & BT_ITER_FLAG_RETRY)) {
1502 if (!event) {
1503 /* End of trace */
1504 break;
1505 }
1506 ret = sout->parent.event_cb(&sout->parent,
1507 event->parent->stream);
1508 if (ret) {
1509 fprintf(stderr, "[error] Writing "
1510 "event failed.\n");
1511 goto end_free;
1512 }
1513 }
1514 ret = bt_iter_next(bt_ctf_get_iter(iter));
1515 if (ret < 0) {
1516 goto end_free;
1517 }
1518 }
1519 bt_ctf_iter_destroy(iter);
1520 g_hash_table_foreach_remove(ctx->session->ctf_traces,
1521 del_traces, ctx->bt_ctx);
1522 ctx->session->stream_count = 0;
1523 }
1524
1525 end_free:
1526 bt_context_put(ctx->bt_ctx);
1527 end:
1528 return;
1529 }
This page took 0.095256 seconds and 4 git commands to generate.