Fix: lttng-live: handle orderly shutdown
[babeltrace.git] / formats / lttng-live / lttng-live-comm.c
CommitLineData
4a744367
JD
1/*
2 * Copyright (C) 2013 - Julien Desfossez <jdesfossez@efficios.com>
3 * Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
4 *
5 * Permission is hereby granted, free of charge, to any person obtaining a copy
6 * of this software and associated documentation files (the "Software"), to deal
7 * in the Software without restriction, including without limitation the rights
8 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9 * copies of the Software, and to permit persons to whom the Software is
10 * furnished to do so, subject to the following conditions:
11 *
12 * The above copyright notice and this permission notice shall be included in
13 * all copies or substantial portions of the Software.
14 *
15 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21 * SOFTWARE.
22 */
23
24#include <sys/socket.h>
25#include <sys/types.h>
26#include <netinet/in.h>
27#include <netdb.h>
28#include <stdio.h>
29#include <string.h>
30#include <stdlib.h>
31#include <unistd.h>
32#include <errno.h>
33#include <inttypes.h>
34#include <fcntl.h>
35#include <sys/mman.h>
36
37#include <babeltrace/ctf/ctf-index.h>
38
39#include <babeltrace/babeltrace.h>
40#include <babeltrace/ctf/events.h>
41#include <babeltrace/ctf/callbacks.h>
42#include <babeltrace/ctf/iterator.h>
43
44/* for packet_index */
45#include <babeltrace/ctf/types.h>
46
47#include <babeltrace/ctf/metadata.h>
48#include <babeltrace/ctf-text/types.h>
49#include <babeltrace/ctf/events-internal.h>
50#include <formats/ctf/events-private.h>
51
c98627ca 52#include "lttng-live.h"
158eea6e 53#include "lttng-viewer-abi.h"
4a744367
JD
54
55/*
56 * Memory allocation zeroed
57 */
58#define zmalloc(x) calloc(1, x)
59
60#ifndef max_t
61#define max_t(type, a, b) \
62 ((type) (a) > (type) (b) ? (type) (a) : (type) (b))
63#endif
64
b5a1fa45
JD
65static void ctf_live_packet_seek(struct bt_stream_pos *stream_pos,
66 size_t index, int whence);
2acdc547
JD
67static void add_traces(gpointer key, gpointer value, gpointer user_data);
68static int del_traces(gpointer key, gpointer value, gpointer user_data);
69
b5a1fa45 70int lttng_live_connect_viewer(struct lttng_live_ctx *ctx)
4a744367
JD
71{
72 struct hostent *host;
73 struct sockaddr_in server_addr;
74 int ret;
75
b5a1fa45 76 host = gethostbyname(ctx->relay_hostname);
4a744367
JD
77 if (!host) {
78 ret = -1;
79 goto end;
80 }
81
82 if ((ctx->control_sock = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
83 perror("Socket");
84 ret = -1;
85 goto end;
86 }
87
88 server_addr.sin_family = AF_INET;
b5a1fa45 89 server_addr.sin_port = htons(ctx->port);
4a744367
JD
90 server_addr.sin_addr = *((struct in_addr *) host->h_addr);
91 bzero(&(server_addr.sin_zero), 8);
92
93 if (connect(ctx->control_sock, (struct sockaddr *) &server_addr,
94 sizeof(struct sockaddr)) == -1) {
95 perror("Connect");
96 ret = -1;
97 goto end;
98 }
99
100 ret = 0;
101
102end:
103 return ret;
104}
105
106int lttng_live_establish_connection(struct lttng_live_ctx *ctx)
107{
108 struct lttng_viewer_cmd cmd;
109 struct lttng_viewer_connect connect;
110 int ret;
111 ssize_t ret_len;
112
113 cmd.cmd = htobe32(LTTNG_VIEWER_CONNECT);
114 cmd.data_size = sizeof(connect);
115 cmd.cmd_version = 0;
116
0c3cd7e1 117 connect.viewer_session_id = -1ULL; /* will be set on recv */
4a744367
JD
118 connect.major = htobe32(LTTNG_LIVE_MAJOR);
119 connect.minor = htobe32(LTTNG_LIVE_MINOR);
120 connect.type = htobe32(LTTNG_VIEWER_CLIENT_COMMAND);
121
122 do {
123 ret_len = send(ctx->control_sock, &cmd, sizeof(cmd), 0);
124 } while (ret_len < 0 && errno == EINTR);
125 if (ret_len < 0) {
126 fprintf(stderr, "[error] Error sending cmd\n");
127 ret = ret_len;
128 goto error;
129 }
130 assert(ret_len == sizeof(cmd));
131
132 do {
133 ret_len = send(ctx->control_sock, &connect, sizeof(connect), 0);
134 } while (ret_len < 0 && errno == EINTR);
135 if (ret_len < 0) {
136 fprintf(stderr, "[error] Error sending version\n");
137 ret = ret_len;
138 goto error;
139 }
140 assert(ret_len == sizeof(connect));
141
142 do {
143 ret_len = recv(ctx->control_sock, &connect, sizeof(connect), 0);
144 } while (ret_len < 0 && errno == EINTR);
f8612f31
MD
145 if (ret_len == 0) {
146 fprintf(stderr, "[error] Remote side has closed connection\n");
147 ret = -1;
148 goto error;
149 }
4a744367
JD
150 if (ret_len < 0) {
151 fprintf(stderr, "[error] Error receiving version\n");
152 ret = ret_len;
153 goto error;
154 }
155 assert(ret_len == sizeof(connect));
156
157 printf_verbose("Received viewer session ID : %" PRIu64 "\n",
158 be64toh(connect.viewer_session_id));
159 printf_verbose("Relayd version : %u.%u\n", be32toh(connect.major),
160 be32toh(connect.minor));
161
162 ret = 0;
163
164error:
165 return ret;
166}
167
b5a1fa45
JD
168static
169void free_session_list(GPtrArray *session_list)
170{
171 int i;
172 struct lttng_live_relay_session *relay_session;
173
174 for (i = 0; i < session_list->len; i++) {
175 relay_session = g_ptr_array_index(session_list, i);
176 free(relay_session->name);
177 free(relay_session->hostname);
178 }
179 g_ptr_array_free(session_list, TRUE);
180}
181
182static
183void print_session_list(GPtrArray *session_list, const char *path)
184{
185 int i;
186 struct lttng_live_relay_session *relay_session;
187
188 for (i = 0; i < session_list->len; i++) {
189 relay_session = g_ptr_array_index(session_list, i);
190 fprintf(stdout, "%s/host/%s/%s (timer = %u, "
191 "%u stream(s), %u client(s) connected)\n",
192 path, relay_session->hostname,
193 relay_session->name, relay_session->timer,
194 relay_session->streams, relay_session->clients);
195 }
196}
197
198static
199void update_session_list(GPtrArray *session_list, char *hostname,
200 char *session_name, uint32_t streams, uint32_t clients,
201 uint32_t timer)
202{
203 int i, found = 0;
204 struct lttng_live_relay_session *relay_session;
205
206 for (i = 0; i < session_list->len; i++) {
207 relay_session = g_ptr_array_index(session_list, i);
208 if ((strncmp(relay_session->hostname, hostname, NAME_MAX) == 0) &&
209 strncmp(relay_session->name,
210 session_name, NAME_MAX) == 0) {
211 relay_session->streams += streams;
212 if (relay_session->clients < clients)
213 relay_session->clients = clients;
214 found = 1;
215 break;
216 }
217 }
218 if (found)
219 return;
220
221 relay_session = g_new0(struct lttng_live_relay_session, 1);
222 relay_session->hostname = strndup(hostname, NAME_MAX);
223 relay_session->name = strndup(session_name, NAME_MAX);
224 relay_session->clients = clients;
225 relay_session->streams = streams;
226 relay_session->timer = timer;
227 g_ptr_array_add(session_list, relay_session);
228}
229
4a744367
JD
230int lttng_live_list_sessions(struct lttng_live_ctx *ctx, const char *path)
231{
232 struct lttng_viewer_cmd cmd;
233 struct lttng_viewer_list_sessions list;
234 struct lttng_viewer_session lsession;
b5a1fa45 235 int i, ret, sessions_count, print_list = 0;
4a744367 236 ssize_t ret_len;
b5a1fa45
JD
237 uint64_t session_id;
238 GPtrArray *session_list = NULL;
239
240 if (strlen(ctx->session_name) == 0) {
241 print_list = 1;
242 session_list = g_ptr_array_new();
243 }
4a744367
JD
244
245 cmd.cmd = htobe32(LTTNG_VIEWER_LIST_SESSIONS);
246 cmd.data_size = 0;
247 cmd.cmd_version = 0;
248
249 do {
250 ret_len = send(ctx->control_sock, &cmd, sizeof(cmd), 0);
251 } while (ret_len < 0 && errno == EINTR);
252 if (ret_len < 0) {
253 fprintf(stderr, "[error] Error sending cmd\n");
254 ret = ret_len;
255 goto error;
256 }
257 assert(ret_len == sizeof(cmd));
258
259 do {
260 ret_len = recv(ctx->control_sock, &list, sizeof(list), 0);
261 } while (ret_len < 0 && errno == EINTR);
f8612f31
MD
262 if (ret_len == 0) {
263 fprintf(stderr, "[error] Remote side has closed connection\n");
264 ret = -1;
265 goto error;
266 }
4a744367
JD
267 if (ret_len < 0) {
268 fprintf(stderr, "[error] Error receiving session list\n");
269 ret = ret_len;
270 goto error;
271 }
272 assert(ret_len == sizeof(list));
273
274 sessions_count = be32toh(list.sessions_count);
4a744367
JD
275 for (i = 0; i < sessions_count; i++) {
276 do {
277 ret_len = recv(ctx->control_sock, &lsession, sizeof(lsession), 0);
278 } while (ret_len < 0 && errno == EINTR);
f8612f31
MD
279 if (ret_len == 0) {
280 fprintf(stderr, "[error] Remote side has closed connection\n");
281 ret = -1;
282 goto error;
283 }
4a744367
JD
284 if (ret_len < 0) {
285 fprintf(stderr, "[error] Error receiving session\n");
286 ret = ret_len;
287 goto error;
288 }
289 assert(ret_len == sizeof(lsession));
9ed31e53
MD
290 lsession.hostname[LTTNG_VIEWER_HOST_NAME_MAX - 1] = '\0';
291 lsession.session_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0';
b5a1fa45
JD
292 session_id = be64toh(lsession.id);
293
294 if (print_list) {
295 update_session_list(session_list,
296 lsession.hostname,
297 lsession.session_name,
298 be32toh(lsession.streams),
299 be32toh(lsession.clients),
300 be32toh(lsession.live_timer));
301 } else {
302 if ((strncmp(lsession.session_name, ctx->session_name,
303 NAME_MAX) == 0) && (strncmp(lsession.hostname,
304 ctx->traced_hostname, NAME_MAX) == 0)) {
305 printf_verbose("Reading from session %" PRIu64 "\n",
306 session_id);
307 g_array_append_val(ctx->session_ids,
308 session_id);
309 }
310 }
311 }
4a744367 312
b5a1fa45
JD
313 if (print_list) {
314 print_session_list(session_list, path);
315 free_session_list(session_list);
4a744367
JD
316 }
317
318 ret = 0;
319
320error:
321 return ret;
322}
323
324int lttng_live_ctf_trace_assign(struct lttng_live_viewer_stream *stream,
325 uint64_t ctf_trace_id)
326{
327 struct lttng_live_ctf_trace *trace;
328 int ret = 0;
329
330 trace = g_hash_table_lookup(stream->session->ctf_traces,
331 (gpointer) ctf_trace_id);
332 if (!trace) {
333 trace = g_new0(struct lttng_live_ctf_trace, 1);
4a744367
JD
334 trace->ctf_trace_id = ctf_trace_id;
335 trace->streams = g_ptr_array_new();
336 g_hash_table_insert(stream->session->ctf_traces,
337 (gpointer) ctf_trace_id,
338 trace);
339 }
340 if (stream->metadata_flag)
341 trace->metadata_stream = stream;
342
343 stream->ctf_trace = trace;
344 g_ptr_array_add(trace->streams, stream);
345
4a744367
JD
346 return ret;
347}
348
349int lttng_live_attach_session(struct lttng_live_ctx *ctx, uint64_t id)
350{
351 struct lttng_viewer_cmd cmd;
352 struct lttng_viewer_attach_session_request rq;
353 struct lttng_viewer_attach_session_response rp;
354 struct lttng_viewer_stream stream;
355 int ret, i;
356 ssize_t ret_len;
357
358 cmd.cmd = htobe32(LTTNG_VIEWER_ATTACH_SESSION);
359 cmd.data_size = sizeof(rq);
360 cmd.cmd_version = 0;
361
362 memset(&rq, 0, sizeof(rq));
363 rq.session_id = htobe64(id);
364 // TODO: add cmd line parameter to select seek beginning
365 // rq.seek = htobe32(LTTNG_VIEWER_SEEK_BEGINNING);
366 rq.seek = htobe32(LTTNG_VIEWER_SEEK_LAST);
367
368 do {
369 ret_len = send(ctx->control_sock, &cmd, sizeof(cmd), 0);
370 } while (ret_len < 0 && errno == EINTR);
371 if (ret_len < 0) {
372 fprintf(stderr, "[error] Error sending cmd\n");
373 ret = ret_len;
374 goto error;
375 }
376 assert(ret_len == sizeof(cmd));
377
378 do {
379 ret_len = send(ctx->control_sock, &rq, sizeof(rq), 0);
380 } while (ret_len < 0 && errno == EINTR);
381 if (ret_len < 0) {
382 fprintf(stderr, "[error] Error sending attach request\n");
383 ret = ret_len;
384 goto error;
385 }
386 assert(ret_len == sizeof(rq));
387
388 do {
389 ret_len = recv(ctx->control_sock, &rp, sizeof(rp), 0);
390 } while (ret_len < 0 && errno == EINTR);
f8612f31
MD
391 if (ret_len == 0) {
392 fprintf(stderr, "[error] Remote side has closed connection\n");
393 ret = -1;
394 goto error;
395 }
4a744367
JD
396 if (ret_len < 0) {
397 fprintf(stderr, "[error] Error receiving attach response\n");
398 ret = ret_len;
399 goto error;
400 }
401 assert(ret_len == sizeof(rp));
402
403 switch(be32toh(rp.status)) {
404 case LTTNG_VIEWER_ATTACH_OK:
405 break;
406 case LTTNG_VIEWER_ATTACH_UNK:
407 ret = -LTTNG_VIEWER_ATTACH_UNK;
408 goto end;
409 case LTTNG_VIEWER_ATTACH_ALREADY:
410 fprintf(stderr, "[error] Already a viewer attached\n");
411 ret = -1;
412 goto end;
413 case LTTNG_VIEWER_ATTACH_NOT_LIVE:
414 fprintf(stderr, "[error] Not a live session\n");
415 ret = -1;
416 goto end;
417 case LTTNG_VIEWER_ATTACH_SEEK_ERR:
418 fprintf(stderr, "[error] Wrong seek parameter\n");
419 ret = -1;
420 goto end;
421 default:
422 fprintf(stderr, "[error] Unknown attach return code %u\n",
423 be32toh(rp.status));
424 ret = -1;
425 goto end;
426 }
427 if (be32toh(rp.status) != LTTNG_VIEWER_ATTACH_OK) {
428 ret = -1;
429 goto end;
430 }
431
2acdc547 432 ctx->session->stream_count += be32toh(rp.streams_count);
4a744367
JD
433 /*
434 * When the session is created but not started, we do an active wait
435 * until it starts. It allows the viewer to start processing the trace
436 * as soon as the session starts.
437 */
438 if (ctx->session->stream_count == 0) {
439 ret = 0;
440 goto end;
441 }
442 printf_verbose("Waiting for %" PRIu64 " streams:\n",
443 ctx->session->stream_count);
444 ctx->session->streams = g_new0(struct lttng_live_viewer_stream,
445 ctx->session->stream_count);
4a744367
JD
446 for (i = 0; i < be32toh(rp.streams_count); i++) {
447 do {
448 ret_len = recv(ctx->control_sock, &stream, sizeof(stream), 0);
449 } while (ret_len < 0 && errno == EINTR);
f8612f31
MD
450 if (ret_len == 0) {
451 fprintf(stderr, "[error] Remote side has closed connection\n");
452 ret = -1;
453 goto error;
454 }
4a744367
JD
455 if (ret_len < 0) {
456 fprintf(stderr, "[error] Error receiving stream\n");
457 ret = ret_len;
458 goto error;
459 }
460 assert(ret_len == sizeof(stream));
9ed31e53
MD
461 stream.path_name[LTTNG_VIEWER_PATH_MAX - 1] = '\0';
462 stream.channel_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0';
4a744367
JD
463
464 printf_verbose(" stream %" PRIu64 " : %s/%s\n",
465 be64toh(stream.id), stream.path_name,
466 stream.channel_name);
467 ctx->session->streams[i].id = be64toh(stream.id);
468 ctx->session->streams[i].session = ctx->session;
469
470 ctx->session->streams[i].first_read = 1;
471 ctx->session->streams[i].mmap_size = 0;
472
473 if (be32toh(stream.metadata_flag)) {
474 char *path;
475
476 path = strdup(LTTNG_METADATA_PATH_TEMPLATE);
6a4d2b5a
MD
477 if (!path) {
478 perror("strdup");
479 ret = -1;
480 goto error;
481 }
482 if (!mkdtemp(path)) {
483 perror("mkdtemp");
484 free(path);
485 ret = -1;
486 goto error;
487 }
4a744367 488 ctx->session->streams[i].metadata_flag = 1;
4a744367
JD
489 snprintf(ctx->session->streams[i].path,
490 sizeof(ctx->session->streams[i].path),
491 "%s/%s", path,
492 stream.channel_name);
493 ret = open(ctx->session->streams[i].path,
494 O_WRONLY | O_CREAT | O_TRUNC,
495 S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP);
496 if (ret < 0) {
ab9d07f0 497 perror("open");
6a4d2b5a 498 free(path);
4a744367
JD
499 goto error;
500 }
501 ctx->session->streams[i].fd = ret;
6a4d2b5a 502 free(path);
4a744367
JD
503 }
504 ret = lttng_live_ctf_trace_assign(&ctx->session->streams[i],
505 be64toh(stream.ctf_trace_id));
506 if (ret < 0) {
507 goto error;
508 }
509
510 }
511 ret = 0;
512
513end:
514error:
515 return ret;
516}
517
2acdc547
JD
518static
519int ask_new_streams(struct lttng_live_ctx *ctx)
520{
521 int i, ret = 0;
522 uint64_t id;
523
524restart:
525 for (i = 0; i < ctx->session_ids->len; i++) {
526 id = g_array_index(ctx->session_ids, uint64_t, i);
527 ret = lttng_live_get_new_streams(ctx, id);
528 printf_verbose("Asking for new streams returns %d\n",
529 ret);
530 if (ret < 0) {
531 if (ret == -LTTNG_VIEWER_NEW_STREAMS_HUP) {
532 printf_verbose("Session %" PRIu64 " closed\n",
533 id);
b5a1fa45
JD
534 /*
535 * The streams have already been closed during
536 * the reading, so we only need to get rid of
537 * the trace in our internal table of sessions.
538 */
2acdc547
JD
539 g_array_remove_index(ctx->session_ids, i);
540 /*
541 * We can't continue iterating on the g_array
542 * after a remove, we have to start again.
543 */
544 goto restart;
545 } else {
546 ret = -1;
547 goto end;
548 }
549 }
550 }
551
552end:
553 return ret;
554}
555
4a744367
JD
556static
557int get_data_packet(struct lttng_live_ctx *ctx,
558 struct ctf_stream_pos *pos,
559 struct lttng_live_viewer_stream *stream, uint64_t offset,
560 uint64_t len)
561{
562 struct lttng_viewer_cmd cmd;
563 struct lttng_viewer_get_packet rq;
564 struct lttng_viewer_trace_packet rp;
565 ssize_t ret_len;
566 int ret;
567
568 cmd.cmd = htobe32(LTTNG_VIEWER_GET_PACKET);
569 cmd.data_size = sizeof(rq);
570 cmd.cmd_version = 0;
571
572 memset(&rq, 0, sizeof(rq));
573 rq.stream_id = htobe64(stream->id);
574 /* Already in big endian. */
575 rq.offset = offset;
576 rq.len = htobe32(len);
577
578 do {
579 ret_len = send(ctx->control_sock, &cmd, sizeof(cmd), 0);
580 } while (ret_len < 0 && errno == EINTR);
581 if (ret_len < 0) {
582 fprintf(stderr, "[error] Error sending cmd\n");
583 ret = ret_len;
584 goto error;
585 }
586 assert(ret_len == sizeof(cmd));
587
588 do {
589 ret_len = send(ctx->control_sock, &rq, sizeof(rq), 0);
590 } while (ret_len < 0 && errno == EINTR);
591 if (ret_len < 0) {
592 fprintf(stderr, "[error] Error sending get_data_packet request\n");
593 ret = ret_len;
594 goto error;
595 }
596 assert(ret_len == sizeof(rq));
597
598 do {
599 ret_len = recv(ctx->control_sock, &rp, sizeof(rp), 0);
600 } while (ret_len < 0 && errno == EINTR);
f8612f31
MD
601 if (ret_len == 0) {
602 fprintf(stderr, "[error] Remote side has closed connection\n");
603 ret = -1;
604 goto error;
605 }
4a744367
JD
606 if (ret_len < 0) {
607 fprintf(stderr, "[error] Error receiving data response\n");
608 ret = ret_len;
609 goto error;
610 }
57b9d843
JD
611 if (ret_len != sizeof(rp)) {
612 fprintf(stderr, "[error] get_data_packet: expected %" PRId64
613 ", received %" PRId64 "\n", ret_len,
614 sizeof(rp));
615 ret = -1;
616 goto error;
617 }
4a744367
JD
618
619 rp.flags = be32toh(rp.flags);
620
621 switch (be32toh(rp.status)) {
622 case LTTNG_VIEWER_GET_PACKET_OK:
623 len = be32toh(rp.len);
624 printf_verbose("get_data_packet: Ok, packet size : %" PRIu64
625 "\n", len);
626 break;
627 case LTTNG_VIEWER_GET_PACKET_RETRY:
628 printf_verbose("get_data_packet: retry\n");
629 ret = -1;
630 goto end;
631 case LTTNG_VIEWER_GET_PACKET_ERR:
632 if (rp.flags & LTTNG_VIEWER_FLAG_NEW_METADATA) {
633 printf_verbose("get_data_packet: new metadata needed\n");
634 ret = 0;
635 goto end;
636 }
2acdc547
JD
637 if (rp.flags & LTTNG_VIEWER_FLAG_NEW_STREAM) {
638 ret = ask_new_streams(ctx);
639 if (ret < 0)
640 goto error;
641 g_hash_table_foreach(ctx->session->ctf_traces, add_traces,
642 ctx->bt_ctx);
643 }
4a744367
JD
644 fprintf(stderr, "[error] get_data_packet: error\n");
645 ret = -1;
646 goto end;
647 case LTTNG_VIEWER_GET_PACKET_EOF:
648 ret = -2;
649 goto error;
650 default:
651 printf_verbose("get_data_packet: unknown\n");
652 assert(0);
653 ret = -1;
654 goto end;
655 }
656
657 if (len <= 0) {
658 ret = -1;
659 goto end;
660 }
661
662 if (len > stream->mmap_size) {
663 uint64_t new_size;
664
665 new_size = max_t(uint64_t, len, stream->mmap_size << 1);
666 if (pos->base_mma) {
667 /* unmap old base */
668 ret = munmap_align(pos->base_mma);
669 if (ret) {
670 fprintf(stderr, "[error] Unable to unmap old base: %s.\n",
671 strerror(errno));
672 ret = -1;
673 goto error;
674 }
675 pos->base_mma = NULL;
676 }
677 pos->base_mma = mmap_align(new_size,
678 PROT_READ | PROT_WRITE,
679 MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
680 if (pos->base_mma == MAP_FAILED) {
681 fprintf(stderr, "[error] mmap error %s.\n",
682 strerror(errno));
683 pos->base_mma = NULL;
684 ret = -1;
685 goto error;
686 }
687
688 stream->mmap_size = new_size;
689 printf_verbose("Expanding stream mmap size to %" PRIu64 " bytes\n",
690 stream->mmap_size);
691 }
692
693 do {
694 ret_len = recv(ctx->control_sock,
695 mmap_align_addr(pos->base_mma), len,
696 MSG_WAITALL);
697 } while (ret_len < 0 && errno == EINTR);
f8612f31
MD
698 if (ret_len == 0) {
699 fprintf(stderr, "[error] Remote side has closed connection\n");
700 ret = -1;
701 goto error;
702 }
4a744367
JD
703 if (ret_len < 0) {
704 fprintf(stderr, "[error] Error receiving trace packet\n");
705 ret = ret_len;
706 goto error;
707 }
708 assert(ret_len == len);
709
710end:
711error:
712 return ret;
713}
714
715/*
716 * Return number of metadata bytes written or a negative value on error.
717 */
718static
719int get_new_metadata(struct lttng_live_ctx *ctx,
720 struct lttng_live_viewer_stream *viewer_stream,
721 uint64_t *metadata_len)
722{
723 uint64_t len = 0;
724 int ret;
725 struct lttng_viewer_cmd cmd;
726 struct lttng_viewer_get_metadata rq;
727 struct lttng_viewer_metadata_packet rp;
728 struct lttng_live_viewer_stream *metadata_stream;
729 char *data = NULL;
730 ssize_t ret_len;
731
732 cmd.cmd = htobe32(LTTNG_VIEWER_GET_METADATA);
733 cmd.data_size = sizeof(rq);
734 cmd.cmd_version = 0;
735
736 metadata_stream = viewer_stream->ctf_trace->metadata_stream;
737 rq.stream_id = htobe64(metadata_stream->id);
738
739 do {
740 ret_len = send(ctx->control_sock, &cmd, sizeof(cmd), 0);
741 } while (ret_len < 0 && errno == EINTR);
742 if (ret_len < 0) {
743 fprintf(stderr, "[error] Error sending cmd\n");
744 ret = ret_len;
745 goto error;
746 }
747 assert(ret_len == sizeof(cmd));
748
749 do {
750 ret_len = send(ctx->control_sock, &rq, sizeof(rq), 0);
751 } while (ret_len < 0 && errno == EINTR);
752 if (ret_len < 0) {
753 fprintf(stderr, "[error] Error sending get_metadata request\n");
754 ret = ret_len;
755 goto error;
756 }
757 assert(ret_len == sizeof(rq));
758
759 do {
760 ret_len = recv(ctx->control_sock, &rp, sizeof(rp), 0);
761 } while (ret_len < 0 && errno == EINTR);
f8612f31
MD
762 if (ret_len == 0) {
763 fprintf(stderr, "[error] Remote side has closed connection\n");
764 ret = -1;
765 goto error;
766 }
4a744367
JD
767 if (ret_len < 0) {
768 fprintf(stderr, "[error] Error receiving metadata response\n");
769 ret = ret_len;
770 goto error;
771 }
772 assert(ret_len == sizeof(rp));
773
774 switch (be32toh(rp.status)) {
775 case LTTNG_VIEWER_METADATA_OK:
776 printf_verbose("get_metadata : OK\n");
777 break;
778 case LTTNG_VIEWER_NO_NEW_METADATA:
779 printf_verbose("get_metadata : NO NEW\n");
780 ret = -1;
781 goto end;
782 case LTTNG_VIEWER_METADATA_ERR:
783 printf_verbose("get_metadata : ERR\n");
784 ret = -1;
785 goto end;
786 default:
787 printf_verbose("get_metadata : UNKNOWN\n");
788 ret = -1;
789 goto end;
790 }
791
792 len = be64toh(rp.len);
793 printf_verbose("Writing %" PRIu64" bytes to metadata\n", len);
794 if (len <= 0) {
795 ret = -1;
796 goto end;
797 }
798
799 data = zmalloc(len);
800 if (!data) {
801 perror("relay data zmalloc");
802 ret = -1;
803 goto error;
804 }
805 do {
806 ret_len = recv(ctx->control_sock, data, len, MSG_WAITALL);
807 } while (ret_len < 0 && errno == EINTR);
f8612f31
MD
808 if (ret_len == 0) {
809 fprintf(stderr, "[error] Remote side has closed connection\n");
810 ret = -1;
811 free(data);
812 goto error;
813 }
4a744367
JD
814 if (ret_len < 0) {
815 fprintf(stderr, "[error] Error receiving trace packet\n");
816 ret = ret_len;
817 free(data);
818 goto error;
819 }
820 assert(ret_len == len);
821
822 do {
823 ret_len = write(metadata_stream->fd, data, len);
824 } while (ret_len < 0 && errno == EINTR);
825 if (ret_len < 0) {
826 free(data);
827 ret = ret_len;
828 goto error;
829 }
830 assert(ret_len == len);
831
832 free(data);
833
834 *metadata_len = len;
835 ret = 0;
836end:
837error:
838 return ret;
839}
840
841/*
842 * Get one index for a stream.
843 *
844 * Returns 0 on success or a negative value on error.
845 */
846static
847int get_next_index(struct lttng_live_ctx *ctx,
848 struct lttng_live_viewer_stream *viewer_stream,
849 struct packet_index *index)
850{
851 struct lttng_viewer_cmd cmd;
852 struct lttng_viewer_get_next_index rq;
853 struct lttng_viewer_index rp;
854 int ret;
855 uint64_t metadata_len;
856 ssize_t ret_len;
857
858 cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEXT_INDEX);
859 cmd.data_size = sizeof(rq);
860 cmd.cmd_version = 0;
861
862 memset(&rq, 0, sizeof(rq));
863 rq.stream_id = htobe64(viewer_stream->id);
864
865retry:
866 do {
867 ret_len = send(ctx->control_sock, &cmd, sizeof(cmd), 0);
868 } while (ret_len < 0 && errno == EINTR);
869 if (ret_len < 0) {
870 fprintf(stderr, "[error] Error sending cmd\n");
871 ret = ret_len;
872 goto error;
873 }
874 assert(ret_len == sizeof(cmd));
875
876 do {
877 ret_len = send(ctx->control_sock, &rq, sizeof(rq), 0);
878 } while (ret_len < 0 && errno == EINTR);
879 if (ret_len < 0) {
880 fprintf(stderr, "[error] Error sending get_next_index request\n");
881 ret = ret_len;
882 goto error;
883 }
884 assert(ret_len == sizeof(rq));
885
886 do {
887 ret_len = recv(ctx->control_sock, &rp, sizeof(rp), 0);
888 } while (ret_len < 0 && errno == EINTR);
f8612f31
MD
889 if (ret_len == 0) {
890 fprintf(stderr, "[error] Remote side has closed connection\n");
891 ret = -1;
892 goto error;
893 }
4a744367
JD
894 if (ret_len < 0) {
895 fprintf(stderr, "[error] Error receiving index response\n");
896 ret = ret_len;
897 goto error;
898 }
899 assert(ret_len == sizeof(rp));
900
901 rp.flags = be32toh(rp.flags);
902
903 switch (be32toh(rp.status)) {
904 case LTTNG_VIEWER_INDEX_INACTIVE:
905 printf_verbose("get_next_index: inactive\n");
906 memset(index, 0, sizeof(struct packet_index));
992e8cc0 907 index->ts_cycles.timestamp_end = be64toh(rp.timestamp_end);
4a744367
JD
908 break;
909 case LTTNG_VIEWER_INDEX_OK:
910 printf_verbose("get_next_index: Ok, need metadata update : %u\n",
911 rp.flags & LTTNG_VIEWER_FLAG_NEW_METADATA);
912 index->offset = be64toh(rp.offset);
913 index->packet_size = be64toh(rp.packet_size);
914 index->content_size = be64toh(rp.content_size);
992e8cc0
MD
915 index->ts_cycles.timestamp_begin = be64toh(rp.timestamp_begin);
916 index->ts_cycles.timestamp_end = be64toh(rp.timestamp_end);
4a744367
JD
917 index->events_discarded = be64toh(rp.events_discarded);
918
919 if (rp.flags & LTTNG_VIEWER_FLAG_NEW_METADATA) {
920 printf_verbose("get_next_index: new metadata needed\n");
921 ret = get_new_metadata(ctx, viewer_stream,
922 &metadata_len);
923 if (ret < 0) {
924 goto error;
925 }
926 }
2acdc547
JD
927 if (rp.flags & LTTNG_VIEWER_FLAG_NEW_STREAM) {
928 ret = ask_new_streams(ctx);
929 if (ret < 0)
930 goto error;
931 g_hash_table_foreach(ctx->session->ctf_traces, add_traces,
932 ctx->bt_ctx);
933 }
4a744367
JD
934 break;
935 case LTTNG_VIEWER_INDEX_RETRY:
936 printf_verbose("get_next_index: retry\n");
937 sleep(1);
938 goto retry;
939 case LTTNG_VIEWER_INDEX_HUP:
940 printf_verbose("get_next_index: stream hung up\n");
941 viewer_stream->id = -1ULL;
942 viewer_stream->fd = -1;
943 index->offset = EOF;
2acdc547 944 ctx->session->stream_count--;
4a744367
JD
945 break;
946 case LTTNG_VIEWER_INDEX_ERR:
947 fprintf(stderr, "[error] get_next_index: error\n");
948 ret = -1;
949 goto error;
950 default:
951 fprintf(stderr, "[error] get_next_index: unkwown value\n");
952 ret = -1;
953 goto error;
954 }
955
956 ret = 0;
957
958error:
959 return ret;
960}
961
b5a1fa45 962static
4a744367
JD
963void ctf_live_packet_seek(struct bt_stream_pos *stream_pos, size_t index,
964 int whence)
965{
966 struct ctf_stream_pos *pos;
967 struct ctf_file_stream *file_stream;
f1f52630 968 struct packet_index *prev_index = NULL, *cur_index;
4a744367
JD
969 struct lttng_live_viewer_stream *viewer_stream;
970 struct lttng_live_session *session;
971 int ret;
972
973retry:
974 pos = ctf_pos(stream_pos);
975 file_stream = container_of(pos, struct ctf_file_stream, pos);
976 viewer_stream = (struct lttng_live_viewer_stream *) pos->priv;
977 session = viewer_stream->session;
978
f1f52630
MD
979 switch (pos->packet_index->len) {
980 case 0:
981 g_array_set_size(pos->packet_index, 1);
982 cur_index = &g_array_index(pos->packet_index,
983 struct packet_index, 0);
984 break;
985 case 1:
986 g_array_set_size(pos->packet_index, 2);
987 prev_index = &g_array_index(pos->packet_index,
988 struct packet_index, 0);
989 cur_index = &g_array_index(pos->packet_index,
990 struct packet_index, 1);
991 break;
992 case 2:
993 g_array_index(pos->packet_index,
994 struct packet_index, 0) =
995 g_array_index(pos->packet_index,
996 struct packet_index, 1);
997 prev_index = &g_array_index(pos->packet_index,
998 struct packet_index, 0);
999 cur_index = &g_array_index(pos->packet_index,
1000 struct packet_index, 1);
1001 break;
1002 default:
1003 abort();
1004 break;
1005 }
4a744367 1006 printf_verbose("get_next_index for stream %" PRIu64 "\n", viewer_stream->id);
f1f52630 1007 ret = get_next_index(session->ctx, viewer_stream, cur_index);
4a744367
JD
1008 if (ret < 0) {
1009 pos->offset = EOF;
1010 fprintf(stderr, "[error] get_next_index failed\n");
1011 return;
1012 }
1013
f1f52630
MD
1014 pos->packet_size = cur_index->packet_size;
1015 pos->content_size = cur_index->content_size;
4a744367 1016 pos->mmap_base_offset = 0;
f1f52630 1017 if (cur_index->offset == EOF) {
4a744367
JD
1018 pos->offset = EOF;
1019 } else {
1020 pos->offset = 0;
1021 }
1022
f1f52630
MD
1023 if (cur_index->content_size == 0) {
1024 file_stream->parent.cycles_timestamp =
1025 cur_index->ts_cycles.timestamp_end;
4a744367 1026 file_stream->parent.real_timestamp = ctf_get_real_timestamp(
f1f52630
MD
1027 &file_stream->parent,
1028 cur_index->ts_cycles.timestamp_end);
4a744367 1029 } else {
f1f52630
MD
1030 /* Convert the timestamps and append to the real_index. */
1031 cur_index->ts_real.timestamp_begin = ctf_get_real_timestamp(
1032 &file_stream->parent,
1033 cur_index->ts_cycles.timestamp_begin);
1034 cur_index->ts_real.timestamp_end = ctf_get_real_timestamp(
1035 &file_stream->parent,
1036 cur_index->ts_cycles.timestamp_end);
1037
1038 ctf_update_current_packet_index(&file_stream->parent,
1039 prev_index, cur_index);
1040
1041 file_stream->parent.cycles_timestamp =
1042 cur_index->ts_cycles.timestamp_begin;
1043 file_stream->parent.real_timestamp =
1044 cur_index->ts_real.timestamp_begin;
4a744367
JD
1045 }
1046
1047 if (pos->packet_size == 0 || pos->offset == EOF) {
1048 goto end;
1049 }
1050
1051 printf_verbose("get_data_packet for stream %" PRIu64 "\n",
1052 viewer_stream->id);
1053 ret = get_data_packet(session->ctx, pos, viewer_stream,
f1f52630
MD
1054 be64toh(cur_index->offset),
1055 cur_index->packet_size / CHAR_BIT);
4a744367
JD
1056 if (ret == -2) {
1057 goto retry;
1058 } else if (ret < 0) {
1059 pos->offset = EOF;
1060 fprintf(stderr, "[error] get_data_packet failed\n");
1061 return;
1062 }
1063
1064 printf_verbose("Index received : packet_size : %" PRIu64
1065 ", offset %" PRIu64 ", content_size %" PRIu64
1066 ", timestamp_end : %" PRIu64 "\n",
f1f52630
MD
1067 cur_index->packet_size, cur_index->offset,
1068 cur_index->content_size,
1069 cur_index->ts_cycles.timestamp_end);
4a744367
JD
1070
1071 /* update trace_packet_header and stream_packet_context */
1072 if (pos->prot != PROT_WRITE && file_stream->parent.trace_packet_header) {
1073 /* Read packet header */
1074 ret = generic_rw(&pos->parent, &file_stream->parent.trace_packet_header->p);
1075 if (ret) {
1076 pos->offset = EOF;
1077 fprintf(stderr, "[error] trace packet header read failed\n");
1078 goto end;
1079 }
1080 }
1081 if (pos->prot != PROT_WRITE && file_stream->parent.stream_packet_context) {
1082 /* Read packet context */
1083 ret = generic_rw(&pos->parent, &file_stream->parent.stream_packet_context->p);
1084 if (ret) {
1085 pos->offset = EOF;
1086 fprintf(stderr, "[error] stream packet context read failed\n");
1087 goto end;
1088 }
1089 }
1090 pos->data_offset = pos->offset;
1091
1092end:
1093 return;
1094}
1095
2acdc547
JD
1096int lttng_live_create_viewer_session(struct lttng_live_ctx *ctx)
1097{
1098 struct lttng_viewer_cmd cmd;
1099 struct lttng_viewer_create_session_response resp;
1100 int ret;
1101 ssize_t ret_len;
1102
1103 cmd.cmd = htobe32(LTTNG_VIEWER_CREATE_SESSION);
1104 cmd.data_size = 0;
1105 cmd.cmd_version = 0;
1106
1107 do {
1108 ret_len = send(ctx->control_sock, &cmd, sizeof(cmd), 0);
1109 } while (ret_len < 0 && errno == EINTR);
1110 if (ret_len < 0) {
1111 fprintf(stderr, "[error] Error sending cmd\n");
1112 ret = ret_len;
1113 goto error;
1114 }
1115 assert(ret_len == sizeof(cmd));
1116
1117 do {
1118 ret_len = recv(ctx->control_sock, &resp, sizeof(resp), 0);
1119 } while (ret_len < 0 && errno == EINTR);
f8612f31
MD
1120 if (ret_len == 0) {
1121 fprintf(stderr, "[error] Remote side has closed connection\n");
1122 ret = -1;
1123 goto error;
1124 }
2acdc547
JD
1125 if (ret_len < 0) {
1126 fprintf(stderr, "[error] Error receiving create session reply\n");
1127 ret = ret_len;
1128 goto error;
1129 }
1130 assert(ret_len == sizeof(resp));
1131
1132 if (be32toh(resp.status) != LTTNG_VIEWER_CREATE_SESSION_OK) {
1133 fprintf(stderr, "[error] Error creating viewer session\n");
1134 ret = -1;
1135 goto error;
1136 }
1137 ret = 0;
1138
1139error:
1140 return ret;
1141}
1142
1143static
1144int del_traces(gpointer key, gpointer value, gpointer user_data)
4a744367
JD
1145{
1146 struct bt_context *bt_ctx = user_data;
1147 struct lttng_live_ctf_trace *trace = value;
1148 int ret;
1149
1150 ret = bt_context_remove_trace(bt_ctx, trace->trace_id);
1151 if (ret < 0)
1152 fprintf(stderr, "[error] removing trace from context\n");
1153
1154 /* remove the key/value pair from the HT. */
1155 return 1;
1156}
1157
2acdc547
JD
1158static
1159void add_traces(gpointer key, gpointer value, gpointer user_data)
4a744367
JD
1160{
1161 int i, ret, total_metadata = 0;
1162 uint64_t metadata_len;
1163 struct bt_context *bt_ctx = user_data;
1164 struct lttng_live_ctf_trace *trace = value;
1165 struct lttng_live_viewer_stream *stream;
1166 struct bt_mmap_stream *new_mmap_stream;
1167 struct bt_mmap_stream_list mmap_list;
1168 struct lttng_live_ctx *ctx = NULL;
1169
b5a1fa45
JD
1170 /*
1171 * We don't know how many streams we will receive for a trace, so
1172 * once we are done receiving the traces, we add all the traces
1173 * received to the bt_context.
1174 * We can receive streams during the attach command or the
1175 * get_new_streams, so we have to make sure not to add multiple
1176 * times the same traces.
1177 * If a trace is already in the context, we just skip this function.
1178 */
2acdc547
JD
1179 if (trace->in_use)
1180 return;
1181
4a744367
JD
1182 BT_INIT_LIST_HEAD(&mmap_list.head);
1183
1184 for (i = 0; i < trace->streams->len; i++) {
1185 stream = g_ptr_array_index(trace->streams, i);
1186 ctx = stream->session->ctx;
1187
1188 if (!stream->metadata_flag) {
1189 new_mmap_stream = zmalloc(sizeof(struct bt_mmap_stream));
1190 new_mmap_stream->priv = (void *) stream;
1191 new_mmap_stream->fd = -1;
1192 bt_list_add(&new_mmap_stream->list, &mmap_list.head);
1193 } else {
1194 /* Get all possible metadata before starting */
1195 do {
1196 ret = get_new_metadata(ctx, stream,
1197 &metadata_len);
1198 if (ret == 0) {
1199 total_metadata += metadata_len;
1200 }
1201 } while (ret == 0 || total_metadata == 0);
1202 trace->metadata_fp = fopen(stream->path, "r");
1203 }
1204 }
1205
1206 if (!trace->metadata_fp) {
1207 fprintf(stderr, "[error] No metadata stream opened\n");
1208 goto end_free;
1209 }
1210
1211 ret = bt_context_add_trace(bt_ctx, NULL, "ctf",
1212 ctf_live_packet_seek, &mmap_list, trace->metadata_fp);
1213 if (ret < 0) {
1214 fprintf(stderr, "[error] Error adding trace\n");
1215 goto end_free;
1216 }
2acdc547
JD
1217
1218 if (bt_ctx->current_iterator) {
1219 struct bt_trace_descriptor *td;
1220 struct bt_trace_handle *handle;
1221
1222 handle = (struct bt_trace_handle *) g_hash_table_lookup(
1223 bt_ctx->trace_handles,
1224 (gpointer) (unsigned long) ret);
1225 td = handle->td;
2acdc547
JD
1226 bt_iter_add_trace(bt_ctx->current_iterator, td);
1227 }
1228
4a744367 1229 trace->trace_id = ret;
2acdc547 1230 trace->in_use = 1;
4a744367
JD
1231
1232 goto end;
1233
1234end_free:
1235 bt_context_put(bt_ctx);
1236end:
1237 return;
1238}
1239
2acdc547 1240int lttng_live_get_new_streams(struct lttng_live_ctx *ctx, uint64_t id)
6192b988
JD
1241{
1242 struct lttng_viewer_cmd cmd;
2acdc547
JD
1243 struct lttng_viewer_new_streams_request rq;
1244 struct lttng_viewer_new_streams_response rp;
1245 struct lttng_viewer_stream stream;
1246 int ret, i;
6192b988 1247 ssize_t ret_len;
b5a1fa45 1248 uint32_t stream_count;
6192b988 1249
2acdc547
JD
1250 cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEW_STREAMS);
1251 cmd.data_size = sizeof(rq);
6192b988
JD
1252 cmd.cmd_version = 0;
1253
2acdc547
JD
1254 memset(&rq, 0, sizeof(rq));
1255 rq.session_id = htobe64(id);
1256
6192b988
JD
1257 do {
1258 ret_len = send(ctx->control_sock, &cmd, sizeof(cmd), 0);
1259 } while (ret_len < 0 && errno == EINTR);
1260 if (ret_len < 0) {
1261 fprintf(stderr, "[error] Error sending cmd\n");
1262 ret = ret_len;
1263 goto error;
1264 }
1265 assert(ret_len == sizeof(cmd));
1266
1267 do {
2acdc547 1268 ret_len = send(ctx->control_sock, &rq, sizeof(rq), 0);
6192b988
JD
1269 } while (ret_len < 0 && errno == EINTR);
1270 if (ret_len < 0) {
b5a1fa45 1271 fprintf(stderr, "[error] Error sending get_new_streams request\n");
6192b988
JD
1272 ret = ret_len;
1273 goto error;
1274 }
2acdc547 1275 assert(ret_len == sizeof(rq));
6192b988 1276
2acdc547
JD
1277 do {
1278 ret_len = recv(ctx->control_sock, &rp, sizeof(rp), 0);
1279 } while (ret_len < 0 && errno == EINTR);
f8612f31
MD
1280 if (ret_len == 0) {
1281 fprintf(stderr, "[error] Remote side has closed connection\n");
1282 ret = -1;
1283 goto error;
1284 }
2acdc547
JD
1285 if (ret_len < 0) {
1286 fprintf(stderr, "[error] Error receiving get_new_streams response\n");
1287 ret = ret_len;
6192b988
JD
1288 goto error;
1289 }
2acdc547
JD
1290 assert(ret_len == sizeof(rp));
1291
1292 switch(be32toh(rp.status)) {
1293 case LTTNG_VIEWER_NEW_STREAMS_OK:
1294 break;
1295 case LTTNG_VIEWER_NEW_STREAMS_NO_NEW:
1296 ret = 0;
1297 goto end;
1298 case LTTNG_VIEWER_NEW_STREAMS_HUP:
1299 ret = -LTTNG_VIEWER_NEW_STREAMS_HUP;
1300 goto end;
1301 case LTTNG_VIEWER_NEW_STREAMS_ERR:
1302 fprintf(stderr, "[error] get_new_streams error\n");
1303 ret = -1;
1304 goto end;
1305 default:
1306 fprintf(stderr, "[error] Unknown return code %u\n",
1307 be32toh(rp.status));
1308 ret = -1;
1309 goto end;
1310 }
1311
b5a1fa45
JD
1312 stream_count = be32toh(rp.streams_count);
1313 ctx->session->stream_count += stream_count;
2acdc547
JD
1314 /*
1315 * When the session is created but not started, we do an active wait
1316 * until it starts. It allows the viewer to start processing the trace
1317 * as soon as the session starts.
1318 */
1319 if (ctx->session->stream_count == 0) {
1320 ret = 0;
1321 goto end;
1322 }
1323 printf_verbose("Waiting for %" PRIu64 " streams:\n",
1324 ctx->session->stream_count);
1325 ctx->session->streams = g_new0(struct lttng_live_viewer_stream,
1326 ctx->session->stream_count);
b5a1fa45 1327 for (i = 0; i < stream_count; i++) {
2acdc547
JD
1328 do {
1329 ret_len = recv(ctx->control_sock, &stream, sizeof(stream), 0);
1330 } while (ret_len < 0 && errno == EINTR);
f8612f31
MD
1331 if (ret_len == 0) {
1332 fprintf(stderr, "[error] Remote side has closed connection\n");
1333 ret = -1;
1334 goto error;
1335 }
2acdc547
JD
1336 if (ret_len < 0) {
1337 fprintf(stderr, "[error] Error receiving stream\n");
1338 ret = ret_len;
1339 goto error;
1340 }
1341 assert(ret_len == sizeof(stream));
1342 stream.path_name[LTTNG_VIEWER_PATH_MAX - 1] = '\0';
1343 stream.channel_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0';
1344
1345 printf_verbose(" stream %" PRIu64 " : %s/%s\n",
1346 be64toh(stream.id), stream.path_name,
1347 stream.channel_name);
1348 ctx->session->streams[i].id = be64toh(stream.id);
1349 ctx->session->streams[i].session = ctx->session;
1350
1351 ctx->session->streams[i].first_read = 1;
1352 ctx->session->streams[i].mmap_size = 0;
1353
1354 if (be32toh(stream.metadata_flag)) {
1355 char *path;
1356
1357 path = strdup(LTTNG_METADATA_PATH_TEMPLATE);
1358 if (!path) {
1359 perror("strdup");
1360 ret = -1;
1361 goto error;
1362 }
1363 if (!mkdtemp(path)) {
1364 perror("mkdtemp");
1365 free(path);
1366 ret = -1;
1367 goto error;
1368 }
1369 ctx->session->streams[i].metadata_flag = 1;
1370 snprintf(ctx->session->streams[i].path,
1371 sizeof(ctx->session->streams[i].path),
1372 "%s/%s", path,
1373 stream.channel_name);
1374 ret = open(ctx->session->streams[i].path,
1375 O_WRONLY | O_CREAT | O_TRUNC,
1376 S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP);
1377 if (ret < 0) {
1378 perror("open");
1379 free(path);
1380 goto error;
1381 }
1382 ctx->session->streams[i].fd = ret;
1383 free(path);
1384 }
1385 ret = lttng_live_ctf_trace_assign(&ctx->session->streams[i],
1386 be64toh(stream.ctf_trace_id));
1387 if (ret < 0) {
1388 goto error;
1389 }
1390
1391 }
6192b988
JD
1392 ret = 0;
1393
2acdc547 1394end:
6192b988
JD
1395error:
1396 return ret;
1397}
1398
2acdc547 1399void lttng_live_read(struct lttng_live_ctx *ctx)
4a744367 1400{
2acdc547 1401 int ret, i;
4a744367
JD
1402 struct bt_ctf_iter *iter;
1403 const struct bt_ctf_event *event;
1404 struct bt_iter_pos begin_pos;
1405 struct bt_trace_descriptor *td_write;
1406 struct bt_format *fmt_write;
1407 struct ctf_text_stream_pos *sout;
2acdc547 1408 uint64_t id;
4a744367 1409
2acdc547
JD
1410 ctx->bt_ctx = bt_context_create();
1411 if (!ctx->bt_ctx) {
4a744367
JD
1412 fprintf(stderr, "[error] bt_context_create allocation\n");
1413 goto end;
1414 }
1415
1416 fmt_write = bt_lookup_format(g_quark_from_static_string("text"));
1417 if (!fmt_write) {
1418 fprintf(stderr, "[error] ctf-text error\n");
1419 goto end;
1420 }
1421
1422 td_write = fmt_write->open_trace(NULL, O_RDWR, NULL, NULL);
1423 if (!td_write) {
1424 fprintf(stderr, "[error] Error opening output trace\n");
1425 goto end_free;
1426 }
1427
1428 sout = container_of(td_write, struct ctf_text_stream_pos,
1429 trace_descriptor);
1430 if (!sout->parent.event_cb)
1431 goto end_free;
1432
6192b988
JD
1433 ret = lttng_live_create_viewer_session(ctx);
1434 if (ret < 0) {
1435 goto end_free;
1436 }
1437
2acdc547
JD
1438 for (i = 0; i < ctx->session_ids->len; i++) {
1439 id = g_array_index(ctx->session_ids, uint64_t, i);
1440 printf_verbose("Attaching to session %lu\n", id);
1441 ret = lttng_live_attach_session(ctx, id);
1442 printf_verbose("Attaching session returns %d\n", ret);
1443 if (ret < 0) {
1444 if (ret == -LTTNG_VIEWER_ATTACH_UNK) {
1445 fprintf(stderr, "[error] Unknown session ID\n");
1446 }
1447 goto end_free;
1448 }
1449 }
1450
4a744367 1451 /*
2acdc547 1452 * As long as the session is active, we try to get new streams.
4a744367 1453 */
2acdc547 1454 for (;;) {
4a744367
JD
1455 int flags;
1456
2acdc547
JD
1457 while (!ctx->session->stream_count) {
1458 if (ctx->session_ids->len == 0)
4a744367 1459 goto end_free;
2acdc547
JD
1460 ret = ask_new_streams(ctx);
1461 if (ret < 0)
1462 goto end_free;
1463 }
4a744367 1464
2acdc547
JD
1465 g_hash_table_foreach(ctx->session->ctf_traces, add_traces,
1466 ctx->bt_ctx);
4a744367
JD
1467
1468 begin_pos.type = BT_SEEK_BEGIN;
2acdc547 1469 iter = bt_ctf_iter_create(ctx->bt_ctx, &begin_pos, NULL);
4a744367
JD
1470 if (!iter) {
1471 fprintf(stderr, "[error] Iterator creation error\n");
1472 goto end;
1473 }
1474 for (;;) {
1475 event = bt_ctf_iter_read_event_flags(iter, &flags);
1476 if (!(flags & BT_ITER_FLAG_RETRY)) {
1477 if (!event) {
1478 /* End of trace */
1479 break;
1480 }
1481 ret = sout->parent.event_cb(&sout->parent,
1482 event->parent->stream);
1483 if (ret) {
1484 fprintf(stderr, "[error] Writing "
1485 "event failed.\n");
1486 goto end_free;
1487 }
1488 }
1489 ret = bt_iter_next(bt_ctf_get_iter(iter));
1490 if (ret < 0) {
1491 goto end_free;
1492 }
1493 }
1494 bt_ctf_iter_destroy(iter);
2acdc547
JD
1495 g_hash_table_foreach_remove(ctx->session->ctf_traces,
1496 del_traces, ctx->bt_ctx);
1497 ctx->session->stream_count = 0;
1498 }
4a744367
JD
1499
1500end_free:
2acdc547 1501 bt_context_put(ctx->bt_ctx);
4a744367
JD
1502end:
1503 return;
1504}
This page took 0.085315 seconds and 4 git commands to generate.