Fix: get_new_metadata receive all the metadata
[babeltrace.git] / formats / lttng-live / lttng-live-comm.c
CommitLineData
4a744367
JD
1/*
2 * Copyright (C) 2013 - Julien Desfossez <jdesfossez@efficios.com>
3 * Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
4 *
5 * Permission is hereby granted, free of charge, to any person obtaining a copy
6 * of this software and associated documentation files (the "Software"), to deal
7 * in the Software without restriction, including without limitation the rights
8 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9 * copies of the Software, and to permit persons to whom the Software is
10 * furnished to do so, subject to the following conditions:
11 *
12 * The above copyright notice and this permission notice shall be included in
13 * all copies or substantial portions of the Software.
14 *
15 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21 * SOFTWARE.
22 */
23
24#include <sys/socket.h>
25#include <sys/types.h>
26#include <netinet/in.h>
27#include <netdb.h>
28#include <stdio.h>
29#include <string.h>
30#include <stdlib.h>
31#include <unistd.h>
32#include <errno.h>
33#include <inttypes.h>
34#include <fcntl.h>
35#include <sys/mman.h>
36
37#include <babeltrace/ctf/ctf-index.h>
38
39#include <babeltrace/babeltrace.h>
40#include <babeltrace/ctf/events.h>
41#include <babeltrace/ctf/callbacks.h>
42#include <babeltrace/ctf/iterator.h>
43
44/* for packet_index */
45#include <babeltrace/ctf/types.h>
46
47#include <babeltrace/ctf/metadata.h>
48#include <babeltrace/ctf-text/types.h>
49#include <babeltrace/ctf/events-internal.h>
50#include <formats/ctf/events-private.h>
51
c98627ca 52#include "lttng-live.h"
158eea6e 53#include "lttng-viewer-abi.h"
4a744367
JD
54
/*
 * Zero-initialized memory allocation: returns a block of x bytes with
 * every byte set to 0, or NULL on allocation failure.
 */
#define zmalloc(x) calloc(1, (x))

#ifndef max_t
/* Maximum of a and b, both compared and returned as the given type. */
#define max_t(type, a, b)	\
	((type) (a) > (type) (b) ? (type) (a) : (type) (b))
#endif
64
b5a1fa45
JD
/*
 * Forward declarations for static functions that are referenced before
 * their definition in this file.
 */
/* Packet-seek callback for live streams. */
65static void ctf_live_packet_seek(struct bt_stream_pos *stream_pos,
66 size_t index, int whence);
2acdc547
JD
/*
 * add_traces is passed to g_hash_table_foreach() over the session's
 * ctf_traces table; del_traces has a similar key/value/user_data shape.
 */
