Attach and list by session name and hostname
[babeltrace.git] / formats / lttng-live / lttng-live-functions.c
CommitLineData
4a744367
JD
1/*
2 * Copyright (C) 2013 - Julien Desfossez <jdesfossez@efficios.com>
3 * Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
4 *
5 * Permission is hereby granted, free of charge, to any person obtaining a copy
6 * of this software and associated documentation files (the "Software"), to deal
7 * in the Software without restriction, including without limitation the rights
8 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9 * copies of the Software, and to permit persons to whom the Software is
10 * furnished to do so, subject to the following conditions:
11 *
12 * The above copyright notice and this permission notice shall be included in
13 * all copies or substantial portions of the Software.
14 *
15 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21 * SOFTWARE.
22 */
23
24#include <sys/socket.h>
25#include <sys/types.h>
26#include <netinet/in.h>
27#include <netdb.h>
28#include <stdio.h>
29#include <string.h>
30#include <stdlib.h>
31#include <unistd.h>
32#include <errno.h>
33#include <inttypes.h>
34#include <fcntl.h>
35#include <sys/mman.h>
36
37#include <babeltrace/ctf/ctf-index.h>
38
39#include <babeltrace/babeltrace.h>
40#include <babeltrace/ctf/events.h>
41#include <babeltrace/ctf/callbacks.h>
42#include <babeltrace/ctf/iterator.h>
43
44/* for packet_index */
45#include <babeltrace/ctf/types.h>
46
47#include <babeltrace/ctf/metadata.h>
48#include <babeltrace/ctf-text/types.h>
49#include <babeltrace/ctf/events-internal.h>
50#include <formats/ctf/events-private.h>
51
52#include "lttng-live-functions.h"
158eea6e 53#include "lttng-viewer-abi.h"
4a744367
JD
54
/*
 * Allocate x bytes of zero-initialized memory. Evaluates to NULL when the
 * allocation fails; release the result with free().
 */
#define zmalloc(x)	calloc(1, (x))

/*
 * Maximum of a and b, both converted to `type` before the comparison.
 * Each argument is evaluated twice: do not pass expressions that have
 * side effects.
 */
#ifndef max_t
#define max_t(type, a, b)	\
	((type) (a) > (type) (b) ? (type) (a) : (type) (b))