67static void add_traces(gpointer key, gpointer value, gpointer user_data);
68static int del_traces(gpointer key, gpointer value, gpointer user_data);
9abca178
JD
/* Fetch all pending metadata for the trace viewer_stream belongs to. */
69static int get_new_metadata(struct lttng_live_ctx *ctx,
70 struct lttng_live_viewer_stream *viewer_stream);
2acdc547 71
b5a1fa45 72int lttng_live_connect_viewer(struct lttng_live_ctx *ctx)
4a744367
JD
73{
74 struct hostent *host;
75 struct sockaddr_in server_addr;
76 int ret;
77
b5a1fa45 78 host = gethostbyname(ctx->relay_hostname);
4a744367
JD
79 if (!host) {
80 ret = -1;
81 goto end;
82 }
83
84 if ((ctx->control_sock = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
85 perror("Socket");
86 ret = -1;
87 goto end;
88 }
89
90 server_addr.sin_family = AF_INET;
b5a1fa45 91 server_addr.sin_port = htons(ctx->port);
4a744367
JD
92 server_addr.sin_addr = *((struct in_addr *) host->h_addr);
93 bzero(&(server_addr.sin_zero), 8);
94
95 if (connect(ctx->control_sock, (struct sockaddr *) &server_addr,
96 sizeof(struct sockaddr)) == -1) {
97 perror("Connect");
98 ret = -1;
99 goto end;
100 }
101
102 ret = 0;
103
104end:
105 return ret;
106}
107
108int lttng_live_establish_connection(struct lttng_live_ctx *ctx)
109{
110 struct lttng_viewer_cmd cmd;
111 struct lttng_viewer_connect connect;
112 int ret;
113 ssize_t ret_len;
114
115 cmd.cmd = htobe32(LTTNG_VIEWER_CONNECT);
116 cmd.data_size = sizeof(connect);
117 cmd.cmd_version = 0;
118
0c3cd7e1 119 connect.viewer_session_id = -1ULL; /* will be set on recv */
4a744367
JD
120 connect.major = htobe32(LTTNG_LIVE_MAJOR);
121 connect.minor = htobe32(LTTNG_LIVE_MINOR);
122 connect.type = htobe32(LTTNG_VIEWER_CLIENT_COMMAND);
123
124 do {
125 ret_len = send(ctx->control_sock, &cmd, sizeof(cmd), 0);
126 } while (ret_len < 0 && errno == EINTR);
127 if (ret_len < 0) {
79620497 128 perror("[error] Error sending cmd");
4a744367
JD
129 ret = ret_len;
130 goto error;
131 }
132 assert(ret_len == sizeof(cmd));
133
134 do {
135 ret_len = send(ctx->control_sock, &connect, sizeof(connect), 0);
136 } while (ret_len < 0 && errno == EINTR);
137 if (ret_len < 0) {
79620497 138 perror("[error] Error sending version");
4a744367
JD
139 ret = ret_len;
140 goto error;
141 }
142 assert(ret_len == sizeof(connect));
143
144 do {
145 ret_len = recv(ctx->control_sock, &connect, sizeof(connect), 0);
146 } while (ret_len < 0 && errno == EINTR);
f8612f31
MD
147 if (ret_len == 0) {
148 fprintf(stderr, "[error] Remote side has closed connection\n");
149 ret = -1;
150 goto error;
151 }
4a744367 152 if (ret_len < 0) {
79620497 153 perror("[error] Error receiving version");
4a744367
JD
154 ret = ret_len;
155 goto error;
156 }
157 assert(ret_len == sizeof(connect));
158
159 printf_verbose("Received viewer session ID : %" PRIu64 "\n",
160 be64toh(connect.viewer_session_id));
161 printf_verbose("Relayd version : %u.%u\n", be32toh(connect.major),
162 be32toh(connect.minor));
163
164 ret = 0;
165
166error:
167 return ret;
168}
169
b5a1fa45
JD
170static
171void free_session_list(GPtrArray *session_list)
172{
173 int i;
174 struct lttng_live_relay_session *relay_session;
175
176 for (i = 0; i < session_list->len; i++) {
177 relay_session = g_ptr_array_index(session_list, i);
178 free(relay_session->name);
179 free(relay_session->hostname);
180 }
181 g_ptr_array_free(session_list, TRUE);
182}
183
184static
185void print_session_list(GPtrArray *session_list, const char *path)
186{
187 int i;
188 struct lttng_live_relay_session *relay_session;
189
190 for (i = 0; i < session_list->len; i++) {
191 relay_session = g_ptr_array_index(session_list, i);
192 fprintf(stdout, "%s/host/%s/%s (timer = %u, "
193 "%u stream(s), %u client(s) connected)\n",
194 path, relay_session->hostname,
195 relay_session->name, relay_session->timer,
196 relay_session->streams, relay_session->clients);
197 }
198}
199
200static
201void update_session_list(GPtrArray *session_list, char *hostname,
202 char *session_name, uint32_t streams, uint32_t clients,
203 uint32_t timer)
204{
205 int i, found = 0;
206 struct lttng_live_relay_session *relay_session;
207
208 for (i = 0; i < session_list->len; i++) {
209 relay_session = g_ptr_array_index(session_list, i);
210 if ((strncmp(relay_session->hostname, hostname, NAME_MAX) == 0) &&
211 strncmp(relay_session->name,
212 session_name, NAME_MAX) == 0) {
213 relay_session->streams += streams;
214 if (relay_session->clients < clients)
215 relay_session->clients = clients;
216 found = 1;
217 break;
218 }
219 }
220 if (found)
221 return;
222
223 relay_session = g_new0(struct lttng_live_relay_session, 1);
224 relay_session->hostname = strndup(hostname, NAME_MAX);
225 relay_session->name = strndup(session_name, NAME_MAX);
226 relay_session->clients = clients;
227 relay_session->streams = streams;
228 relay_session->timer = timer;
229 g_ptr_array_add(session_list, relay_session);
230}
231
4a744367
JD
232int lttng_live_list_sessions(struct lttng_live_ctx *ctx, const char *path)
233{
234 struct lttng_viewer_cmd cmd;
235 struct lttng_viewer_list_sessions list;
236 struct lttng_viewer_session lsession;
b5a1fa45 237 int i, ret, sessions_count, print_list = 0;
4a744367 238 ssize_t ret_len;
b5a1fa45
JD
239 uint64_t session_id;
240 GPtrArray *session_list = NULL;
241
242 if (strlen(ctx->session_name) == 0) {
243 print_list = 1;
244 session_list = g_ptr_array_new();
245 }
4a744367
JD
246
247 cmd.cmd = htobe32(LTTNG_VIEWER_LIST_SESSIONS);
248 cmd.data_size = 0;
249 cmd.cmd_version = 0;
250
251 do {
252 ret_len = send(ctx->control_sock, &cmd, sizeof(cmd), 0);
253 } while (ret_len < 0 && errno == EINTR);
254 if (ret_len < 0) {
79620497 255 perror("[error] Error sending cmd");
4a744367
JD
256 ret = ret_len;
257 goto error;
258 }
259 assert(ret_len == sizeof(cmd));
260
261 do {
262 ret_len = recv(ctx->control_sock, &list, sizeof(list), 0);
263 } while (ret_len < 0 && errno == EINTR);
f8612f31
MD
264 if (ret_len == 0) {
265 fprintf(stderr, "[error] Remote side has closed connection\n");
266 ret = -1;
267 goto error;
268 }
4a744367 269 if (ret_len < 0) {
79620497 270 perror("[error] Error receiving session list");
4a744367
JD
271 ret = ret_len;
272 goto error;
273 }
274 assert(ret_len == sizeof(list));
275
276 sessions_count = be32toh(list.sessions_count);
4a744367
JD
277 for (i = 0; i < sessions_count; i++) {
278 do {
279 ret_len = recv(ctx->control_sock, &lsession, sizeof(lsession), 0);
280 } while (ret_len < 0 && errno == EINTR);
f8612f31
MD
281 if (ret_len == 0) {
282 fprintf(stderr, "[error] Remote side has closed connection\n");
283 ret = -1;
284 goto error;
285 }
4a744367 286 if (ret_len < 0) {
79620497 287 perror("[error] Error receiving session");
4a744367
JD
288 ret = ret_len;
289 goto error;
290 }
291 assert(ret_len == sizeof(lsession));
9ed31e53
MD
292 lsession.hostname[LTTNG_VIEWER_HOST_NAME_MAX - 1] = '\0';
293 lsession.session_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0';
b5a1fa45
JD
294 session_id = be64toh(lsession.id);
295
296 if (print_list) {
297 update_session_list(session_list,
298 lsession.hostname,
299 lsession.session_name,
300 be32toh(lsession.streams),
301 be32toh(lsession.clients),
302 be32toh(lsession.live_timer));
303 } else {
304 if ((strncmp(lsession.session_name, ctx->session_name,
305 NAME_MAX) == 0) && (strncmp(lsession.hostname,
306 ctx->traced_hostname, NAME_MAX) == 0)) {
307 printf_verbose("Reading from session %" PRIu64 "\n",
308 session_id);
309 g_array_append_val(ctx->session_ids,
310 session_id);
311 }
312 }
313 }
4a744367 314
b5a1fa45
JD
315 if (print_list) {
316 print_session_list(session_list, path);
317 free_session_list(session_list);
4a744367
JD
318 }
319
320 ret = 0;
321
322error:
323 return ret;
324}
325
326int lttng_live_ctf_trace_assign(struct lttng_live_viewer_stream *stream,
327 uint64_t ctf_trace_id)
328{
329 struct lttng_live_ctf_trace *trace;
330 int ret = 0;
331
332 trace = g_hash_table_lookup(stream->session->ctf_traces,
333 (gpointer) ctf_trace_id);
334 if (!trace) {
335 trace = g_new0(struct lttng_live_ctf_trace, 1);
4a744367
JD
336 trace->ctf_trace_id = ctf_trace_id;
337 trace->streams = g_ptr_array_new();
338 g_hash_table_insert(stream->session->ctf_traces,
339 (gpointer) ctf_trace_id,
340 trace);
341 }
342 if (stream->metadata_flag)
343 trace->metadata_stream = stream;
344
345 stream->ctf_trace = trace;
346 g_ptr_array_add(trace->streams, stream);
347
4a744367
JD
348 return ret;
349}
350
351int lttng_live_attach_session(struct lttng_live_ctx *ctx, uint64_t id)
352{
353 struct lttng_viewer_cmd cmd;
354 struct lttng_viewer_attach_session_request rq;
355 struct lttng_viewer_attach_session_response rp;
356 struct lttng_viewer_stream stream;
357 int ret, i;
358 ssize_t ret_len;
359
360 cmd.cmd = htobe32(LTTNG_VIEWER_ATTACH_SESSION);
361 cmd.data_size = sizeof(rq);
362 cmd.cmd_version = 0;
363
364 memset(&rq, 0, sizeof(rq));
365 rq.session_id = htobe64(id);
366 // TODO: add cmd line parameter to select seek beginning
367 // rq.seek = htobe32(LTTNG_VIEWER_SEEK_BEGINNING);
368 rq.seek = htobe32(LTTNG_VIEWER_SEEK_LAST);
369
370 do {
371 ret_len = send(ctx->control_sock, &cmd, sizeof(cmd), 0);
372 } while (ret_len < 0 && errno == EINTR);
373 if (ret_len < 0) {
79620497 374 perror("[error] Error sending cmd");
4a744367
JD
375 ret = ret_len;
376 goto error;
377 }
378 assert(ret_len == sizeof(cmd));
379
380 do {
381 ret_len = send(ctx->control_sock, &rq, sizeof(rq), 0);
382 } while (ret_len < 0 && errno == EINTR);
383 if (ret_len < 0) {
79620497 384 perror("[error] Error sending attach request");
4a744367
JD
385 ret = ret_len;
386 goto error;
387 }
388 assert(ret_len == sizeof(rq));
389
390 do {
391 ret_len = recv(ctx->control_sock, &rp, sizeof(rp), 0);
392 } while (ret_len < 0 && errno == EINTR);
f8612f31
MD
393 if (ret_len == 0) {
394 fprintf(stderr, "[error] Remote side has closed connection\n");
395 ret = -1;
396 goto error;
397 }
4a744367 398 if (ret_len < 0) {
79620497 399 perror("[error] Error receiving attach response");
4a744367
JD
400 ret = ret_len;
401 goto error;
402 }
403 assert(ret_len == sizeof(rp));
404
405 switch(be32toh(rp.status)) {
406 case LTTNG_VIEWER_ATTACH_OK:
407 break;
408 case LTTNG_VIEWER_ATTACH_UNK:
409 ret = -LTTNG_VIEWER_ATTACH_UNK;
410 goto end;
411 case LTTNG_VIEWER_ATTACH_ALREADY:
79620497 412 fprintf(stderr, "[error] There is already a viewer attached to this session\n");
4a744367
JD
413 ret = -1;
414 goto end;
415 case LTTNG_VIEWER_ATTACH_NOT_LIVE:
416 fprintf(stderr, "[error] Not a live session\n");
417 ret = -1;
418 goto end;
419 case LTTNG_VIEWER_ATTACH_SEEK_ERR:
420 fprintf(stderr, "[error] Wrong seek parameter\n");
421 ret = -1;
422 goto end;
423 default:
424 fprintf(stderr, "[error] Unknown attach return code %u\n",
425 be32toh(rp.status));
426 ret = -1;
427 goto end;
428 }
429 if (be32toh(rp.status) != LTTNG_VIEWER_ATTACH_OK) {
430 ret = -1;
431 goto end;
432 }
433
2acdc547 434 ctx->session->stream_count += be32toh(rp.streams_count);
4a744367
JD
435 /*
436 * When the session is created but not started, we do an active wait
437 * until it starts. It allows the viewer to start processing the trace
438 * as soon as the session starts.
439 */
440 if (ctx->session->stream_count == 0) {
441 ret = 0;
442 goto end;
443 }
444 printf_verbose("Waiting for %" PRIu64 " streams:\n",
445 ctx->session->stream_count);
446 ctx->session->streams = g_new0(struct lttng_live_viewer_stream,
447 ctx->session->stream_count);
4a744367
JD
448 for (i = 0; i < be32toh(rp.streams_count); i++) {
449 do {
450 ret_len = recv(ctx->control_sock, &stream, sizeof(stream), 0);
451 } while (ret_len < 0 && errno == EINTR);
f8612f31
MD
452 if (ret_len == 0) {
453 fprintf(stderr, "[error] Remote side has closed connection\n");
454 ret = -1;
455 goto error;
456 }
4a744367 457 if (ret_len < 0) {
79620497 458 perror("[error] Error receiving stream");
4a744367
JD
459 ret = ret_len;
460 goto error;
461 }
462 assert(ret_len == sizeof(stream));
9ed31e53
MD
463 stream.path_name[LTTNG_VIEWER_PATH_MAX - 1] = '\0';
464 stream.channel_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0';
4a744367
JD
465
466 printf_verbose(" stream %" PRIu64 " : %s/%s\n",
467 be64toh(stream.id), stream.path_name,
468 stream.channel_name);
469 ctx->session->streams[i].id = be64toh(stream.id);
470 ctx->session->streams[i].session = ctx->session;
471
472 ctx->session->streams[i].first_read = 1;
473 ctx->session->streams[i].mmap_size = 0;
474
475 if (be32toh(stream.metadata_flag)) {
476 char *path;
477
478 path = strdup(LTTNG_METADATA_PATH_TEMPLATE);
6a4d2b5a
MD
479 if (!path) {
480 perror("strdup");
481 ret = -1;
482 goto error;
483 }
484 if (!mkdtemp(path)) {
485 perror("mkdtemp");
486 free(path);
487 ret = -1;
488 goto error;
489 }
4a744367 490 ctx->session->streams[i].metadata_flag = 1;
4a744367
JD
491 snprintf(ctx->session->streams[i].path,
492 sizeof(ctx->session->streams[i].path),
493 "%s/%s", path,
494 stream.channel_name);
495 ret = open(ctx->session->streams[i].path,
496 O_WRONLY | O_CREAT | O_TRUNC,
497 S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP);
498 if (ret < 0) {
ab9d07f0 499 perror("open");
6a4d2b5a 500 free(path);
4a744367
JD
501 goto error;
502 }
503 ctx->session->streams[i].fd = ret;
6a4d2b5a 504 free(path);
4a744367
JD
505 }
506 ret = lttng_live_ctf_trace_assign(&ctx->session->streams[i],
507 be64toh(stream.ctf_trace_id));
508 if (ret < 0) {
509 goto error;
510 }
511
512 }
513 ret = 0;
514
515end:
516error:
517 return ret;
518}
519
2acdc547
JD
520static
521int ask_new_streams(struct lttng_live_ctx *ctx)
522{
523 int i, ret = 0;
524 uint64_t id;
525
526restart:
527 for (i = 0; i < ctx->session_ids->len; i++) {
528 id = g_array_index(ctx->session_ids, uint64_t, i);
529 ret = lttng_live_get_new_streams(ctx, id);
530 printf_verbose("Asking for new streams returns %d\n",
531 ret);
532 if (ret < 0) {
533 if (ret == -LTTNG_VIEWER_NEW_STREAMS_HUP) {
534 printf_verbose("Session %" PRIu64 " closed\n",
535 id);
b5a1fa45
JD
536 /*
537 * The streams have already been closed during
538 * the reading, so we only need to get rid of
539 * the trace in our internal table of sessions.
540 */
2acdc547
JD
541 g_array_remove_index(ctx->session_ids, i);
542 /*
543 * We can't continue iterating on the g_array
544 * after a remove, we have to start again.
545 */
546 goto restart;
547 } else {
548 ret = -1;
549 goto end;
550 }
551 }
552 }
553
554end:
555 return ret;
556}
557
4a744367
JD
558static
559int get_data_packet(struct lttng_live_ctx *ctx,
560 struct ctf_stream_pos *pos,
561 struct lttng_live_viewer_stream *stream, uint64_t offset,
562 uint64_t len)
563{
564 struct lttng_viewer_cmd cmd;
565 struct lttng_viewer_get_packet rq;
566 struct lttng_viewer_trace_packet rp;
567 ssize_t ret_len;
568 int ret;
569
570 cmd.cmd = htobe32(LTTNG_VIEWER_GET_PACKET);
571 cmd.data_size = sizeof(rq);
572 cmd.cmd_version = 0;
573
574 memset(&rq, 0, sizeof(rq));
575 rq.stream_id = htobe64(stream->id);
576 /* Already in big endian. */
577 rq.offset = offset;
578 rq.len = htobe32(len);
579
580 do {
581 ret_len = send(ctx->control_sock, &cmd, sizeof(cmd), 0);
582 } while (ret_len < 0 && errno == EINTR);
583 if (ret_len < 0) {
79620497 584 perror("[error] Error sending cmd");
4a744367
JD
585 ret = ret_len;
586 goto error;
587 }
588 assert(ret_len == sizeof(cmd));
589
590 do {
591 ret_len = send(ctx->control_sock, &rq, sizeof(rq), 0);
592 } while (ret_len < 0 && errno == EINTR);
593 if (ret_len < 0) {
79620497 594 perror("[error] Error sending get_data_packet request");
4a744367
JD
595 ret = ret_len;
596 goto error;
597 }
598 assert(ret_len == sizeof(rq));
599
600 do {
601 ret_len = recv(ctx->control_sock, &rp, sizeof(rp), 0);
602 } while (ret_len < 0 && errno == EINTR);
f8612f31
MD
603 if (ret_len == 0) {
604 fprintf(stderr, "[error] Remote side has closed connection\n");
605 ret = -1;
606 goto error;
607 }
4a744367 608 if (ret_len < 0) {
79620497 609 perror("[error] Error receiving data response");
4a744367
JD
610 ret = ret_len;
611 goto error;
612 }
57b9d843
JD
613 if (ret_len != sizeof(rp)) {
614 fprintf(stderr, "[error] get_data_packet: expected %" PRId64
615 ", received %" PRId64 "\n", ret_len,
616 sizeof(rp));
617 ret = -1;
618 goto error;
619 }
4a744367
JD
620
621 rp.flags = be32toh(rp.flags);
622
623 switch (be32toh(rp.status)) {
624 case LTTNG_VIEWER_GET_PACKET_OK:
625 len = be32toh(rp.len);
626 printf_verbose("get_data_packet: Ok, packet size : %" PRIu64
627 "\n", len);
628 break;
629 case LTTNG_VIEWER_GET_PACKET_RETRY:
630 printf_verbose("get_data_packet: retry\n");
631 ret = -1;
632 goto end;
633 case LTTNG_VIEWER_GET_PACKET_ERR:
634 if (rp.flags & LTTNG_VIEWER_FLAG_NEW_METADATA) {
635 printf_verbose("get_data_packet: new metadata needed\n");
636 ret = 0;
637 goto end;
638 }
2acdc547
JD
639 if (rp.flags & LTTNG_VIEWER_FLAG_NEW_STREAM) {
640 ret = ask_new_streams(ctx);
641 if (ret < 0)
642 goto error;
643 g_hash_table_foreach(ctx->session->ctf_traces, add_traces,
644 ctx->bt_ctx);
645 }
4a744367
JD
646 fprintf(stderr, "[error] get_data_packet: error\n");
647 ret = -1;
648 goto end;
649 case LTTNG_VIEWER_GET_PACKET_EOF:
650 ret = -2;
651 goto error;
652 default:
653 printf_verbose("get_data_packet: unknown\n");
654 assert(0);
655 ret = -1;
656 goto end;
657 }
658
659 if (len <= 0) {
660 ret = -1;
661 goto end;
662 }
663
664 if (len > stream->mmap_size) {
665 uint64_t new_size;
666
667 new_size = max_t(uint64_t, len, stream->mmap_size << 1);
668 if (pos->base_mma) {
669 /* unmap old base */
670 ret = munmap_align(pos->base_mma);
671 if (ret) {
79620497 672 perror("[error] Unable to unmap old base");
4a744367
JD
673 ret = -1;
674 goto error;
675 }
676 pos->base_mma = NULL;
677 }
678 pos->base_mma = mmap_align(new_size,
679 PROT_READ | PROT_WRITE,
680 MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
681 if (pos->base_mma == MAP_FAILED) {
79620497 682 perror("[error] mmap error");
4a744367
JD
683 pos->base_mma = NULL;
684 ret = -1;
685 goto error;
686 }
687
688 stream->mmap_size = new_size;
689 printf_verbose("Expanding stream mmap size to %" PRIu64 " bytes\n",
690 stream->mmap_size);
691 }
692
693 do {
694 ret_len = recv(ctx->control_sock,
695 mmap_align_addr(pos->base_mma), len,
696 MSG_WAITALL);
697 } while (ret_len < 0 && errno == EINTR);
f8612f31
MD
698 if (ret_len == 0) {
699 fprintf(stderr, "[error] Remote side has closed connection\n");
700 ret = -1;
701 goto error;
702 }
4a744367 703 if (ret_len < 0) {
79620497 704 perror("[error] Error receiving trace packet");
4a744367
JD
705 ret = ret_len;
706 goto error;
707 }
708 assert(ret_len == len);
709
710end:
711error:
712 return ret;
713}
714
4a744367 715static
9abca178
JD
716int get_one_metadata_packet(struct lttng_live_ctx *ctx,
717 struct lttng_live_viewer_stream *metadata_stream)
4a744367
JD
718{
719 uint64_t len = 0;
720 int ret;
721 struct lttng_viewer_cmd cmd;
722 struct lttng_viewer_get_metadata rq;
723 struct lttng_viewer_metadata_packet rp;
4a744367
JD
724 char *data = NULL;
725 ssize_t ret_len;
726
9abca178 727 rq.stream_id = htobe64(metadata_stream->id);
4a744367
JD
728 cmd.cmd = htobe32(LTTNG_VIEWER_GET_METADATA);
729 cmd.data_size = sizeof(rq);
730 cmd.cmd_version = 0;
731
4a744367
JD
732 do {
733 ret_len = send(ctx->control_sock, &cmd, sizeof(cmd), 0);
734 } while (ret_len < 0 && errno == EINTR);
735 if (ret_len < 0) {
79620497 736 perror("[error] Error sending cmd");
4a744367
JD
737 ret = ret_len;
738 goto error;
739 }
740 assert(ret_len == sizeof(cmd));
741
742 do {
743 ret_len = send(ctx->control_sock, &rq, sizeof(rq), 0);
744 } while (ret_len < 0 && errno == EINTR);
745 if (ret_len < 0) {
79620497 746 perror("[error] Error sending get_metadata request");
4a744367
JD
747 ret = ret_len;
748 goto error;
749 }
750 assert(ret_len == sizeof(rq));
751
752 do {
753 ret_len = recv(ctx->control_sock, &rp, sizeof(rp), 0);
754 } while (ret_len < 0 && errno == EINTR);
f8612f31
MD
755 if (ret_len == 0) {
756 fprintf(stderr, "[error] Remote side has closed connection\n");
757 ret = -1;
758 goto error;
759 }
4a744367 760 if (ret_len < 0) {
79620497 761 perror("[error] Error receiving metadata response");
4a744367
JD
762 ret = ret_len;
763 goto error;
764 }
765 assert(ret_len == sizeof(rp));
766
767 switch (be32toh(rp.status)) {
768 case LTTNG_VIEWER_METADATA_OK:
769 printf_verbose("get_metadata : OK\n");
770 break;
771 case LTTNG_VIEWER_NO_NEW_METADATA:
772 printf_verbose("get_metadata : NO NEW\n");
9abca178 773 ret = 0;
4a744367
JD
774 goto end;
775 case LTTNG_VIEWER_METADATA_ERR:
776 printf_verbose("get_metadata : ERR\n");
777 ret = -1;
778 goto end;
779 default:
780 printf_verbose("get_metadata : UNKNOWN\n");
781 ret = -1;
782 goto end;
783 }
784
785 len = be64toh(rp.len);
786 printf_verbose("Writing %" PRIu64" bytes to metadata\n", len);
787 if (len <= 0) {
788 ret = -1;
789 goto end;
790 }
791
792 data = zmalloc(len);
793 if (!data) {
794 perror("relay data zmalloc");
795 ret = -1;
796 goto error;
797 }
798 do {
799 ret_len = recv(ctx->control_sock, data, len, MSG_WAITALL);
800 } while (ret_len < 0 && errno == EINTR);
f8612f31
MD
801 if (ret_len == 0) {
802 fprintf(stderr, "[error] Remote side has closed connection\n");
803 ret = -1;
804 free(data);
805 goto error;
806 }
4a744367 807 if (ret_len < 0) {
79620497 808 perror("[error] Error receiving trace packet");
4a744367
JD
809 ret = ret_len;
810 free(data);
811 goto error;
812 }
813 assert(ret_len == len);
814
815 do {
816 ret_len = write(metadata_stream->fd, data, len);
817 } while (ret_len < 0 && errno == EINTR);
818 if (ret_len < 0) {
819 free(data);
820 ret = ret_len;
821 goto error;
822 }
823 assert(ret_len == len);
9abca178 824 ret = len;
4a744367
JD
825
826 free(data);
827
4a744367
JD
828end:
829error:
830 return ret;
831}
832
9abca178
JD
833/*
834 * Return 0 on success, a negative value on error.
835 */
836static
837int get_new_metadata(struct lttng_live_ctx *ctx,
838 struct lttng_live_viewer_stream *viewer_stream)
839{
840 int ret = 0;
841 struct lttng_live_viewer_stream *metadata_stream;
842
843 metadata_stream = viewer_stream->ctf_trace->metadata_stream;
844
845 do {
846 /*
847 * get_one_metadata_packet returns the number of bytes
848 * received, 0 when we have received everything, a
849 * negative value on error.
850 */
851 ret = get_one_metadata_packet(ctx, metadata_stream);
852 } while (ret > 0);
853
854 return ret;
855}
856
4a744367
JD
857/*
858 * Get one index for a stream.
859 *
860 * Returns 0 on success or a negative value on error.
861 */
862static
863int get_next_index(struct lttng_live_ctx *ctx,
864 struct lttng_live_viewer_stream *viewer_stream,
865 struct packet_index *index)
866{
867 struct lttng_viewer_cmd cmd;
868 struct lttng_viewer_get_next_index rq;
869 struct lttng_viewer_index rp;
870 int ret;
4a744367
JD
871 ssize_t ret_len;
872
873 cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEXT_INDEX);
874 cmd.data_size = sizeof(rq);
875 cmd.cmd_version = 0;
876
877 memset(&rq, 0, sizeof(rq));
878 rq.stream_id = htobe64(viewer_stream->id);
879
880retry:
881 do {
882 ret_len = send(ctx->control_sock, &cmd, sizeof(cmd), 0);
883 } while (ret_len < 0 && errno == EINTR);
884 if (ret_len < 0) {
79620497 885 perror("[error] Error sending cmd");
4a744367
JD
886 ret = ret_len;
887 goto error;
888 }
889 assert(ret_len == sizeof(cmd));
890
891 do {
892 ret_len = send(ctx->control_sock, &rq, sizeof(rq), 0);
893 } while (ret_len < 0 && errno == EINTR);
894 if (ret_len < 0) {
79620497 895 perror("[error] Error sending get_next_index request");
4a744367
JD
896 ret = ret_len;
897 goto error;
898 }
899 assert(ret_len == sizeof(rq));
900
901 do {
902 ret_len = recv(ctx->control_sock, &rp, sizeof(rp), 0);
903 } while (ret_len < 0 && errno == EINTR);
f8612f31
MD
904 if (ret_len == 0) {
905 fprintf(stderr, "[error] Remote side has closed connection\n");
906 ret = -1;
907 goto error;
908 }
4a744367 909 if (ret_len < 0) {
79620497 910 perror("[error] Error receiving index response");
4a744367
JD
911 ret = ret_len;
912 goto error;
913 }
914 assert(ret_len == sizeof(rp));
915
916 rp.flags = be32toh(rp.flags);
917
918 switch (be32toh(rp.status)) {
919 case LTTNG_VIEWER_INDEX_INACTIVE:
920 printf_verbose("get_next_index: inactive\n");
921 memset(index, 0, sizeof(struct packet_index));
992e8cc0 922 index->ts_cycles.timestamp_end = be64toh(rp.timestamp_end);
4a744367
JD
923 break;
924 case LTTNG_VIEWER_INDEX_OK:
925 printf_verbose("get_next_index: Ok, need metadata update : %u\n",
926 rp.flags & LTTNG_VIEWER_FLAG_NEW_METADATA);
927 index->offset = be64toh(rp.offset);
928 index->packet_size = be64toh(rp.packet_size);
929 index->content_size = be64toh(rp.content_size);
992e8cc0
MD
930 index->ts_cycles.timestamp_begin = be64toh(rp.timestamp_begin);
931 index->ts_cycles.timestamp_end = be64toh(rp.timestamp_end);
4a744367
JD
932 index->events_discarded = be64toh(rp.events_discarded);
933
934 if (rp.flags & LTTNG_VIEWER_FLAG_NEW_METADATA) {
935 printf_verbose("get_next_index: new metadata needed\n");
9abca178 936 ret = get_new_metadata(ctx, viewer_stream);
4a744367
JD
937 if (ret < 0) {
938 goto error;
939 }
940 }
2acdc547
JD
941 if (rp.flags & LTTNG_VIEWER_FLAG_NEW_STREAM) {
942 ret = ask_new_streams(ctx);
943 if (ret < 0)
944 goto error;
945 g_hash_table_foreach(ctx->session->ctf_traces, add_traces,
946 ctx->bt_ctx);
947 }
4a744367
JD
948 break;
949 case LTTNG_VIEWER_INDEX_RETRY:
950 printf_verbose("get_next_index: retry\n");
951 sleep(1);
952 goto retry;
953 case LTTNG_VIEWER_INDEX_HUP:
954 printf_verbose("get_next_index: stream hung up\n");
955 viewer_stream->id = -1ULL;
956 viewer_stream->fd = -1;
957 index->offset = EOF;
2acdc547 958 ctx->session->stream_count--;
4a744367
JD
959 break;
960 case LTTNG_VIEWER_INDEX_ERR:
961 fprintf(stderr, "[error] get_next_index: error\n");
962 ret = -1;
963 goto error;
964 default:
965 fprintf(stderr, "[error] get_next_index: unkwown value\n");
966 ret = -1;
967 goto error;
968 }
969
970 ret = 0;
971
972error:
973 return ret;
974}
975
b5a1fa45 976static
4a744367
JD
977void ctf_live_packet_seek(struct bt_stream_pos *stream_pos, size_t index,
978 int whence)
979{
980 struct ctf_stream_pos *pos;
981 struct ctf_file_stream *file_stream;
f1f52630 982 struct packet_index *prev_index = NULL, *cur_index;
4a744367
JD
983 struct lttng_live_viewer_stream *viewer_stream;
984 struct lttng_live_session *session;
985 int ret;
986
987retry:
988 pos = ctf_pos(stream_pos);
989 file_stream = container_of(pos, struct ctf_file_stream, pos);
990 viewer_stream = (struct lttng_live_viewer_stream *) pos->priv;
991 session = viewer_stream->session;
992
f1f52630
MD
993 switch (pos->packet_index->len) {
994 case 0:
995 g_array_set_size(pos->packet_index, 1);
996 cur_index = &g_array_index(pos->packet_index,
997 struct packet_index, 0);
998 break;
999 case 1:
1000 g_array_set_size(pos->packet_index, 2);
1001 prev_index = &g_array_index(pos->packet_index,
1002 struct packet_index, 0);
1003 cur_index = &g_array_index(pos->packet_index,
1004 struct packet_index, 1);
1005 break;
1006 case 2:
1007 g_array_index(pos->packet_index,
1008 struct packet_index, 0) =
1009 g_array_index(pos->packet_index,
1010 struct packet_index, 1);
1011 prev_index = &g_array_index(pos->packet_index,
1012 struct packet_index, 0);
1013 cur_index = &g_array_index(pos->packet_index,
1014 struct packet_index, 1);
1015 break;
1016 default:
1017 abort();
1018 break;
1019 }
4a744367 1020 printf_verbose("get_next_index for stream %" PRIu64 "\n", viewer_stream->id);
f1f52630 1021 ret = get_next_index(session->ctx, viewer_stream, cur_index);
4a744367
JD
1022 if (ret < 0) {
1023 pos->offset = EOF;
1024 fprintf(stderr, "[error] get_next_index failed\n");
1025 return;
1026 }
1027
f1f52630
MD
1028 pos->packet_size = cur_index->packet_size;
1029 pos->content_size = cur_index->content_size;
4a744367 1030 pos->mmap_base_offset = 0;
f1f52630 1031 if (cur_index->offset == EOF) {
4a744367
JD
1032 pos->offset = EOF;
1033 } else {
1034 pos->offset = 0;
1035 }
1036
f1f52630
MD
1037 if (cur_index->content_size == 0) {
1038 file_stream->parent.cycles_timestamp =
1039 cur_index->ts_cycles.timestamp_end;
4a744367 1040 file_stream->parent.real_timestamp = ctf_get_real_timestamp(
f1f52630
MD
1041 &file_stream->parent,
1042 cur_index->ts_cycles.timestamp_end);
4a744367 1043 } else {
f1f52630
MD
1044 /* Convert the timestamps and append to the real_index. */
1045 cur_index->ts_real.timestamp_begin = ctf_get_real_timestamp(
1046 &file_stream->parent,
1047 cur_index->ts_cycles.timestamp_begin);
1048 cur_index->ts_real.timestamp_end = ctf_get_real_timestamp(
1049 &file_stream->parent,
1050 cur_index->ts_cycles.timestamp_end);
1051
1052 ctf_update_current_packet_index(&file_stream->parent,
1053 prev_index, cur_index);
1054
1055 file_stream->parent.cycles_timestamp =
1056 cur_index->ts_cycles.timestamp_begin;
1057 file_stream->parent.real_timestamp =
1058 cur_index->ts_real.timestamp_begin;
4a744367
JD
1059 }
1060
1061 if (pos->packet_size == 0 || pos->offset == EOF) {
1062 goto end;
1063 }
1064
1065 printf_verbose("get_data_packet for stream %" PRIu64 "\n",
1066 viewer_stream->id);
1067 ret = get_data_packet(session->ctx, pos, viewer_stream,
f1f52630
MD
1068 be64toh(cur_index->offset),
1069 cur_index->packet_size / CHAR_BIT);
4a744367
JD
1070 if (ret == -2) {
1071 goto retry;
1072 } else if (ret < 0) {
1073 pos->offset = EOF;
1074 fprintf(stderr, "[error] get_data_packet failed\n");
1075 return;
1076 }
1077
1078 printf_verbose("Index received : packet_size : %" PRIu64
1079 ", offset %" PRIu64 ", content_size %" PRIu64
1080 ", timestamp_end : %" PRIu64 "\n",
f1f52630
MD
1081 cur_index->packet_size, cur_index->offset,
1082 cur_index->content_size,
1083 cur_index->ts_cycles.timestamp_end);
4a744367
JD
1084
1085 /* update trace_packet_header and stream_packet_context */
1086 if (pos->prot != PROT_WRITE && file_stream->parent.trace_packet_header) {
1087 /* Read packet header */
1088 ret = generic_rw(&pos->parent, &file_stream->parent.trace_packet_header->p);
1089 if (ret) {
1090 pos->offset = EOF;
1091 fprintf(stderr, "[error] trace packet header read failed\n");
1092 goto end;
1093 }
1094 }
1095 if (pos->prot != PROT_WRITE && file_stream->parent.stream_packet_context) {
1096 /* Read packet context */
1097 ret = generic_rw(&pos->parent, &file_stream->parent.stream_packet_context->p);
1098 if (ret) {
1099 pos->offset = EOF;
1100 fprintf(stderr, "[error] stream packet context read failed\n");
1101 goto end;
1102 }
1103 }
1104 pos->data_offset = pos->offset;
1105
1106end:
1107 return;
1108}
1109
2acdc547
JD
1110int lttng_live_create_viewer_session(struct lttng_live_ctx *ctx)
1111{
1112 struct lttng_viewer_cmd cmd;
1113 struct lttng_viewer_create_session_response resp;
1114 int ret;
1115 ssize_t ret_len;
1116
1117 cmd.cmd = htobe32(LTTNG_VIEWER_CREATE_SESSION);
1118 cmd.data_size = 0;
1119 cmd.cmd_version = 0;
1120
1121 do {
1122 ret_len = send(ctx->control_sock, &cmd, sizeof(cmd), 0);
1123 } while (ret_len < 0 && errno == EINTR);
1124 if (ret_len < 0) {
79620497 1125 perror("[error] Error sending cmd");
2acdc547
JD
1126 ret = ret_len;
1127 goto error;
1128 }
1129 assert(ret_len == sizeof(cmd));
1130
1131 do {
1132 ret_len = recv(ctx->control_sock, &resp, sizeof(resp), 0);
1133 } while (ret_len < 0 && errno == EINTR);
f8612f31
MD
1134 if (ret_len == 0) {
1135 fprintf(stderr, "[error] Remote side has closed connection\n");
1136 ret = -1;
1137 goto error;
1138 }
2acdc547 1139 if (ret_len < 0) {
79620497 1140 perror("[error] Error receiving create session reply");
2acdc547
JD
1141 ret = ret_len;
1142 goto error;
1143 }
1144 assert(ret_len == sizeof(resp));
1145
1146 if (be32toh(resp.status) != LTTNG_VIEWER_CREATE_SESSION_OK) {
1147 fprintf(stderr, "[error] Error creating viewer session\n");
1148 ret = -1;
1149 goto error;
1150 }
1151 ret = 0;
1152
1153error:
1154 return ret;
1155}
1156
1157static
1158int del_traces(gpointer key, gpointer value, gpointer user_data)
4a744367
JD
1159{
1160 struct bt_context *bt_ctx = user_data;
1161 struct lttng_live_ctf_trace *trace = value;
1162 int ret;
1163
1164 ret = bt_context_remove_trace(bt_ctx, trace->trace_id);
1165 if (ret < 0)
1166 fprintf(stderr, "[error] removing trace from context\n");
1167
1168 /* remove the key/value pair from the HT. */
1169 return 1;
1170}
1171
2acdc547
JD
1172static
1173void add_traces(gpointer key, gpointer value, gpointer user_data)
4a744367 1174{
9abca178 1175 int i, ret;
4a744367
JD
1176 struct bt_context *bt_ctx = user_data;
1177 struct lttng_live_ctf_trace *trace = value;
1178 struct lttng_live_viewer_stream *stream;
1179 struct bt_mmap_stream *new_mmap_stream;
1180 struct bt_mmap_stream_list mmap_list;
1181 struct lttng_live_ctx *ctx = NULL;
1182
b5a1fa45
JD
1183 /*
1184 * We don't know how many streams we will receive for a trace, so
1185 * once we are done receiving the traces, we add all the traces
1186 * received to the bt_context.
1187 * We can receive streams during the attach command or the
1188 * get_new_streams, so we have to make sure not to add multiple
1189 * times the same traces.
1190 * If a trace is already in the context, we just skip this function.
1191 */
2acdc547
JD
1192 if (trace->in_use)
1193 return;
1194
4a744367
JD
1195 BT_INIT_LIST_HEAD(&mmap_list.head);
1196
1197 for (i = 0; i < trace->streams->len; i++) {
1198 stream = g_ptr_array_index(trace->streams, i);
1199 ctx = stream->session->ctx;
1200
1201 if (!stream->metadata_flag) {
1202 new_mmap_stream = zmalloc(sizeof(struct bt_mmap_stream));
1203 new_mmap_stream->priv = (void *) stream;
1204 new_mmap_stream->fd = -1;
1205 bt_list_add(&new_mmap_stream->list, &mmap_list.head);
1206 } else {
1207 /* Get all possible metadata before starting */
9abca178
JD
1208 ret = get_new_metadata(ctx, stream);
1209 if (ret)
1210 goto end_free;
4a744367
JD
1211 trace->metadata_fp = fopen(stream->path, "r");
1212 }
1213 }
1214
1215 if (!trace->metadata_fp) {
1216 fprintf(stderr, "[error] No metadata stream opened\n");
1217 goto end_free;
1218 }
1219
1220 ret = bt_context_add_trace(bt_ctx, NULL, "ctf",
1221 ctf_live_packet_seek, &mmap_list, trace->metadata_fp);
1222 if (ret < 0) {
1223 fprintf(stderr, "[error] Error adding trace\n");
1224 goto end_free;
1225 }
5ce3f408 1226
2acdc547 1227 if (bt_ctx->current_iterator) {
5ce3f408
MD
1228 struct bt_trace_descriptor *td;
1229 struct bt_trace_handle *handle;
1230
1231 handle = (struct bt_trace_handle *) g_hash_table_lookup(
1232 bt_ctx->trace_handles,
1233 (gpointer) (unsigned long) ret);
1234 td = handle->td;
2acdc547
JD
1235 bt_iter_add_trace(bt_ctx->current_iterator, td);
1236 }
1237
4a744367 1238 trace->trace_id = ret;
2acdc547 1239 trace->in_use = 1;
4a744367
JD
1240
1241 goto end;
1242
1243end_free:
1244 bt_context_put(bt_ctx);
1245end:
1246 return;
1247}
1248
2acdc547 1249int lttng_live_get_new_streams(struct lttng_live_ctx *ctx, uint64_t id)
6192b988
JD
1250{
1251 struct lttng_viewer_cmd cmd;
2acdc547
JD
1252 struct lttng_viewer_new_streams_request rq;
1253 struct lttng_viewer_new_streams_response rp;
1254 struct lttng_viewer_stream stream;
1255 int ret, i;
6192b988 1256 ssize_t ret_len;
b5a1fa45 1257 uint32_t stream_count;
6192b988 1258
2acdc547
JD
1259 cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEW_STREAMS);
1260 cmd.data_size = sizeof(rq);
6192b988
JD
1261 cmd.cmd_version = 0;
1262
2acdc547
JD
1263 memset(&rq, 0, sizeof(rq));
1264 rq.session_id = htobe64(id);
1265
6192b988
JD
1266 do {
1267 ret_len = send(ctx->control_sock, &cmd, sizeof(cmd), 0);
1268 } while (ret_len < 0 && errno == EINTR);
1269 if (ret_len < 0) {
79620497 1270 perror("[error] Error sending cmd");
6192b988
JD
1271 ret = ret_len;
1272 goto error;
1273 }
1274 assert(ret_len == sizeof(cmd));
1275
1276 do {
2acdc547 1277 ret_len = send(ctx->control_sock, &rq, sizeof(rq), 0);
6192b988
JD
1278 } while (ret_len < 0 && errno == EINTR);
1279 if (ret_len < 0) {
79620497 1280 perror("[error] Error sending get_new_streams request");
6192b988
JD
1281 ret = ret_len;
1282 goto error;
1283 }
2acdc547 1284 assert(ret_len == sizeof(rq));
6192b988 1285
2acdc547
JD
1286 do {
1287 ret_len = recv(ctx->control_sock, &rp, sizeof(rp), 0);
1288 } while (ret_len < 0 && errno == EINTR);
f8612f31
MD
1289 if (ret_len == 0) {
1290 fprintf(stderr, "[error] Remote side has closed connection\n");
1291 ret = -1;
1292 goto error;
1293 }
2acdc547 1294 if (ret_len < 0) {
79620497 1295 perror("[error] Error receiving get_new_streams response");
2acdc547 1296 ret = ret_len;
6192b988
JD
1297 goto error;
1298 }
2acdc547
JD
1299 assert(ret_len == sizeof(rp));
1300
1301 switch(be32toh(rp.status)) {
1302 case LTTNG_VIEWER_NEW_STREAMS_OK:
1303 break;
1304 case LTTNG_VIEWER_NEW_STREAMS_NO_NEW:
1305 ret = 0;
1306 goto end;
1307 case LTTNG_VIEWER_NEW_STREAMS_HUP:
1308 ret = -LTTNG_VIEWER_NEW_STREAMS_HUP;
1309 goto end;
1310 case LTTNG_VIEWER_NEW_STREAMS_ERR:
1311 fprintf(stderr, "[error] get_new_streams error\n");
1312 ret = -1;
1313 goto end;
1314 default:
1315 fprintf(stderr, "[error] Unknown return code %u\n",
1316 be32toh(rp.status));
1317 ret = -1;
1318 goto end;
1319 }
1320
b5a1fa45
JD
1321 stream_count = be32toh(rp.streams_count);
1322 ctx->session->stream_count += stream_count;
2acdc547
JD
1323 /*
1324 * When the session is created but not started, we do an active wait
1325 * until it starts. It allows the viewer to start processing the trace
1326 * as soon as the session starts.
1327 */
1328 if (ctx->session->stream_count == 0) {
1329 ret = 0;
1330 goto end;
1331 }
1332 printf_verbose("Waiting for %" PRIu64 " streams:\n",
1333 ctx->session->stream_count);
1334 ctx->session->streams = g_new0(struct lttng_live_viewer_stream,
1335 ctx->session->stream_count);
b5a1fa45 1336 for (i = 0; i < stream_count; i++) {
2acdc547
JD
1337 do {
1338 ret_len = recv(ctx->control_sock, &stream, sizeof(stream), 0);
1339 } while (ret_len < 0 && errno == EINTR);
f8612f31
MD
1340 if (ret_len == 0) {
1341 fprintf(stderr, "[error] Remote side has closed connection\n");
1342 ret = -1;
1343 goto error;
1344 }
2acdc547 1345 if (ret_len < 0) {
79620497 1346 perror("[error] Error receiving stream");
2acdc547
JD
1347 ret = ret_len;
1348 goto error;
1349 }
1350 assert(ret_len == sizeof(stream));
1351 stream.path_name[LTTNG_VIEWER_PATH_MAX - 1] = '\0';
1352 stream.channel_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0';
1353
1354 printf_verbose(" stream %" PRIu64 " : %s/%s\n",
1355 be64toh(stream.id), stream.path_name,
1356 stream.channel_name);
1357 ctx->session->streams[i].id = be64toh(stream.id);
1358 ctx->session->streams[i].session = ctx->session;
1359
1360 ctx->session->streams[i].first_read = 1;
1361 ctx->session->streams[i].mmap_size = 0;
1362
1363 if (be32toh(stream.metadata_flag)) {
1364 char *path;
1365
1366 path = strdup(LTTNG_METADATA_PATH_TEMPLATE);
1367 if (!path) {
1368 perror("strdup");
1369 ret = -1;
1370 goto error;
1371 }
1372 if (!mkdtemp(path)) {
1373 perror("mkdtemp");
1374 free(path);
1375 ret = -1;
1376 goto error;
1377 }
1378 ctx->session->streams[i].metadata_flag = 1;
1379 snprintf(ctx->session->streams[i].path,
1380 sizeof(ctx->session->streams[i].path),
1381 "%s/%s", path,
1382 stream.channel_name);
1383 ret = open(ctx->session->streams[i].path,
1384 O_WRONLY | O_CREAT | O_TRUNC,
1385 S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP);
1386 if (ret < 0) {
1387 perror("open");
1388 free(path);
1389 goto error;
1390 }
1391 ctx->session->streams[i].fd = ret;
1392 free(path);
1393 }
1394 ret = lttng_live_ctf_trace_assign(&ctx->session->streams[i],
1395 be64toh(stream.ctf_trace_id));
1396 if (ret < 0) {
1397 goto error;
1398 }
1399
1400 }
6192b988
JD
1401 ret = 0;
1402
2acdc547 1403end:
6192b988
JD
1404error:
1405 return ret;
1406}
1407
2acdc547 1408void lttng_live_read(struct lttng_live_ctx *ctx)
4a744367 1409{
2acdc547 1410 int ret, i;
4a744367
JD
1411 struct bt_ctf_iter *iter;
1412 const struct bt_ctf_event *event;
1413 struct bt_iter_pos begin_pos;
1414 struct bt_trace_descriptor *td_write;
1415 struct bt_format *fmt_write;
1416 struct ctf_text_stream_pos *sout;
2acdc547 1417 uint64_t id;
4a744367 1418
2acdc547
JD
1419 ctx->bt_ctx = bt_context_create();
1420 if (!ctx->bt_ctx) {
4a744367
JD
1421 fprintf(stderr, "[error] bt_context_create allocation\n");
1422 goto end;
1423 }
1424
1425 fmt_write = bt_lookup_format(g_quark_from_static_string("text"));
1426 if (!fmt_write) {
1427 fprintf(stderr, "[error] ctf-text error\n");
1428 goto end;
1429 }
1430
1431 td_write = fmt_write->open_trace(NULL, O_RDWR, NULL, NULL);
1432 if (!td_write) {
1433 fprintf(stderr, "[error] Error opening output trace\n");
1434 goto end_free;
1435 }
1436
1437 sout = container_of(td_write, struct ctf_text_stream_pos,
1438 trace_descriptor);
1439 if (!sout->parent.event_cb)
1440 goto end_free;
1441
6192b988
JD
1442 ret = lttng_live_create_viewer_session(ctx);
1443 if (ret < 0) {
1444 goto end_free;
1445 }
1446
2acdc547
JD
1447 for (i = 0; i < ctx->session_ids->len; i++) {
1448 id = g_array_index(ctx->session_ids, uint64_t, i);
1449 printf_verbose("Attaching to session %lu\n", id);
1450 ret = lttng_live_attach_session(ctx, id);
1451 printf_verbose("Attaching session returns %d\n", ret);
1452 if (ret < 0) {
1453 if (ret == -LTTNG_VIEWER_ATTACH_UNK) {
1454 fprintf(stderr, "[error] Unknown session ID\n");
1455 }
1456 goto end_free;
1457 }
1458 }
1459
4a744367 1460 /*
2acdc547 1461 * As long as the session is active, we try to get new streams.
4a744367 1462 */
2acdc547 1463 for (;;) {
4a744367
JD
1464 int flags;
1465
2acdc547
JD
1466 while (!ctx->session->stream_count) {
1467 if (ctx->session_ids->len == 0)
4a744367 1468 goto end_free;
2acdc547
JD
1469 ret = ask_new_streams(ctx);
1470 if (ret < 0)
1471 goto end_free;
1472 }
4a744367 1473
2acdc547
JD
1474 g_hash_table_foreach(ctx->session->ctf_traces, add_traces,
1475 ctx->bt_ctx);
4a744367
JD
1476
1477 begin_pos.type = BT_SEEK_BEGIN;
2acdc547 1478 iter = bt_ctf_iter_create(ctx->bt_ctx, &begin_pos, NULL);
4a744367
JD
1479 if (!iter) {
1480 fprintf(stderr, "[error] Iterator creation error\n");
1481 goto end;
1482 }
1483 for (;;) {
1484 event = bt_ctf_iter_read_event_flags(iter, &flags);
1485 if (!(flags & BT_ITER_FLAG_RETRY)) {
1486 if (!event) {
1487 /* End of trace */
1488 break;
1489 }
1490 ret = sout->parent.event_cb(&sout->parent,
1491 event->parent->stream);
1492 if (ret) {
1493 fprintf(stderr, "[error] Writing "
1494 "event failed.\n");
1495 goto end_free;
1496 }
1497 }
1498 ret = bt_iter_next(bt_ctf_get_iter(iter));
1499 if (ret < 0) {
1500 goto end_free;
1501 }
1502 }
1503 bt_ctf_iter_destroy(iter);
2acdc547
JD
1504 g_hash_table_foreach_remove(ctx->session->ctf_traces,
1505 del_traces, ctx->bt_ctx);
1506 ctx->session->stream_count = 0;
1507 }
4a744367
JD
1508
1509end_free:
2acdc547 1510 bt_context_put(ctx->bt_ctx);
4a744367
JD
1511end:
1512 return;
1513}
This page took 0.088356 seconds and 4 git commands to generate.