#endif
64
b5a1fa45
JD
65static void ctf_live_packet_seek(struct bt_stream_pos *stream_pos,
66 size_t index, int whence);
2acdc547
JD
67static void add_traces(gpointer key, gpointer value, gpointer user_data);
68static int del_traces(gpointer key, gpointer value, gpointer user_data);
69
b5a1fa45 70int lttng_live_connect_viewer(struct lttng_live_ctx *ctx)
4a744367
JD
71{
72 struct hostent *host;
73 struct sockaddr_in server_addr;
74 int ret;
75
b5a1fa45 76 host = gethostbyname(ctx->relay_hostname);
4a744367
JD
77 if (!host) {
78 ret = -1;
79 goto end;
80 }
81
82 if ((ctx->control_sock = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
83 perror("Socket");
84 ret = -1;
85 goto end;
86 }
87
88 server_addr.sin_family = AF_INET;
b5a1fa45 89 server_addr.sin_port = htons(ctx->port);
4a744367
JD
90 server_addr.sin_addr = *((struct in_addr *) host->h_addr);
91 bzero(&(server_addr.sin_zero), 8);
92
93 if (connect(ctx->control_sock, (struct sockaddr *) &server_addr,
94 sizeof(struct sockaddr)) == -1) {
95 perror("Connect");
96 ret = -1;
97 goto end;
98 }
99
100 ret = 0;
101
102end:
103 return ret;
104}
105
106int lttng_live_establish_connection(struct lttng_live_ctx *ctx)
107{
108 struct lttng_viewer_cmd cmd;
109 struct lttng_viewer_connect connect;
110 int ret;
111 ssize_t ret_len;
112
113 cmd.cmd = htobe32(LTTNG_VIEWER_CONNECT);
114 cmd.data_size = sizeof(connect);
115 cmd.cmd_version = 0;
116
0c3cd7e1 117 connect.viewer_session_id = -1ULL; /* will be set on recv */
4a744367
JD
118 connect.major = htobe32(LTTNG_LIVE_MAJOR);
119 connect.minor = htobe32(LTTNG_LIVE_MINOR);
120 connect.type = htobe32(LTTNG_VIEWER_CLIENT_COMMAND);
121
122 do {
123 ret_len = send(ctx->control_sock, &cmd, sizeof(cmd), 0);
124 } while (ret_len < 0 && errno == EINTR);
125 if (ret_len < 0) {
126 fprintf(stderr, "[error] Error sending cmd\n");
127 ret = ret_len;
128 goto error;
129 }
130 assert(ret_len == sizeof(cmd));
131
132 do {
133 ret_len = send(ctx->control_sock, &connect, sizeof(connect), 0);
134 } while (ret_len < 0 && errno == EINTR);
135 if (ret_len < 0) {
136 fprintf(stderr, "[error] Error sending version\n");
137 ret = ret_len;
138 goto error;
139 }
140 assert(ret_len == sizeof(connect));
141
142 do {
143 ret_len = recv(ctx->control_sock, &connect, sizeof(connect), 0);
144 } while (ret_len < 0 && errno == EINTR);
145 if (ret_len < 0) {
146 fprintf(stderr, "[error] Error receiving version\n");
147 ret = ret_len;
148 goto error;
149 }
150 assert(ret_len == sizeof(connect));
151
152 printf_verbose("Received viewer session ID : %" PRIu64 "\n",
153 be64toh(connect.viewer_session_id));
154 printf_verbose("Relayd version : %u.%u\n", be32toh(connect.major),
155 be32toh(connect.minor));
156
157 ret = 0;
158
159error:
160 return ret;
161}
162
b5a1fa45
JD
163static
164void free_session_list(GPtrArray *session_list)
165{
166 int i;
167 struct lttng_live_relay_session *relay_session;
168
169 for (i = 0; i < session_list->len; i++) {
170 relay_session = g_ptr_array_index(session_list, i);
171 free(relay_session->name);
172 free(relay_session->hostname);
173 }
174 g_ptr_array_free(session_list, TRUE);
175}
176
177static
178void print_session_list(GPtrArray *session_list, const char *path)
179{
180 int i;
181 struct lttng_live_relay_session *relay_session;
182
183 for (i = 0; i < session_list->len; i++) {
184 relay_session = g_ptr_array_index(session_list, i);
185 fprintf(stdout, "%s/host/%s/%s (timer = %u, "
186 "%u stream(s), %u client(s) connected)\n",
187 path, relay_session->hostname,
188 relay_session->name, relay_session->timer,
189 relay_session->streams, relay_session->clients);
190 }
191}
192
193static
194void update_session_list(GPtrArray *session_list, char *hostname,
195 char *session_name, uint32_t streams, uint32_t clients,
196 uint32_t timer)
197{
198 int i, found = 0;
199 struct lttng_live_relay_session *relay_session;
200
201 for (i = 0; i < session_list->len; i++) {
202 relay_session = g_ptr_array_index(session_list, i);
203 if ((strncmp(relay_session->hostname, hostname, NAME_MAX) == 0) &&
204 strncmp(relay_session->name,
205 session_name, NAME_MAX) == 0) {
206 relay_session->streams += streams;
207 if (relay_session->clients < clients)
208 relay_session->clients = clients;
209 found = 1;
210 break;
211 }
212 }
213 if (found)
214 return;
215
216 relay_session = g_new0(struct lttng_live_relay_session, 1);
217 relay_session->hostname = strndup(hostname, NAME_MAX);
218 relay_session->name = strndup(session_name, NAME_MAX);
219 relay_session->clients = clients;
220 relay_session->streams = streams;
221 relay_session->timer = timer;
222 g_ptr_array_add(session_list, relay_session);
223}
224
4a744367
JD
225int lttng_live_list_sessions(struct lttng_live_ctx *ctx, const char *path)
226{
227 struct lttng_viewer_cmd cmd;
228 struct lttng_viewer_list_sessions list;
229 struct lttng_viewer_session lsession;
b5a1fa45 230 int i, ret, sessions_count, print_list = 0;
4a744367 231 ssize_t ret_len;
b5a1fa45
JD
232 uint64_t session_id;
233 GPtrArray *session_list = NULL;
234
235 if (strlen(ctx->session_name) == 0) {
236 print_list = 1;
237 session_list = g_ptr_array_new();
238 }
4a744367
JD
239
240 cmd.cmd = htobe32(LTTNG_VIEWER_LIST_SESSIONS);
241 cmd.data_size = 0;
242 cmd.cmd_version = 0;
243
244 do {
245 ret_len = send(ctx->control_sock, &cmd, sizeof(cmd), 0);
246 } while (ret_len < 0 && errno == EINTR);
247 if (ret_len < 0) {
248 fprintf(stderr, "[error] Error sending cmd\n");
249 ret = ret_len;
250 goto error;
251 }
252 assert(ret_len == sizeof(cmd));
253
254 do {
255 ret_len = recv(ctx->control_sock, &list, sizeof(list), 0);
256 } while (ret_len < 0 && errno == EINTR);
257 if (ret_len < 0) {
258 fprintf(stderr, "[error] Error receiving session list\n");
259 ret = ret_len;
260 goto error;
261 }
262 assert(ret_len == sizeof(list));
263
264 sessions_count = be32toh(list.sessions_count);
4a744367
JD
265 for (i = 0; i < sessions_count; i++) {
266 do {
267 ret_len = recv(ctx->control_sock, &lsession, sizeof(lsession), 0);
268 } while (ret_len < 0 && errno == EINTR);
269 if (ret_len < 0) {
270 fprintf(stderr, "[error] Error receiving session\n");
271 ret = ret_len;
272 goto error;
273 }
274 assert(ret_len == sizeof(lsession));
9ed31e53
MD
275 lsession.hostname[LTTNG_VIEWER_HOST_NAME_MAX - 1] = '\0';
276 lsession.session_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0';
b5a1fa45
JD
277 session_id = be64toh(lsession.id);
278
279 if (print_list) {
280 update_session_list(session_list,
281 lsession.hostname,
282 lsession.session_name,
283 be32toh(lsession.streams),
284 be32toh(lsession.clients),
285 be32toh(lsession.live_timer));
286 } else {
287 if ((strncmp(lsession.session_name, ctx->session_name,
288 NAME_MAX) == 0) && (strncmp(lsession.hostname,
289 ctx->traced_hostname, NAME_MAX) == 0)) {
290 printf_verbose("Reading from session %" PRIu64 "\n",
291 session_id);
292 g_array_append_val(ctx->session_ids,
293 session_id);
294 }
295 }
296 }
4a744367 297
b5a1fa45
JD
298 if (print_list) {
299 print_session_list(session_list, path);
300 free_session_list(session_list);
4a744367
JD
301 }
302
303 ret = 0;
304
305error:
306 return ret;
307}
308
309int lttng_live_ctf_trace_assign(struct lttng_live_viewer_stream *stream,
310 uint64_t ctf_trace_id)
311{
312 struct lttng_live_ctf_trace *trace;
313 int ret = 0;
314
315 trace = g_hash_table_lookup(stream->session->ctf_traces,
316 (gpointer) ctf_trace_id);
317 if (!trace) {
318 trace = g_new0(struct lttng_live_ctf_trace, 1);
4a744367
JD
319 trace->ctf_trace_id = ctf_trace_id;
320 trace->streams = g_ptr_array_new();
321 g_hash_table_insert(stream->session->ctf_traces,
322 (gpointer) ctf_trace_id,
323 trace);
324 }
325 if (stream->metadata_flag)
326 trace->metadata_stream = stream;
327
328 stream->ctf_trace = trace;
329 g_ptr_array_add(trace->streams, stream);
330
4a744367
JD
331 return ret;
332}
333
334int lttng_live_attach_session(struct lttng_live_ctx *ctx, uint64_t id)
335{
336 struct lttng_viewer_cmd cmd;
337 struct lttng_viewer_attach_session_request rq;
338 struct lttng_viewer_attach_session_response rp;
339 struct lttng_viewer_stream stream;
340 int ret, i;
341 ssize_t ret_len;
342
343 cmd.cmd = htobe32(LTTNG_VIEWER_ATTACH_SESSION);
344 cmd.data_size = sizeof(rq);
345 cmd.cmd_version = 0;
346
347 memset(&rq, 0, sizeof(rq));
348 rq.session_id = htobe64(id);
349 // TODO: add cmd line parameter to select seek beginning
350 // rq.seek = htobe32(LTTNG_VIEWER_SEEK_BEGINNING);
351 rq.seek = htobe32(LTTNG_VIEWER_SEEK_LAST);
352
353 do {
354 ret_len = send(ctx->control_sock, &cmd, sizeof(cmd), 0);
355 } while (ret_len < 0 && errno == EINTR);
356 if (ret_len < 0) {
357 fprintf(stderr, "[error] Error sending cmd\n");
358 ret = ret_len;
359 goto error;
360 }
361 assert(ret_len == sizeof(cmd));
362
363 do {
364 ret_len = send(ctx->control_sock, &rq, sizeof(rq), 0);
365 } while (ret_len < 0 && errno == EINTR);
366 if (ret_len < 0) {
367 fprintf(stderr, "[error] Error sending attach request\n");
368 ret = ret_len;
369 goto error;
370 }
371 assert(ret_len == sizeof(rq));
372
373 do {
374 ret_len = recv(ctx->control_sock, &rp, sizeof(rp), 0);
375 } while (ret_len < 0 && errno == EINTR);
376 if (ret_len < 0) {
377 fprintf(stderr, "[error] Error receiving attach response\n");
378 ret = ret_len;
379 goto error;
380 }
381 assert(ret_len == sizeof(rp));
382
383 switch(be32toh(rp.status)) {
384 case LTTNG_VIEWER_ATTACH_OK:
385 break;
386 case LTTNG_VIEWER_ATTACH_UNK:
387 ret = -LTTNG_VIEWER_ATTACH_UNK;
388 goto end;
389 case LTTNG_VIEWER_ATTACH_ALREADY:
390 fprintf(stderr, "[error] Already a viewer attached\n");
391 ret = -1;
392 goto end;
393 case LTTNG_VIEWER_ATTACH_NOT_LIVE:
394 fprintf(stderr, "[error] Not a live session\n");
395 ret = -1;
396 goto end;
397 case LTTNG_VIEWER_ATTACH_SEEK_ERR:
398 fprintf(stderr, "[error] Wrong seek parameter\n");
399 ret = -1;
400 goto end;
401 default:
402 fprintf(stderr, "[error] Unknown attach return code %u\n",
403 be32toh(rp.status));
404 ret = -1;
405 goto end;
406 }
407 if (be32toh(rp.status) != LTTNG_VIEWER_ATTACH_OK) {
408 ret = -1;
409 goto end;
410 }
411
2acdc547 412 ctx->session->stream_count += be32toh(rp.streams_count);
4a744367
JD
413 /*
414 * When the session is created but not started, we do an active wait
415 * until it starts. It allows the viewer to start processing the trace
416 * as soon as the session starts.
417 */
418 if (ctx->session->stream_count == 0) {
419 ret = 0;
420 goto end;
421 }
422 printf_verbose("Waiting for %" PRIu64 " streams:\n",
423 ctx->session->stream_count);
424 ctx->session->streams = g_new0(struct lttng_live_viewer_stream,
425 ctx->session->stream_count);
4a744367
JD
426 for (i = 0; i < be32toh(rp.streams_count); i++) {
427 do {
428 ret_len = recv(ctx->control_sock, &stream, sizeof(stream), 0);
429 } while (ret_len < 0 && errno == EINTR);
430 if (ret_len < 0) {
431 fprintf(stderr, "[error] Error receiving stream\n");
432 ret = ret_len;
433 goto error;
434 }
435 assert(ret_len == sizeof(stream));
9ed31e53
MD
436 stream.path_name[LTTNG_VIEWER_PATH_MAX - 1] = '\0';
437 stream.channel_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0';
4a744367
JD
438
439 printf_verbose(" stream %" PRIu64 " : %s/%s\n",
440 be64toh(stream.id), stream.path_name,
441 stream.channel_name);
442 ctx->session->streams[i].id = be64toh(stream.id);
443 ctx->session->streams[i].session = ctx->session;
444
445 ctx->session->streams[i].first_read = 1;
446 ctx->session->streams[i].mmap_size = 0;
447
448 if (be32toh(stream.metadata_flag)) {
449 char *path;
450
451 path = strdup(LTTNG_METADATA_PATH_TEMPLATE);
6a4d2b5a
MD
452 if (!path) {
453 perror("strdup");
454 ret = -1;
455 goto error;
456 }
457 if (!mkdtemp(path)) {
458 perror("mkdtemp");
459 free(path);
460 ret = -1;
461 goto error;
462 }
4a744367 463 ctx->session->streams[i].metadata_flag = 1;
4a744367
JD
464 snprintf(ctx->session->streams[i].path,
465 sizeof(ctx->session->streams[i].path),
466 "%s/%s", path,
467 stream.channel_name);
468 ret = open(ctx->session->streams[i].path,
469 O_WRONLY | O_CREAT | O_TRUNC,
470 S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP);
471 if (ret < 0) {
ab9d07f0 472 perror("open");
6a4d2b5a 473 free(path);
4a744367
JD
474 goto error;
475 }
476 ctx->session->streams[i].fd = ret;
6a4d2b5a 477 free(path);
4a744367
JD
478 }
479 ret = lttng_live_ctf_trace_assign(&ctx->session->streams[i],
480 be64toh(stream.ctf_trace_id));
481 if (ret < 0) {
482 goto error;
483 }
484
485 }
486 ret = 0;
487
488end:
489error:
490 return ret;
491}
492
2acdc547
JD
493static
494int ask_new_streams(struct lttng_live_ctx *ctx)
495{
496 int i, ret = 0;
497 uint64_t id;
498
499restart:
500 for (i = 0; i < ctx->session_ids->len; i++) {
501 id = g_array_index(ctx->session_ids, uint64_t, i);
502 ret = lttng_live_get_new_streams(ctx, id);
503 printf_verbose("Asking for new streams returns %d\n",
504 ret);
505 if (ret < 0) {
506 if (ret == -LTTNG_VIEWER_NEW_STREAMS_HUP) {
507 printf_verbose("Session %" PRIu64 " closed\n",
508 id);
b5a1fa45
JD
509 /*
510 * The streams have already been closed during
511 * the reading, so we only need to get rid of
512 * the trace in our internal table of sessions.
513 */
2acdc547
JD
514 g_array_remove_index(ctx->session_ids, i);
515 /*
516 * We can't continue iterating on the g_array
517 * after a remove, we have to start again.
518 */
519 goto restart;
520 } else {
521 ret = -1;
522 goto end;
523 }
524 }
525 }
526
527end:
528 return ret;
529}
530
4a744367
JD
531static
532int get_data_packet(struct lttng_live_ctx *ctx,
533 struct ctf_stream_pos *pos,
534 struct lttng_live_viewer_stream *stream, uint64_t offset,
535 uint64_t len)
536{
537 struct lttng_viewer_cmd cmd;
538 struct lttng_viewer_get_packet rq;
539 struct lttng_viewer_trace_packet rp;
540 ssize_t ret_len;
541 int ret;
542
543 cmd.cmd = htobe32(LTTNG_VIEWER_GET_PACKET);
544 cmd.data_size = sizeof(rq);
545 cmd.cmd_version = 0;
546
547 memset(&rq, 0, sizeof(rq));
548 rq.stream_id = htobe64(stream->id);
549 /* Already in big endian. */
550 rq.offset = offset;
551 rq.len = htobe32(len);
552
553 do {
554 ret_len = send(ctx->control_sock, &cmd, sizeof(cmd), 0);
555 } while (ret_len < 0 && errno == EINTR);
556 if (ret_len < 0) {
557 fprintf(stderr, "[error] Error sending cmd\n");
558 ret = ret_len;
559 goto error;
560 }
561 assert(ret_len == sizeof(cmd));
562
563 do {
564 ret_len = send(ctx->control_sock, &rq, sizeof(rq), 0);
565 } while (ret_len < 0 && errno == EINTR);
566 if (ret_len < 0) {
567 fprintf(stderr, "[error] Error sending get_data_packet request\n");
568 ret = ret_len;
569 goto error;
570 }
571 assert(ret_len == sizeof(rq));
572
573 do {
574 ret_len = recv(ctx->control_sock, &rp, sizeof(rp), 0);
575 } while (ret_len < 0 && errno == EINTR);
576 if (ret_len < 0) {
577 fprintf(stderr, "[error] Error receiving data response\n");
578 ret = ret_len;
579 goto error;
580 }
57b9d843
JD
581 if (ret_len != sizeof(rp)) {
582 fprintf(stderr, "[error] get_data_packet: expected %" PRId64
583 ", received %" PRId64 "\n", ret_len,
584 sizeof(rp));
585 ret = -1;
586 goto error;
587 }
4a744367
JD
588
589 rp.flags = be32toh(rp.flags);
590
591 switch (be32toh(rp.status)) {
592 case LTTNG_VIEWER_GET_PACKET_OK:
593 len = be32toh(rp.len);
594 printf_verbose("get_data_packet: Ok, packet size : %" PRIu64
595 "\n", len);
596 break;
597 case LTTNG_VIEWER_GET_PACKET_RETRY:
598 printf_verbose("get_data_packet: retry\n");
599 ret = -1;
600 goto end;
601 case LTTNG_VIEWER_GET_PACKET_ERR:
602 if (rp.flags & LTTNG_VIEWER_FLAG_NEW_METADATA) {
603 printf_verbose("get_data_packet: new metadata needed\n");
604 ret = 0;
605 goto end;
606 }
2acdc547
JD
607 if (rp.flags & LTTNG_VIEWER_FLAG_NEW_STREAM) {
608 ret = ask_new_streams(ctx);
609 if (ret < 0)
610 goto error;
611 g_hash_table_foreach(ctx->session->ctf_traces, add_traces,
612 ctx->bt_ctx);
613 }
4a744367
JD
614 fprintf(stderr, "[error] get_data_packet: error\n");
615 ret = -1;
616 goto end;
617 case LTTNG_VIEWER_GET_PACKET_EOF:
618 ret = -2;
619 goto error;
620 default:
621 printf_verbose("get_data_packet: unknown\n");
622 assert(0);
623 ret = -1;
624 goto end;
625 }
626
627 if (len <= 0) {
628 ret = -1;
629 goto end;
630 }
631
632 if (len > stream->mmap_size) {
633 uint64_t new_size;
634
635 new_size = max_t(uint64_t, len, stream->mmap_size << 1);
636 if (pos->base_mma) {
637 /* unmap old base */
638 ret = munmap_align(pos->base_mma);
639 if (ret) {
640 fprintf(stderr, "[error] Unable to unmap old base: %s.\n",
641 strerror(errno));
642 ret = -1;
643 goto error;
644 }
645 pos->base_mma = NULL;
646 }
647 pos->base_mma = mmap_align(new_size,
648 PROT_READ | PROT_WRITE,
649 MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
650 if (pos->base_mma == MAP_FAILED) {
651 fprintf(stderr, "[error] mmap error %s.\n",
652 strerror(errno));
653 pos->base_mma = NULL;
654 ret = -1;
655 goto error;
656 }
657
658 stream->mmap_size = new_size;
659 printf_verbose("Expanding stream mmap size to %" PRIu64 " bytes\n",
660 stream->mmap_size);
661 }
662
663 do {
664 ret_len = recv(ctx->control_sock,
665 mmap_align_addr(pos->base_mma), len,
666 MSG_WAITALL);
667 } while (ret_len < 0 && errno == EINTR);
668 if (ret_len < 0) {
669 fprintf(stderr, "[error] Error receiving trace packet\n");
670 ret = ret_len;
671 goto error;
672 }
673 assert(ret_len == len);
674
675end:
676error:
677 return ret;
678}
679
680/*
681 * Return number of metadata bytes written or a negative value on error.
682 */
683static
684int get_new_metadata(struct lttng_live_ctx *ctx,
685 struct lttng_live_viewer_stream *viewer_stream,
686 uint64_t *metadata_len)
687{
688 uint64_t len = 0;
689 int ret;
690 struct lttng_viewer_cmd cmd;
691 struct lttng_viewer_get_metadata rq;
692 struct lttng_viewer_metadata_packet rp;
693 struct lttng_live_viewer_stream *metadata_stream;
694 char *data = NULL;
695 ssize_t ret_len;
696
697 cmd.cmd = htobe32(LTTNG_VIEWER_GET_METADATA);
698 cmd.data_size = sizeof(rq);
699 cmd.cmd_version = 0;
700
701 metadata_stream = viewer_stream->ctf_trace->metadata_stream;
702 rq.stream_id = htobe64(metadata_stream->id);
703
704 do {
705 ret_len = send(ctx->control_sock, &cmd, sizeof(cmd), 0);
706 } while (ret_len < 0 && errno == EINTR);
707 if (ret_len < 0) {
708 fprintf(stderr, "[error] Error sending cmd\n");
709 ret = ret_len;
710 goto error;
711 }
712 assert(ret_len == sizeof(cmd));
713
714 do {
715 ret_len = send(ctx->control_sock, &rq, sizeof(rq), 0);
716 } while (ret_len < 0 && errno == EINTR);
717 if (ret_len < 0) {
718 fprintf(stderr, "[error] Error sending get_metadata request\n");
719 ret = ret_len;
720 goto error;
721 }
722 assert(ret_len == sizeof(rq));
723
724 do {
725 ret_len = recv(ctx->control_sock, &rp, sizeof(rp), 0);
726 } while (ret_len < 0 && errno == EINTR);
727 if (ret_len < 0) {
728 fprintf(stderr, "[error] Error receiving metadata response\n");
729 ret = ret_len;
730 goto error;
731 }
732 assert(ret_len == sizeof(rp));
733
734 switch (be32toh(rp.status)) {
735 case LTTNG_VIEWER_METADATA_OK:
736 printf_verbose("get_metadata : OK\n");
737 break;
738 case LTTNG_VIEWER_NO_NEW_METADATA:
739 printf_verbose("get_metadata : NO NEW\n");
740 ret = -1;
741 goto end;
742 case LTTNG_VIEWER_METADATA_ERR:
743 printf_verbose("get_metadata : ERR\n");
744 ret = -1;
745 goto end;
746 default:
747 printf_verbose("get_metadata : UNKNOWN\n");
748 ret = -1;
749 goto end;
750 }
751
752 len = be64toh(rp.len);
753 printf_verbose("Writing %" PRIu64" bytes to metadata\n", len);
754 if (len <= 0) {
755 ret = -1;
756 goto end;
757 }
758
759 data = zmalloc(len);
760 if (!data) {
761 perror("relay data zmalloc");
762 ret = -1;
763 goto error;
764 }
765 do {
766 ret_len = recv(ctx->control_sock, data, len, MSG_WAITALL);
767 } while (ret_len < 0 && errno == EINTR);
768 if (ret_len < 0) {
769 fprintf(stderr, "[error] Error receiving trace packet\n");
770 ret = ret_len;
771 free(data);
772 goto error;
773 }
774 assert(ret_len == len);
775
776 do {
777 ret_len = write(metadata_stream->fd, data, len);
778 } while (ret_len < 0 && errno == EINTR);
779 if (ret_len < 0) {
780 free(data);
781 ret = ret_len;
782 goto error;
783 }
784 assert(ret_len == len);
785
786 free(data);
787
788 *metadata_len = len;
789 ret = 0;
790end:
791error:
792 return ret;
793}
794
795/*
796 * Get one index for a stream.
797 *
798 * Returns 0 on success or a negative value on error.
799 */
800static
801int get_next_index(struct lttng_live_ctx *ctx,
802 struct lttng_live_viewer_stream *viewer_stream,
803 struct packet_index *index)
804{
805 struct lttng_viewer_cmd cmd;
806 struct lttng_viewer_get_next_index rq;
807 struct lttng_viewer_index rp;
808 int ret;
809 uint64_t metadata_len;
810 ssize_t ret_len;
811
812 cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEXT_INDEX);
813 cmd.data_size = sizeof(rq);
814 cmd.cmd_version = 0;
815
816 memset(&rq, 0, sizeof(rq));
817 rq.stream_id = htobe64(viewer_stream->id);
818
819retry:
820 do {
821 ret_len = send(ctx->control_sock, &cmd, sizeof(cmd), 0);
822 } while (ret_len < 0 && errno == EINTR);
823 if (ret_len < 0) {
824 fprintf(stderr, "[error] Error sending cmd\n");
825 ret = ret_len;
826 goto error;
827 }
828 assert(ret_len == sizeof(cmd));
829
830 do {
831 ret_len = send(ctx->control_sock, &rq, sizeof(rq), 0);
832 } while (ret_len < 0 && errno == EINTR);
833 if (ret_len < 0) {
834 fprintf(stderr, "[error] Error sending get_next_index request\n");
835 ret = ret_len;
836 goto error;
837 }
838 assert(ret_len == sizeof(rq));
839
840 do {
841 ret_len = recv(ctx->control_sock, &rp, sizeof(rp), 0);
842 } while (ret_len < 0 && errno == EINTR);
843 if (ret_len < 0) {
844 fprintf(stderr, "[error] Error receiving index response\n");
845 ret = ret_len;
846 goto error;
847 }
848 assert(ret_len == sizeof(rp));
849
850 rp.flags = be32toh(rp.flags);
851
852 switch (be32toh(rp.status)) {
853 case LTTNG_VIEWER_INDEX_INACTIVE:
854 printf_verbose("get_next_index: inactive\n");
855 memset(index, 0, sizeof(struct packet_index));
992e8cc0 856 index->ts_cycles.timestamp_end = be64toh(rp.timestamp_end);
4a744367
JD
857 break;
858 case LTTNG_VIEWER_INDEX_OK:
859 printf_verbose("get_next_index: Ok, need metadata update : %u\n",
860 rp.flags & LTTNG_VIEWER_FLAG_NEW_METADATA);
861 index->offset = be64toh(rp.offset);
862 index->packet_size = be64toh(rp.packet_size);
863 index->content_size = be64toh(rp.content_size);
992e8cc0
MD
864 index->ts_cycles.timestamp_begin = be64toh(rp.timestamp_begin);
865 index->ts_cycles.timestamp_end = be64toh(rp.timestamp_end);
4a744367
JD
866 index->events_discarded = be64toh(rp.events_discarded);
867
868 if (rp.flags & LTTNG_VIEWER_FLAG_NEW_METADATA) {
869 printf_verbose("get_next_index: new metadata needed\n");
870 ret = get_new_metadata(ctx, viewer_stream,
871 &metadata_len);
872 if (ret < 0) {
873 goto error;
874 }
875 }
2acdc547
JD
876 if (rp.flags & LTTNG_VIEWER_FLAG_NEW_STREAM) {
877 ret = ask_new_streams(ctx);
878 if (ret < 0)
879 goto error;
880 g_hash_table_foreach(ctx->session->ctf_traces, add_traces,
881 ctx->bt_ctx);
882 }
4a744367
JD
883 break;
884 case LTTNG_VIEWER_INDEX_RETRY:
885 printf_verbose("get_next_index: retry\n");
886 sleep(1);
887 goto retry;
888 case LTTNG_VIEWER_INDEX_HUP:
889 printf_verbose("get_next_index: stream hung up\n");
890 viewer_stream->id = -1ULL;
891 viewer_stream->fd = -1;
892 index->offset = EOF;
2acdc547 893 ctx->session->stream_count--;
4a744367
JD
894 break;
895 case LTTNG_VIEWER_INDEX_ERR:
896 fprintf(stderr, "[error] get_next_index: error\n");
897 ret = -1;
898 goto error;
899 default:
900 fprintf(stderr, "[error] get_next_index: unkwown value\n");
901 ret = -1;
902 goto error;
903 }
904
905 ret = 0;
906
907error:
908 return ret;
909}
910
b5a1fa45 911static
4a744367
JD
912void ctf_live_packet_seek(struct bt_stream_pos *stream_pos, size_t index,
913 int whence)
914{
915 struct ctf_stream_pos *pos;
916 struct ctf_file_stream *file_stream;
f1f52630 917 struct packet_index *prev_index = NULL, *cur_index;
4a744367
JD
918 struct lttng_live_viewer_stream *viewer_stream;
919 struct lttng_live_session *session;
920 int ret;
921
922retry:
923 pos = ctf_pos(stream_pos);
924 file_stream = container_of(pos, struct ctf_file_stream, pos);
925 viewer_stream = (struct lttng_live_viewer_stream *) pos->priv;
926 session = viewer_stream->session;
927
f1f52630
MD
928 switch (pos->packet_index->len) {
929 case 0:
930 g_array_set_size(pos->packet_index, 1);
931 cur_index = &g_array_index(pos->packet_index,
932 struct packet_index, 0);
933 break;
934 case 1:
935 g_array_set_size(pos->packet_index, 2);
936 prev_index = &g_array_index(pos->packet_index,
937 struct packet_index, 0);
938 cur_index = &g_array_index(pos->packet_index,
939 struct packet_index, 1);
940 break;
941 case 2:
942 g_array_index(pos->packet_index,
943 struct packet_index, 0) =
944 g_array_index(pos->packet_index,
945 struct packet_index, 1);
946 prev_index = &g_array_index(pos->packet_index,
947 struct packet_index, 0);
948 cur_index = &g_array_index(pos->packet_index,
949 struct packet_index, 1);
950 break;
951 default:
952 abort();
953 break;
954 }
4a744367 955 printf_verbose("get_next_index for stream %" PRIu64 "\n", viewer_stream->id);
f1f52630 956 ret = get_next_index(session->ctx, viewer_stream, cur_index);
4a744367
JD
957 if (ret < 0) {
958 pos->offset = EOF;
959 fprintf(stderr, "[error] get_next_index failed\n");
960 return;
961 }
962
f1f52630
MD
963 pos->packet_size = cur_index->packet_size;
964 pos->content_size = cur_index->content_size;
4a744367 965 pos->mmap_base_offset = 0;
f1f52630 966 if (cur_index->offset == EOF) {
4a744367
JD
967 pos->offset = EOF;
968 } else {
969 pos->offset = 0;
970 }
971
f1f52630
MD
972 if (cur_index->content_size == 0) {
973 file_stream->parent.cycles_timestamp =
974 cur_index->ts_cycles.timestamp_end;
4a744367 975 file_stream->parent.real_timestamp = ctf_get_real_timestamp(
f1f52630
MD
976 &file_stream->parent,
977 cur_index->ts_cycles.timestamp_end);
4a744367 978 } else {
f1f52630
MD
979 /* Convert the timestamps and append to the real_index. */
980 cur_index->ts_real.timestamp_begin = ctf_get_real_timestamp(
981 &file_stream->parent,
982 cur_index->ts_cycles.timestamp_begin);
983 cur_index->ts_real.timestamp_end = ctf_get_real_timestamp(
984 &file_stream->parent,
985 cur_index->ts_cycles.timestamp_end);
986
987 ctf_update_current_packet_index(&file_stream->parent,
988 prev_index, cur_index);
989
990 file_stream->parent.cycles_timestamp =
991 cur_index->ts_cycles.timestamp_begin;
992 file_stream->parent.real_timestamp =
993 cur_index->ts_real.timestamp_begin;
4a744367
JD
994 }
995
996 if (pos->packet_size == 0 || pos->offset == EOF) {
997 goto end;
998 }
999
1000 printf_verbose("get_data_packet for stream %" PRIu64 "\n",
1001 viewer_stream->id);
1002 ret = get_data_packet(session->ctx, pos, viewer_stream,
f1f52630
MD
1003 be64toh(cur_index->offset),
1004 cur_index->packet_size / CHAR_BIT);
4a744367
JD
1005 if (ret == -2) {
1006 goto retry;
1007 } else if (ret < 0) {
1008 pos->offset = EOF;
1009 fprintf(stderr, "[error] get_data_packet failed\n");
1010 return;
1011 }
1012
1013 printf_verbose("Index received : packet_size : %" PRIu64
1014 ", offset %" PRIu64 ", content_size %" PRIu64
1015 ", timestamp_end : %" PRIu64 "\n",
f1f52630
MD
1016 cur_index->packet_size, cur_index->offset,
1017 cur_index->content_size,
1018 cur_index->ts_cycles.timestamp_end);
4a744367
JD
1019
1020 /* update trace_packet_header and stream_packet_context */
1021 if (pos->prot != PROT_WRITE && file_stream->parent.trace_packet_header) {
1022 /* Read packet header */
1023 ret = generic_rw(&pos->parent, &file_stream->parent.trace_packet_header->p);
1024 if (ret) {
1025 pos->offset = EOF;
1026 fprintf(stderr, "[error] trace packet header read failed\n");
1027 goto end;
1028 }
1029 }
1030 if (pos->prot != PROT_WRITE && file_stream->parent.stream_packet_context) {
1031 /* Read packet context */
1032 ret = generic_rw(&pos->parent, &file_stream->parent.stream_packet_context->p);
1033 if (ret) {
1034 pos->offset = EOF;
1035 fprintf(stderr, "[error] stream packet context read failed\n");
1036 goto end;
1037 }
1038 }
1039 pos->data_offset = pos->offset;
1040
1041end:
1042 return;
1043}
1044
2acdc547
JD
1045int lttng_live_create_viewer_session(struct lttng_live_ctx *ctx)
1046{
1047 struct lttng_viewer_cmd cmd;
1048 struct lttng_viewer_create_session_response resp;
1049 int ret;
1050 ssize_t ret_len;
1051
1052 cmd.cmd = htobe32(LTTNG_VIEWER_CREATE_SESSION);
1053 cmd.data_size = 0;
1054 cmd.cmd_version = 0;
1055
1056 do {
1057 ret_len = send(ctx->control_sock, &cmd, sizeof(cmd), 0);
1058 } while (ret_len < 0 && errno == EINTR);
1059 if (ret_len < 0) {
1060 fprintf(stderr, "[error] Error sending cmd\n");
1061 ret = ret_len;
1062 goto error;
1063 }
1064 assert(ret_len == sizeof(cmd));
1065
1066 do {
1067 ret_len = recv(ctx->control_sock, &resp, sizeof(resp), 0);
1068 } while (ret_len < 0 && errno == EINTR);
1069 if (ret_len < 0) {
1070 fprintf(stderr, "[error] Error receiving create session reply\n");
1071 ret = ret_len;
1072 goto error;
1073 }
1074 assert(ret_len == sizeof(resp));
1075
1076 if (be32toh(resp.status) != LTTNG_VIEWER_CREATE_SESSION_OK) {
1077 fprintf(stderr, "[error] Error creating viewer session\n");
1078 ret = -1;
1079 goto error;
1080 }
1081 ret = 0;
1082
1083error:
1084 return ret;
1085}
1086
1087static
1088int del_traces(gpointer key, gpointer value, gpointer user_data)
4a744367
JD
1089{
1090 struct bt_context *bt_ctx = user_data;
1091 struct lttng_live_ctf_trace *trace = value;
1092 int ret;
1093
1094 ret = bt_context_remove_trace(bt_ctx, trace->trace_id);
1095 if (ret < 0)
1096 fprintf(stderr, "[error] removing trace from context\n");
1097
1098 /* remove the key/value pair from the HT. */
1099 return 1;
1100}
1101
2acdc547
JD
1102static
1103void add_traces(gpointer key, gpointer value, gpointer user_data)
4a744367
JD
1104{
1105 int i, ret, total_metadata = 0;
1106 uint64_t metadata_len;
1107 struct bt_context *bt_ctx = user_data;
1108 struct lttng_live_ctf_trace *trace = value;
1109 struct lttng_live_viewer_stream *stream;
1110 struct bt_mmap_stream *new_mmap_stream;
1111 struct bt_mmap_stream_list mmap_list;
1112 struct lttng_live_ctx *ctx = NULL;
1113
b5a1fa45
JD
1114 /*
1115 * We don't know how many streams we will receive for a trace, so
1116 * once we are done receiving the traces, we add all the traces
1117 * received to the bt_context.
1118 * We can receive streams during the attach command or the
1119 * get_new_streams, so we have to make sure not to add multiple
1120 * times the same traces.
1121 * If a trace is already in the context, we just skip this function.
1122 */
2acdc547
JD
1123 if (trace->in_use)
1124 return;
1125
4a744367
JD
1126 BT_INIT_LIST_HEAD(&mmap_list.head);
1127
1128 for (i = 0; i < trace->streams->len; i++) {
1129 stream = g_ptr_array_index(trace->streams, i);
1130 ctx = stream->session->ctx;
1131
1132 if (!stream->metadata_flag) {
1133 new_mmap_stream = zmalloc(sizeof(struct bt_mmap_stream));
1134 new_mmap_stream->priv = (void *) stream;
1135 new_mmap_stream->fd = -1;
1136 bt_list_add(&new_mmap_stream->list, &mmap_list.head);
1137 } else {
1138 /* Get all possible metadata before starting */
1139 do {
1140 ret = get_new_metadata(ctx, stream,
1141 &metadata_len);
1142 if (ret == 0) {
1143 total_metadata += metadata_len;
1144 }
1145 } while (ret == 0 || total_metadata == 0);
1146 trace->metadata_fp = fopen(stream->path, "r");
1147 }
1148 }
1149
1150 if (!trace->metadata_fp) {
1151 fprintf(stderr, "[error] No metadata stream opened\n");
1152 goto end_free;
1153 }
1154
1155 ret = bt_context_add_trace(bt_ctx, NULL, "ctf",
1156 ctf_live_packet_seek, &mmap_list, trace->metadata_fp);
1157 if (ret < 0) {
1158 fprintf(stderr, "[error] Error adding trace\n");
1159 goto end_free;
1160 }
2acdc547
JD
1161
1162 if (bt_ctx->current_iterator) {
1163 struct bt_trace_descriptor *td;
1164 struct bt_trace_handle *handle;
1165
1166 handle = (struct bt_trace_handle *) g_hash_table_lookup(
1167 bt_ctx->trace_handles,
1168 (gpointer) (unsigned long) ret);
1169 td = handle->td;
2acdc547
JD
1170 bt_iter_add_trace(bt_ctx->current_iterator, td);
1171 }
1172
4a744367 1173 trace->trace_id = ret;
2acdc547 1174 trace->in_use = 1;
4a744367
JD
1175
1176 goto end;
1177
1178end_free:
1179 bt_context_put(bt_ctx);
1180end:
1181 return;
1182}
1183
2acdc547 1184int lttng_live_get_new_streams(struct lttng_live_ctx *ctx, uint64_t id)
6192b988
JD
1185{
1186 struct lttng_viewer_cmd cmd;
2acdc547
JD
1187 struct lttng_viewer_new_streams_request rq;
1188 struct lttng_viewer_new_streams_response rp;
1189 struct lttng_viewer_stream stream;
1190 int ret, i;
6192b988 1191 ssize_t ret_len;
b5a1fa45 1192 uint32_t stream_count;
6192b988 1193
2acdc547
JD
1194 cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEW_STREAMS);
1195 cmd.data_size = sizeof(rq);
6192b988
JD
1196 cmd.cmd_version = 0;
1197
2acdc547
JD
1198 memset(&rq, 0, sizeof(rq));
1199 rq.session_id = htobe64(id);
1200
6192b988
JD
1201 do {
1202 ret_len = send(ctx->control_sock, &cmd, sizeof(cmd), 0);
1203 } while (ret_len < 0 && errno == EINTR);
1204 if (ret_len < 0) {
1205 fprintf(stderr, "[error] Error sending cmd\n");
1206 ret = ret_len;
1207 goto error;
1208 }
1209 assert(ret_len == sizeof(cmd));
1210
1211 do {
2acdc547 1212 ret_len = send(ctx->control_sock, &rq, sizeof(rq), 0);
6192b988
JD
1213 } while (ret_len < 0 && errno == EINTR);
1214 if (ret_len < 0) {
b5a1fa45 1215 fprintf(stderr, "[error] Error sending get_new_streams request\n");
6192b988
JD
1216 ret = ret_len;
1217 goto error;
1218 }
2acdc547 1219 assert(ret_len == sizeof(rq));
6192b988 1220
2acdc547
JD
1221 do {
1222 ret_len = recv(ctx->control_sock, &rp, sizeof(rp), 0);
1223 } while (ret_len < 0 && errno == EINTR);
1224 if (ret_len < 0) {
1225 fprintf(stderr, "[error] Error receiving get_new_streams response\n");
1226 ret = ret_len;
6192b988
JD
1227 goto error;
1228 }
2acdc547
JD
1229 assert(ret_len == sizeof(rp));
1230
1231 switch(be32toh(rp.status)) {
1232 case LTTNG_VIEWER_NEW_STREAMS_OK:
1233 break;
1234 case LTTNG_VIEWER_NEW_STREAMS_NO_NEW:
1235 ret = 0;
1236 goto end;
1237 case LTTNG_VIEWER_NEW_STREAMS_HUP:
1238 ret = -LTTNG_VIEWER_NEW_STREAMS_HUP;
1239 goto end;
1240 case LTTNG_VIEWER_NEW_STREAMS_ERR:
1241 fprintf(stderr, "[error] get_new_streams error\n");
1242 ret = -1;
1243 goto end;
1244 default:
1245 fprintf(stderr, "[error] Unknown return code %u\n",
1246 be32toh(rp.status));
1247 ret = -1;
1248 goto end;
1249 }
1250
b5a1fa45
JD
1251 stream_count = be32toh(rp.streams_count);
1252 ctx->session->stream_count += stream_count;
2acdc547
JD
1253 /*
1254 * When the session is created but not started, we do an active wait
1255 * until it starts. It allows the viewer to start processing the trace
1256 * as soon as the session starts.
1257 */
1258 if (ctx->session->stream_count == 0) {
1259 ret = 0;
1260 goto end;
1261 }
1262 printf_verbose("Waiting for %" PRIu64 " streams:\n",
1263 ctx->session->stream_count);
1264 ctx->session->streams = g_new0(struct lttng_live_viewer_stream,
1265 ctx->session->stream_count);
b5a1fa45 1266 for (i = 0; i < stream_count; i++) {
2acdc547
JD
1267 do {
1268 ret_len = recv(ctx->control_sock, &stream, sizeof(stream), 0);
1269 } while (ret_len < 0 && errno == EINTR);
1270 if (ret_len < 0) {
1271 fprintf(stderr, "[error] Error receiving stream\n");
1272 ret = ret_len;
1273 goto error;
1274 }
1275 assert(ret_len == sizeof(stream));
1276 stream.path_name[LTTNG_VIEWER_PATH_MAX - 1] = '\0';
1277 stream.channel_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0';
1278
1279 printf_verbose(" stream %" PRIu64 " : %s/%s\n",
1280 be64toh(stream.id), stream.path_name,
1281 stream.channel_name);
1282 ctx->session->streams[i].id = be64toh(stream.id);
1283 ctx->session->streams[i].session = ctx->session;
1284
1285 ctx->session->streams[i].first_read = 1;
1286 ctx->session->streams[i].mmap_size = 0;
1287
1288 if (be32toh(stream.metadata_flag)) {
1289 char *path;
1290
1291 path = strdup(LTTNG_METADATA_PATH_TEMPLATE);
1292 if (!path) {
1293 perror("strdup");
1294 ret = -1;
1295 goto error;
1296 }
1297 if (!mkdtemp(path)) {
1298 perror("mkdtemp");
1299 free(path);
1300 ret = -1;
1301 goto error;
1302 }
1303 ctx->session->streams[i].metadata_flag = 1;
1304 snprintf(ctx->session->streams[i].path,
1305 sizeof(ctx->session->streams[i].path),
1306 "%s/%s", path,
1307 stream.channel_name);
1308 ret = open(ctx->session->streams[i].path,
1309 O_WRONLY | O_CREAT | O_TRUNC,
1310 S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP);
1311 if (ret < 0) {
1312 perror("open");
1313 free(path);
1314 goto error;
1315 }
1316 ctx->session->streams[i].fd = ret;
1317 free(path);
1318 }
1319 ret = lttng_live_ctf_trace_assign(&ctx->session->streams[i],
1320 be64toh(stream.ctf_trace_id));
1321 if (ret < 0) {
1322 goto error;
1323 }
1324
1325 }
6192b988
JD
1326 ret = 0;
1327
2acdc547 1328end:
6192b988
JD
1329error:
1330 return ret;
1331}
1332
2acdc547 1333void lttng_live_read(struct lttng_live_ctx *ctx)
4a744367 1334{
2acdc547 1335 int ret, i;
4a744367
JD
1336 struct bt_ctf_iter *iter;
1337 const struct bt_ctf_event *event;
1338 struct bt_iter_pos begin_pos;
1339 struct bt_trace_descriptor *td_write;
1340 struct bt_format *fmt_write;
1341 struct ctf_text_stream_pos *sout;
2acdc547 1342 uint64_t id;
4a744367 1343
2acdc547
JD
1344 ctx->bt_ctx = bt_context_create();
1345 if (!ctx->bt_ctx) {
4a744367
JD
1346 fprintf(stderr, "[error] bt_context_create allocation\n");
1347 goto end;
1348 }
1349
1350 fmt_write = bt_lookup_format(g_quark_from_static_string("text"));
1351 if (!fmt_write) {
1352 fprintf(stderr, "[error] ctf-text error\n");
1353 goto end;
1354 }
1355
1356 td_write = fmt_write->open_trace(NULL, O_RDWR, NULL, NULL);
1357 if (!td_write) {
1358 fprintf(stderr, "[error] Error opening output trace\n");
1359 goto end_free;
1360 }
1361
1362 sout = container_of(td_write, struct ctf_text_stream_pos,
1363 trace_descriptor);
1364 if (!sout->parent.event_cb)
1365 goto end_free;
1366
6192b988
JD
1367 ret = lttng_live_create_viewer_session(ctx);
1368 if (ret < 0) {
1369 goto end_free;
1370 }
1371
2acdc547
JD
1372 for (i = 0; i < ctx->session_ids->len; i++) {
1373 id = g_array_index(ctx->session_ids, uint64_t, i);
1374 printf_verbose("Attaching to session %lu\n", id);
1375 ret = lttng_live_attach_session(ctx, id);
1376 printf_verbose("Attaching session returns %d\n", ret);
1377 if (ret < 0) {
1378 if (ret == -LTTNG_VIEWER_ATTACH_UNK) {
1379 fprintf(stderr, "[error] Unknown session ID\n");
1380 }
1381 goto end_free;
1382 }
1383 }
1384
4a744367 1385 /*
2acdc547 1386 * As long as the session is active, we try to get new streams.
4a744367 1387 */
2acdc547 1388 for (;;) {
4a744367
JD
1389 int flags;
1390
2acdc547
JD
1391 while (!ctx->session->stream_count) {
1392 if (ctx->session_ids->len == 0)
4a744367 1393 goto end_free;
2acdc547
JD
1394 ret = ask_new_streams(ctx);
1395 if (ret < 0)
1396 goto end_free;
1397 }
4a744367 1398
2acdc547
JD
1399 g_hash_table_foreach(ctx->session->ctf_traces, add_traces,
1400 ctx->bt_ctx);
4a744367
JD
1401
1402 begin_pos.type = BT_SEEK_BEGIN;
2acdc547 1403 iter = bt_ctf_iter_create(ctx->bt_ctx, &begin_pos, NULL);
4a744367
JD
1404 if (!iter) {
1405 fprintf(stderr, "[error] Iterator creation error\n");
1406 goto end;
1407 }
1408 for (;;) {
1409 event = bt_ctf_iter_read_event_flags(iter, &flags);
1410 if (!(flags & BT_ITER_FLAG_RETRY)) {
1411 if (!event) {
1412 /* End of trace */
1413 break;
1414 }
1415 ret = sout->parent.event_cb(&sout->parent,
1416 event->parent->stream);
1417 if (ret) {
1418 fprintf(stderr, "[error] Writing "
1419 "event failed.\n");
1420 goto end_free;
1421 }
1422 }
1423 ret = bt_iter_next(bt_ctf_get_iter(iter));
1424 if (ret < 0) {
1425 goto end_free;
1426 }
1427 }
1428 bt_ctf_iter_destroy(iter);
2acdc547
JD
1429 g_hash_table_foreach_remove(ctx->session->ctf_traces,
1430 del_traces, ctx->bt_ctx);
1431 ctx->session->stream_count = 0;
1432 }
4a744367
JD
1433
1434end_free:
2acdc547 1435 bt_context_put(ctx->bt_ctx);
4a744367
JD
1436end:
1437 return;
1438}
This page took 0.079323 seconds and 4 git commands to generate.