Fix: lttng-live recv() and send() flags, partial recv()
[babeltrace.git] / formats / lttng-live / lttng-live-comm.c
CommitLineData
4a744367
JD
1/*
2 * Copyright (C) 2013 - Julien Desfossez <jdesfossez@efficios.com>
3 * Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
4 *
5 * Permission is hereby granted, free of charge, to any person obtaining a copy
6 * of this software and associated documentation files (the "Software"), to deal
7 * in the Software without restriction, including without limitation the rights
8 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9 * copies of the Software, and to permit persons to whom the Software is
10 * furnished to do so, subject to the following conditions:
11 *
12 * The above copyright notice and this permission notice shall be included in
13 * all copies or substantial portions of the Software.
14 *
15 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21 * SOFTWARE.
22 */
23
24#include <sys/socket.h>
25#include <sys/types.h>
26#include <netinet/in.h>
27#include <netdb.h>
28#include <stdio.h>
29#include <string.h>
30#include <stdlib.h>
31#include <unistd.h>
32#include <errno.h>
33#include <inttypes.h>
34#include <fcntl.h>
35#include <sys/mman.h>
36
37#include <babeltrace/ctf/ctf-index.h>
38
39#include <babeltrace/babeltrace.h>
40#include <babeltrace/ctf/events.h>
41#include <babeltrace/ctf/callbacks.h>
42#include <babeltrace/ctf/iterator.h>
43
44/* for packet_index */
45#include <babeltrace/ctf/types.h>
46
47#include <babeltrace/ctf/metadata.h>
48#include <babeltrace/ctf-text/types.h>
49#include <babeltrace/ctf/events-internal.h>
50#include <formats/ctf/events-private.h>
51
af701dc4
JD
52#include <babeltrace/compat/memstream.h>
53
c98627ca 54#include "lttng-live.h"
158eea6e 55#include "lttng-viewer-abi.h"
4a744367
JD
56
/*
 * Allocate x bytes of zero-initialized memory (calloc wrapper).
 * The argument is parenthesized so that expressions expand safely.
 */
#define zmalloc(x) calloc(1, (x))

/*
 * Return the larger of a and b after converting both to `type`.
 * Each argument may be evaluated more than once.
 */
#ifndef max_t
#define max_t(type, a, b)	\
	((type) (a) > (type) (b) ? (type) (a) : (type) (b))
#endif
66
b5a1fa45
JD
67static void ctf_live_packet_seek(struct bt_stream_pos *stream_pos,
68 size_t index, int whence);
2acdc547
JD
69static void add_traces(gpointer key, gpointer value, gpointer user_data);
70static int del_traces(gpointer key, gpointer value, gpointer user_data);
9abca178 71static int get_new_metadata(struct lttng_live_ctx *ctx,
af701dc4
JD
72 struct lttng_live_viewer_stream *viewer_stream,
73 char **metadata_buf);
2acdc547 74
b7e8a8e1
MD
/*
 * Receive exactly len bytes from fd into buf.
 *
 * recv() is restarted when interrupted by a signal (EINTR) and called
 * again after a partial read until the whole buffer has been filled.
 *
 * Returns len once the buffer is full, 0 if the peer performed an
 * orderly shutdown before that, or a negative value on error (errno is
 * set by recv()).
 */
static
ssize_t lttng_live_recv(int fd, void *buf, size_t len)
{
	/* Arithmetic on void * is a GNU extension; walk a char * instead. */
	char *dst = buf;
	size_t copied = 0, to_copy = len;
	ssize_t ret;

	do {
		ret = recv(fd, dst + copied, to_copy, 0);
		if (ret > 0) {
			assert((size_t) ret <= to_copy);
			copied += (size_t) ret;
			to_copy -= (size_t) ret;
		}
	} while ((ret > 0 && to_copy > 0)
		|| (ret < 0 && errno == EINTR));
	if (ret > 0)
		ret = (ssize_t) copied;
	/* ret = 0 means orderly shutdown, ret < 0 is error. */
	return ret;
}
95
/*
 * Send exactly len bytes from buf on fd.
 *
 * send() is restarted when interrupted by a signal (EINTR) and called
 * again after a partial write, so that callers expecting a complete
 * transfer are not confused by a short count. MSG_NOSIGNAL keeps a
 * closed peer from raising SIGPIPE; the failure is reported through the
 * return value instead.
 *
 * Returns the number of bytes sent (len on success), or a negative
 * value on error (errno is set by send()).
 */
static
ssize_t lttng_live_send(int fd, const void *buf, size_t len)
{
	const char *src = buf;
	size_t sent = 0;
	ssize_t ret;

	do {
		ret = send(fd, src + sent, len - sent, MSG_NOSIGNAL);
		if (ret > 0)
			sent += (size_t) ret;
	} while ((ret > 0 && sent < len)
		|| (ret < 0 && errno == EINTR));
	if (ret >= 0)
		ret = (ssize_t) sent;
	return ret;
}
106
b5a1fa45 107int lttng_live_connect_viewer(struct lttng_live_ctx *ctx)
4a744367
JD
108{
109 struct hostent *host;
110 struct sockaddr_in server_addr;
111 int ret;
112
b5a1fa45 113 host = gethostbyname(ctx->relay_hostname);
4a744367
JD
114 if (!host) {
115 ret = -1;
116 goto end;
117 }
118
119 if ((ctx->control_sock = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
120 perror("Socket");
121 ret = -1;
122 goto end;
123 }
124
125 server_addr.sin_family = AF_INET;
b5a1fa45 126 server_addr.sin_port = htons(ctx->port);
4a744367
JD
127 server_addr.sin_addr = *((struct in_addr *) host->h_addr);
128 bzero(&(server_addr.sin_zero), 8);
129
130 if (connect(ctx->control_sock, (struct sockaddr *) &server_addr,
131 sizeof(struct sockaddr)) == -1) {
132 perror("Connect");
133 ret = -1;
134 goto end;
135 }
136
137 ret = 0;
138
139end:
140 return ret;
141}
142
143int lttng_live_establish_connection(struct lttng_live_ctx *ctx)
144{
145 struct lttng_viewer_cmd cmd;
146 struct lttng_viewer_connect connect;
147 int ret;
148 ssize_t ret_len;
149
150 cmd.cmd = htobe32(LTTNG_VIEWER_CONNECT);
151 cmd.data_size = sizeof(connect);
152 cmd.cmd_version = 0;
153
0c3cd7e1 154 connect.viewer_session_id = -1ULL; /* will be set on recv */
4a744367
JD
155 connect.major = htobe32(LTTNG_LIVE_MAJOR);
156 connect.minor = htobe32(LTTNG_LIVE_MINOR);
157 connect.type = htobe32(LTTNG_VIEWER_CLIENT_COMMAND);
158
b7e8a8e1 159 ret_len = lttng_live_send(ctx->control_sock, &cmd, sizeof(cmd));
4a744367 160 if (ret_len < 0) {
79620497 161 perror("[error] Error sending cmd");
4a744367
JD
162 ret = ret_len;
163 goto error;
164 }
165 assert(ret_len == sizeof(cmd));
166
b7e8a8e1 167 ret_len = lttng_live_send(ctx->control_sock, &connect, sizeof(connect));
4a744367 168 if (ret_len < 0) {
79620497 169 perror("[error] Error sending version");
4a744367
JD
170 ret = ret_len;
171 goto error;
172 }
173 assert(ret_len == sizeof(connect));
174
b7e8a8e1 175 ret_len = lttng_live_recv(ctx->control_sock, &connect, sizeof(connect));
f8612f31
MD
176 if (ret_len == 0) {
177 fprintf(stderr, "[error] Remote side has closed connection\n");
178 ret = -1;
179 goto error;
180 }
4a744367 181 if (ret_len < 0) {
79620497 182 perror("[error] Error receiving version");
4a744367
JD
183 ret = ret_len;
184 goto error;
185 }
186 assert(ret_len == sizeof(connect));
187
188 printf_verbose("Received viewer session ID : %" PRIu64 "\n",
189 be64toh(connect.viewer_session_id));
190 printf_verbose("Relayd version : %u.%u\n", be32toh(connect.major),
191 be32toh(connect.minor));
192
193 ret = 0;
194
195error:
196 return ret;
197}
198
b5a1fa45
JD
199static
200void free_session_list(GPtrArray *session_list)
201{
202 int i;
203 struct lttng_live_relay_session *relay_session;
204
205 for (i = 0; i < session_list->len; i++) {
206 relay_session = g_ptr_array_index(session_list, i);
207 free(relay_session->name);
208 free(relay_session->hostname);
209 }
210 g_ptr_array_free(session_list, TRUE);
211}
212
213static
214void print_session_list(GPtrArray *session_list, const char *path)
215{
216 int i;
217 struct lttng_live_relay_session *relay_session;
218
219 for (i = 0; i < session_list->len; i++) {
220 relay_session = g_ptr_array_index(session_list, i);
221 fprintf(stdout, "%s/host/%s/%s (timer = %u, "
222 "%u stream(s), %u client(s) connected)\n",
223 path, relay_session->hostname,
224 relay_session->name, relay_session->timer,
225 relay_session->streams, relay_session->clients);
226 }
227}
228
229static
230void update_session_list(GPtrArray *session_list, char *hostname,
231 char *session_name, uint32_t streams, uint32_t clients,
232 uint32_t timer)
233{
234 int i, found = 0;
235 struct lttng_live_relay_session *relay_session;
236
237 for (i = 0; i < session_list->len; i++) {
238 relay_session = g_ptr_array_index(session_list, i);
239 if ((strncmp(relay_session->hostname, hostname, NAME_MAX) == 0) &&
240 strncmp(relay_session->name,
241 session_name, NAME_MAX) == 0) {
242 relay_session->streams += streams;
243 if (relay_session->clients < clients)
244 relay_session->clients = clients;
245 found = 1;
246 break;
247 }
248 }
249 if (found)
250 return;
251
252 relay_session = g_new0(struct lttng_live_relay_session, 1);
253 relay_session->hostname = strndup(hostname, NAME_MAX);
254 relay_session->name = strndup(session_name, NAME_MAX);
255 relay_session->clients = clients;
256 relay_session->streams = streams;
257 relay_session->timer = timer;
258 g_ptr_array_add(session_list, relay_session);
259}
260
4a744367
JD
261int lttng_live_list_sessions(struct lttng_live_ctx *ctx, const char *path)
262{
263 struct lttng_viewer_cmd cmd;
264 struct lttng_viewer_list_sessions list;
265 struct lttng_viewer_session lsession;
b5a1fa45 266 int i, ret, sessions_count, print_list = 0;
4a744367 267 ssize_t ret_len;
b5a1fa45
JD
268 uint64_t session_id;
269 GPtrArray *session_list = NULL;
270
271 if (strlen(ctx->session_name) == 0) {
272 print_list = 1;
273 session_list = g_ptr_array_new();
274 }
4a744367
JD
275
276 cmd.cmd = htobe32(LTTNG_VIEWER_LIST_SESSIONS);
277 cmd.data_size = 0;
278 cmd.cmd_version = 0;
279
b7e8a8e1 280 ret_len = lttng_live_send(ctx->control_sock, &cmd, sizeof(cmd));
4a744367 281 if (ret_len < 0) {
79620497 282 perror("[error] Error sending cmd");
4a744367
JD
283 ret = ret_len;
284 goto error;
285 }
286 assert(ret_len == sizeof(cmd));
287
b7e8a8e1 288 ret_len = lttng_live_recv(ctx->control_sock, &list, sizeof(list));
f8612f31
MD
289 if (ret_len == 0) {
290 fprintf(stderr, "[error] Remote side has closed connection\n");
291 ret = -1;
292 goto error;
293 }
4a744367 294 if (ret_len < 0) {
79620497 295 perror("[error] Error receiving session list");
4a744367
JD
296 ret = ret_len;
297 goto error;
298 }
299 assert(ret_len == sizeof(list));
300
301 sessions_count = be32toh(list.sessions_count);
4a744367 302 for (i = 0; i < sessions_count; i++) {
b7e8a8e1 303 ret_len = lttng_live_recv(ctx->control_sock, &lsession, sizeof(lsession));
f8612f31
MD
304 if (ret_len == 0) {
305 fprintf(stderr, "[error] Remote side has closed connection\n");
306 ret = -1;
307 goto error;
308 }
4a744367 309 if (ret_len < 0) {
79620497 310 perror("[error] Error receiving session");
4a744367
JD
311 ret = ret_len;
312 goto error;
313 }
314 assert(ret_len == sizeof(lsession));
9ed31e53
MD
315 lsession.hostname[LTTNG_VIEWER_HOST_NAME_MAX - 1] = '\0';
316 lsession.session_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0';
b5a1fa45
JD
317 session_id = be64toh(lsession.id);
318
319 if (print_list) {
320 update_session_list(session_list,
321 lsession.hostname,
322 lsession.session_name,
323 be32toh(lsession.streams),
324 be32toh(lsession.clients),
325 be32toh(lsession.live_timer));
326 } else {
327 if ((strncmp(lsession.session_name, ctx->session_name,
328 NAME_MAX) == 0) && (strncmp(lsession.hostname,
329 ctx->traced_hostname, NAME_MAX) == 0)) {
330 printf_verbose("Reading from session %" PRIu64 "\n",
331 session_id);
332 g_array_append_val(ctx->session_ids,
333 session_id);
334 }
335 }
336 }
4a744367 337
b5a1fa45
JD
338 if (print_list) {
339 print_session_list(session_list, path);
340 free_session_list(session_list);
4a744367
JD
341 }
342
343 ret = 0;
344
345error:
346 return ret;
347}
348
349int lttng_live_ctf_trace_assign(struct lttng_live_viewer_stream *stream,
350 uint64_t ctf_trace_id)
351{
352 struct lttng_live_ctf_trace *trace;
353 int ret = 0;
354
355 trace = g_hash_table_lookup(stream->session->ctf_traces,
356 (gpointer) ctf_trace_id);
357 if (!trace) {
358 trace = g_new0(struct lttng_live_ctf_trace, 1);
4a744367
JD
359 trace->ctf_trace_id = ctf_trace_id;
360 trace->streams = g_ptr_array_new();
361 g_hash_table_insert(stream->session->ctf_traces,
362 (gpointer) ctf_trace_id,
363 trace);
364 }
365 if (stream->metadata_flag)
366 trace->metadata_stream = stream;
367
368 stream->ctf_trace = trace;
369 g_ptr_array_add(trace->streams, stream);
370
4a744367
JD
371 return ret;
372}
373
af701dc4
JD
374static
375int open_metadata_fp_write(struct lttng_live_viewer_stream *stream,
376 char **metadata_buf, size_t *size)
377{
378 int ret = 0;
379
380 stream->metadata_fp_write =
381 babeltrace_open_memstream(metadata_buf, size);
382 if (!stream->metadata_fp_write) {
383 perror("Metadata open_memstream");
384 ret = -1;
385 }
386
387 return ret;
388}
389
4a744367
JD
390int lttng_live_attach_session(struct lttng_live_ctx *ctx, uint64_t id)
391{
392 struct lttng_viewer_cmd cmd;
393 struct lttng_viewer_attach_session_request rq;
394 struct lttng_viewer_attach_session_response rp;
395 struct lttng_viewer_stream stream;
396 int ret, i;
397 ssize_t ret_len;
398
399 cmd.cmd = htobe32(LTTNG_VIEWER_ATTACH_SESSION);
400 cmd.data_size = sizeof(rq);
401 cmd.cmd_version = 0;
402
403 memset(&rq, 0, sizeof(rq));
404 rq.session_id = htobe64(id);
405 // TODO: add cmd line parameter to select seek beginning
406 // rq.seek = htobe32(LTTNG_VIEWER_SEEK_BEGINNING);
407 rq.seek = htobe32(LTTNG_VIEWER_SEEK_LAST);
408
b7e8a8e1 409 ret_len = lttng_live_send(ctx->control_sock, &cmd, sizeof(cmd));
4a744367 410 if (ret_len < 0) {
79620497 411 perror("[error] Error sending cmd");
4a744367
JD
412 ret = ret_len;
413 goto error;
414 }
415 assert(ret_len == sizeof(cmd));
416
b7e8a8e1 417 ret_len = lttng_live_send(ctx->control_sock, &rq, sizeof(rq));
4a744367 418 if (ret_len < 0) {
79620497 419 perror("[error] Error sending attach request");
4a744367
JD
420 ret = ret_len;
421 goto error;
422 }
423 assert(ret_len == sizeof(rq));
424
b7e8a8e1 425 ret_len = lttng_live_recv(ctx->control_sock, &rp, sizeof(rp));
f8612f31
MD
426 if (ret_len == 0) {
427 fprintf(stderr, "[error] Remote side has closed connection\n");
428 ret = -1;
429 goto error;
430 }
4a744367 431 if (ret_len < 0) {
79620497 432 perror("[error] Error receiving attach response");
4a744367
JD
433 ret = ret_len;
434 goto error;
435 }
436 assert(ret_len == sizeof(rp));
437
438 switch(be32toh(rp.status)) {
439 case LTTNG_VIEWER_ATTACH_OK:
440 break;
441 case LTTNG_VIEWER_ATTACH_UNK:
442 ret = -LTTNG_VIEWER_ATTACH_UNK;
443 goto end;
444 case LTTNG_VIEWER_ATTACH_ALREADY:
79620497 445 fprintf(stderr, "[error] There is already a viewer attached to this session\n");
4a744367
JD
446 ret = -1;
447 goto end;
448 case LTTNG_VIEWER_ATTACH_NOT_LIVE:
449 fprintf(stderr, "[error] Not a live session\n");
450 ret = -1;
451 goto end;
452 case LTTNG_VIEWER_ATTACH_SEEK_ERR:
453 fprintf(stderr, "[error] Wrong seek parameter\n");
454 ret = -1;
455 goto end;
456 default:
457 fprintf(stderr, "[error] Unknown attach return code %u\n",
458 be32toh(rp.status));
459 ret = -1;
460 goto end;
461 }
462 if (be32toh(rp.status) != LTTNG_VIEWER_ATTACH_OK) {
463 ret = -1;
464 goto end;
465 }
466
2acdc547 467 ctx->session->stream_count += be32toh(rp.streams_count);
4a744367
JD
468 /*
469 * When the session is created but not started, we do an active wait
470 * until it starts. It allows the viewer to start processing the trace
471 * as soon as the session starts.
472 */
473 if (ctx->session->stream_count == 0) {
474 ret = 0;
475 goto end;
476 }
477 printf_verbose("Waiting for %" PRIu64 " streams:\n",
478 ctx->session->stream_count);
479 ctx->session->streams = g_new0(struct lttng_live_viewer_stream,
480 ctx->session->stream_count);
4a744367 481 for (i = 0; i < be32toh(rp.streams_count); i++) {
b7e8a8e1 482 ret_len = lttng_live_recv(ctx->control_sock, &stream, sizeof(stream));
f8612f31
MD
483 if (ret_len == 0) {
484 fprintf(stderr, "[error] Remote side has closed connection\n");
485 ret = -1;
486 goto error;
487 }
4a744367 488 if (ret_len < 0) {
79620497 489 perror("[error] Error receiving stream");
4a744367
JD
490 ret = ret_len;
491 goto error;
492 }
493 assert(ret_len == sizeof(stream));
9ed31e53
MD
494 stream.path_name[LTTNG_VIEWER_PATH_MAX - 1] = '\0';
495 stream.channel_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0';
4a744367
JD
496
497 printf_verbose(" stream %" PRIu64 " : %s/%s\n",
498 be64toh(stream.id), stream.path_name,
499 stream.channel_name);
500 ctx->session->streams[i].id = be64toh(stream.id);
501 ctx->session->streams[i].session = ctx->session;
502
503 ctx->session->streams[i].first_read = 1;
504 ctx->session->streams[i].mmap_size = 0;
505
506 if (be32toh(stream.metadata_flag)) {
4a744367 507 ctx->session->streams[i].metadata_flag = 1;
4a744367
JD
508 }
509 ret = lttng_live_ctf_trace_assign(&ctx->session->streams[i],
510 be64toh(stream.ctf_trace_id));
511 if (ret < 0) {
512 goto error;
513 }
514
515 }
516 ret = 0;
517
518end:
519error:
520 return ret;
521}
522
2acdc547
JD
523static
524int ask_new_streams(struct lttng_live_ctx *ctx)
525{
526 int i, ret = 0;
527 uint64_t id;
528
529restart:
530 for (i = 0; i < ctx->session_ids->len; i++) {
531 id = g_array_index(ctx->session_ids, uint64_t, i);
532 ret = lttng_live_get_new_streams(ctx, id);
533 printf_verbose("Asking for new streams returns %d\n",
534 ret);
535 if (ret < 0) {
536 if (ret == -LTTNG_VIEWER_NEW_STREAMS_HUP) {
537 printf_verbose("Session %" PRIu64 " closed\n",
538 id);
b5a1fa45
JD
539 /*
540 * The streams have already been closed during
541 * the reading, so we only need to get rid of
542 * the trace in our internal table of sessions.
543 */
2acdc547
JD
544 g_array_remove_index(ctx->session_ids, i);
545 /*
546 * We can't continue iterating on the g_array
547 * after a remove, we have to start again.
548 */
549 goto restart;
550 } else {
551 ret = -1;
552 goto end;
553 }
554 }
555 }
556
557end:
558 return ret;
559}
560
50d207af
JD
561static
562int append_metadata(struct lttng_live_ctx *ctx,
563 struct lttng_live_viewer_stream *viewer_stream)
564{
565 int ret;
566 struct lttng_live_viewer_stream *metadata;
567 char *metadata_buf = NULL;
568
569 printf_verbose("get_next_index: new metadata needed\n");
570 ret = get_new_metadata(ctx, viewer_stream, &metadata_buf);
571 if (ret < 0) {
572 free(metadata_buf);
573 goto error;
574 }
575
576 metadata = viewer_stream->ctf_trace->metadata_stream;
577 metadata->ctf_trace->metadata_fp =
578 babeltrace_fmemopen(metadata_buf,
579 metadata->metadata_len, "rb");
580 if (!metadata->ctf_trace->metadata_fp) {
581 perror("Metadata fmemopen");
582 free(metadata_buf);
583 ret = -1;
584 goto error;
585 }
586 ret = ctf_append_trace_metadata(
587 viewer_stream->ctf_trace->handle->td,
588 metadata->ctf_trace->metadata_fp);
589 if (ret != 0) {
590 fprintf(stderr, "[error] Appending metadata\n");
591 goto error;
592 }
593
594error:
595 return ret;
596}
597
4a744367
JD
598static
599int get_data_packet(struct lttng_live_ctx *ctx,
600 struct ctf_stream_pos *pos,
601 struct lttng_live_viewer_stream *stream, uint64_t offset,
602 uint64_t len)
603{
604 struct lttng_viewer_cmd cmd;
605 struct lttng_viewer_get_packet rq;
606 struct lttng_viewer_trace_packet rp;
607 ssize_t ret_len;
608 int ret;
609
50d207af 610retry:
4a744367
JD
611 cmd.cmd = htobe32(LTTNG_VIEWER_GET_PACKET);
612 cmd.data_size = sizeof(rq);
613 cmd.cmd_version = 0;
614
615 memset(&rq, 0, sizeof(rq));
616 rq.stream_id = htobe64(stream->id);
617 /* Already in big endian. */
618 rq.offset = offset;
619 rq.len = htobe32(len);
620
b7e8a8e1 621 ret_len = lttng_live_send(ctx->control_sock, &cmd, sizeof(cmd));
4a744367 622 if (ret_len < 0) {
79620497 623 perror("[error] Error sending cmd");
4a744367
JD
624 ret = ret_len;
625 goto error;
626 }
627 assert(ret_len == sizeof(cmd));
628
b7e8a8e1 629 ret_len = lttng_live_send(ctx->control_sock, &rq, sizeof(rq));
4a744367 630 if (ret_len < 0) {
79620497 631 perror("[error] Error sending get_data_packet request");
4a744367
JD
632 ret = ret_len;
633 goto error;
634 }
635 assert(ret_len == sizeof(rq));
636
b7e8a8e1 637 ret_len = lttng_live_recv(ctx->control_sock, &rp, sizeof(rp));
f8612f31
MD
638 if (ret_len == 0) {
639 fprintf(stderr, "[error] Remote side has closed connection\n");
640 ret = -1;
641 goto error;
642 }
4a744367 643 if (ret_len < 0) {
79620497 644 perror("[error] Error receiving data response");
4a744367
JD
645 ret = ret_len;
646 goto error;
647 }
57b9d843
JD
648 if (ret_len != sizeof(rp)) {
649 fprintf(stderr, "[error] get_data_packet: expected %" PRId64
650 ", received %" PRId64 "\n", ret_len,
651 sizeof(rp));
652 ret = -1;
653 goto error;
654 }
4a744367
JD
655
656 rp.flags = be32toh(rp.flags);
657
658 switch (be32toh(rp.status)) {
659 case LTTNG_VIEWER_GET_PACKET_OK:
660 len = be32toh(rp.len);
661 printf_verbose("get_data_packet: Ok, packet size : %" PRIu64
662 "\n", len);
663 break;
664 case LTTNG_VIEWER_GET_PACKET_RETRY:
665 printf_verbose("get_data_packet: retry\n");
666 ret = -1;
667 goto end;
668 case LTTNG_VIEWER_GET_PACKET_ERR:
669 if (rp.flags & LTTNG_VIEWER_FLAG_NEW_METADATA) {
670 printf_verbose("get_data_packet: new metadata needed\n");
50d207af
JD
671 ret = append_metadata(ctx, stream);
672 if (ret)
673 goto error;
674 goto retry;
4a744367 675 }
2acdc547
JD
676 if (rp.flags & LTTNG_VIEWER_FLAG_NEW_STREAM) {
677 ret = ask_new_streams(ctx);
678 if (ret < 0)
679 goto error;
680 g_hash_table_foreach(ctx->session->ctf_traces, add_traces,
681 ctx->bt_ctx);
682 }
4a744367
JD
683 fprintf(stderr, "[error] get_data_packet: error\n");
684 ret = -1;
685 goto end;
686 case LTTNG_VIEWER_GET_PACKET_EOF:
687 ret = -2;
688 goto error;
689 default:
690 printf_verbose("get_data_packet: unknown\n");
691 assert(0);
692 ret = -1;
693 goto end;
694 }
695
696 if (len <= 0) {
697 ret = -1;
698 goto end;
699 }
700
701 if (len > stream->mmap_size) {
702 uint64_t new_size;
703
704 new_size = max_t(uint64_t, len, stream->mmap_size << 1);
705 if (pos->base_mma) {
706 /* unmap old base */
707 ret = munmap_align(pos->base_mma);
708 if (ret) {
79620497 709 perror("[error] Unable to unmap old base");
4a744367
JD
710 ret = -1;
711 goto error;
712 }
713 pos->base_mma = NULL;
714 }
715 pos->base_mma = mmap_align(new_size,
716 PROT_READ | PROT_WRITE,
717 MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
718 if (pos->base_mma == MAP_FAILED) {
79620497 719 perror("[error] mmap error");
4a744367
JD
720 pos->base_mma = NULL;
721 ret = -1;
722 goto error;
723 }
724
725 stream->mmap_size = new_size;
726 printf_verbose("Expanding stream mmap size to %" PRIu64 " bytes\n",
727 stream->mmap_size);
728 }
729
b7e8a8e1
MD
730 ret_len = lttng_live_recv(ctx->control_sock,
731 mmap_align_addr(pos->base_mma), len);
f8612f31
MD
732 if (ret_len == 0) {
733 fprintf(stderr, "[error] Remote side has closed connection\n");
734 ret = -1;
735 goto error;
736 }
4a744367 737 if (ret_len < 0) {
79620497 738 perror("[error] Error receiving trace packet");
4a744367
JD
739 ret = ret_len;
740 goto error;
741 }
742 assert(ret_len == len);
743
744end:
745error:
746 return ret;
747}
748
4a744367 749static
9abca178
JD
750int get_one_metadata_packet(struct lttng_live_ctx *ctx,
751 struct lttng_live_viewer_stream *metadata_stream)
4a744367
JD
752{
753 uint64_t len = 0;
754 int ret;
755 struct lttng_viewer_cmd cmd;
756 struct lttng_viewer_get_metadata rq;
757 struct lttng_viewer_metadata_packet rp;
4a744367
JD
758 char *data = NULL;
759 ssize_t ret_len;
760
9abca178 761 rq.stream_id = htobe64(metadata_stream->id);
4a744367
JD
762 cmd.cmd = htobe32(LTTNG_VIEWER_GET_METADATA);
763 cmd.data_size = sizeof(rq);
764 cmd.cmd_version = 0;
765
b7e8a8e1 766 ret_len = lttng_live_send(ctx->control_sock, &cmd, sizeof(cmd));
4a744367 767 if (ret_len < 0) {
79620497 768 perror("[error] Error sending cmd");
4a744367
JD
769 ret = ret_len;
770 goto error;
771 }
772 assert(ret_len == sizeof(cmd));
773
b7e8a8e1 774 ret_len = lttng_live_send(ctx->control_sock, &rq, sizeof(rq));
4a744367 775 if (ret_len < 0) {
79620497 776 perror("[error] Error sending get_metadata request");
4a744367
JD
777 ret = ret_len;
778 goto error;
779 }
780 assert(ret_len == sizeof(rq));
781
b7e8a8e1 782 ret_len = lttng_live_recv(ctx->control_sock, &rp, sizeof(rp));
f8612f31
MD
783 if (ret_len == 0) {
784 fprintf(stderr, "[error] Remote side has closed connection\n");
785 ret = -1;
786 goto error;
787 }
4a744367 788 if (ret_len < 0) {
79620497 789 perror("[error] Error receiving metadata response");
4a744367
JD
790 ret = ret_len;
791 goto error;
792 }
793 assert(ret_len == sizeof(rp));
794
795 switch (be32toh(rp.status)) {
796 case LTTNG_VIEWER_METADATA_OK:
797 printf_verbose("get_metadata : OK\n");
798 break;
799 case LTTNG_VIEWER_NO_NEW_METADATA:
800 printf_verbose("get_metadata : NO NEW\n");
9abca178 801 ret = 0;
4a744367
JD
802 goto end;
803 case LTTNG_VIEWER_METADATA_ERR:
804 printf_verbose("get_metadata : ERR\n");
805 ret = -1;
806 goto end;
807 default:
808 printf_verbose("get_metadata : UNKNOWN\n");
809 ret = -1;
810 goto end;
811 }
812
813 len = be64toh(rp.len);
814 printf_verbose("Writing %" PRIu64" bytes to metadata\n", len);
815 if (len <= 0) {
816 ret = -1;
817 goto end;
818 }
819
820 data = zmalloc(len);
821 if (!data) {
822 perror("relay data zmalloc");
823 ret = -1;
824 goto error;
825 }
b7e8a8e1 826 ret_len = lttng_live_recv(ctx->control_sock, data, len);
f8612f31
MD
827 if (ret_len == 0) {
828 fprintf(stderr, "[error] Remote side has closed connection\n");
829 ret = -1;
830 free(data);
831 goto error;
832 }
4a744367 833 if (ret_len < 0) {
79620497 834 perror("[error] Error receiving trace packet");
4a744367
JD
835 ret = ret_len;
836 free(data);
837 goto error;
838 }
839 assert(ret_len == len);
840
841 do {
af701dc4
JD
842 ret_len = fwrite(data, 1, len,
843 metadata_stream->metadata_fp_write);
4a744367
JD
844 } while (ret_len < 0 && errno == EINTR);
845 if (ret_len < 0) {
af701dc4 846 fprintf(stderr, "[error] Writing in the metadata fp\n");
4a744367
JD
847 free(data);
848 ret = ret_len;
849 goto error;
850 }
851 assert(ret_len == len);
af701dc4 852 metadata_stream->metadata_len += len;
9abca178 853 ret = len;
4a744367
JD
854
855 free(data);
856
4a744367
JD
857end:
858error:
859 return ret;
860}
861
9abca178
JD
862/*
863 * Return 0 on success, a negative value on error.
864 */
865static
866int get_new_metadata(struct lttng_live_ctx *ctx,
af701dc4
JD
867 struct lttng_live_viewer_stream *viewer_stream,
868 char **metadata_buf)
9abca178
JD
869{
870 int ret = 0;
871 struct lttng_live_viewer_stream *metadata_stream;
af701dc4 872 size_t size;
9abca178
JD
873
874 metadata_stream = viewer_stream->ctf_trace->metadata_stream;
50d207af
JD
875 if (!metadata_stream) {
876 fprintf(stderr, "[error] No metadata stream\n");
877 ret = -1;
878 goto error;
879 }
af701dc4
JD
880 metadata_stream->metadata_len = 0;
881 ret = open_metadata_fp_write(metadata_stream, metadata_buf, &size);
882 if (ret < 0) {
883 goto error;
884 }
9abca178
JD
885
886 do {
887 /*
888 * get_one_metadata_packet returns the number of bytes
889 * received, 0 when we have received everything, a
890 * negative value on error.
891 */
892 ret = get_one_metadata_packet(ctx, metadata_stream);
893 } while (ret > 0);
894
7def295f
MD
895 if (fclose(metadata_stream->metadata_fp_write))
896 perror("fclose");
af701dc4
JD
897 metadata_stream->metadata_fp_write = NULL;
898
899error:
9abca178
JD
900 return ret;
901}
902
4a744367
JD
903/*
904 * Get one index for a stream.
905 *
906 * Returns 0 on success or a negative value on error.
907 */
908static
909int get_next_index(struct lttng_live_ctx *ctx,
910 struct lttng_live_viewer_stream *viewer_stream,
911 struct packet_index *index)
912{
913 struct lttng_viewer_cmd cmd;
914 struct lttng_viewer_get_next_index rq;
915 struct lttng_viewer_index rp;
916 int ret;
4a744367
JD
917 ssize_t ret_len;
918
919 cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEXT_INDEX);
920 cmd.data_size = sizeof(rq);
921 cmd.cmd_version = 0;
922
923 memset(&rq, 0, sizeof(rq));
924 rq.stream_id = htobe64(viewer_stream->id);
925
926retry:
b7e8a8e1 927 ret_len = lttng_live_send(ctx->control_sock, &cmd, sizeof(cmd));
4a744367 928 if (ret_len < 0) {
79620497 929 perror("[error] Error sending cmd");
4a744367
JD
930 ret = ret_len;
931 goto error;
932 }
933 assert(ret_len == sizeof(cmd));
934
b7e8a8e1 935 ret_len = lttng_live_send(ctx->control_sock, &rq, sizeof(rq));
4a744367 936 if (ret_len < 0) {
79620497 937 perror("[error] Error sending get_next_index request");
4a744367
JD
938 ret = ret_len;
939 goto error;
940 }
941 assert(ret_len == sizeof(rq));
942
b7e8a8e1 943 ret_len = lttng_live_recv(ctx->control_sock, &rp, sizeof(rp));
f8612f31
MD
944 if (ret_len == 0) {
945 fprintf(stderr, "[error] Remote side has closed connection\n");
946 ret = -1;
947 goto error;
948 }
4a744367 949 if (ret_len < 0) {
79620497 950 perror("[error] Error receiving index response");
4a744367
JD
951 ret = ret_len;
952 goto error;
953 }
954 assert(ret_len == sizeof(rp));
955
956 rp.flags = be32toh(rp.flags);
957
958 switch (be32toh(rp.status)) {
959 case LTTNG_VIEWER_INDEX_INACTIVE:
960 printf_verbose("get_next_index: inactive\n");
961 memset(index, 0, sizeof(struct packet_index));
992e8cc0 962 index->ts_cycles.timestamp_end = be64toh(rp.timestamp_end);
4a744367
JD
963 break;
964 case LTTNG_VIEWER_INDEX_OK:
965 printf_verbose("get_next_index: Ok, need metadata update : %u\n",
966 rp.flags & LTTNG_VIEWER_FLAG_NEW_METADATA);
967 index->offset = be64toh(rp.offset);
968 index->packet_size = be64toh(rp.packet_size);
969 index->content_size = be64toh(rp.content_size);
992e8cc0
MD
970 index->ts_cycles.timestamp_begin = be64toh(rp.timestamp_begin);
971 index->ts_cycles.timestamp_end = be64toh(rp.timestamp_end);
4a744367
JD
972 index->events_discarded = be64toh(rp.events_discarded);
973
974 if (rp.flags & LTTNG_VIEWER_FLAG_NEW_METADATA) {
50d207af
JD
975 ret = append_metadata(ctx, viewer_stream);
976 if (ret)
4a744367 977 goto error;
4a744367 978 }
2acdc547
JD
979 if (rp.flags & LTTNG_VIEWER_FLAG_NEW_STREAM) {
980 ret = ask_new_streams(ctx);
981 if (ret < 0)
982 goto error;
983 g_hash_table_foreach(ctx->session->ctf_traces, add_traces,
984 ctx->bt_ctx);
985 }
4a744367
JD
986 break;
987 case LTTNG_VIEWER_INDEX_RETRY:
988 printf_verbose("get_next_index: retry\n");
989 sleep(1);
990 goto retry;
991 case LTTNG_VIEWER_INDEX_HUP:
992 printf_verbose("get_next_index: stream hung up\n");
993 viewer_stream->id = -1ULL;
4a744367 994 index->offset = EOF;
2acdc547 995 ctx->session->stream_count--;
4a744367
JD
996 break;
997 case LTTNG_VIEWER_INDEX_ERR:
998 fprintf(stderr, "[error] get_next_index: error\n");
999 ret = -1;
1000 goto error;
1001 default:
1002 fprintf(stderr, "[error] get_next_index: unkwown value\n");
1003 ret = -1;
1004 goto error;
1005 }
1006
1007 ret = 0;
1008
1009error:
1010 return ret;
1011}
1012
b5a1fa45 1013static
4a744367
JD
1014void ctf_live_packet_seek(struct bt_stream_pos *stream_pos, size_t index,
1015 int whence)
1016{
1017 struct ctf_stream_pos *pos;
1018 struct ctf_file_stream *file_stream;
f1f52630 1019 struct packet_index *prev_index = NULL, *cur_index;
4a744367
JD
1020 struct lttng_live_viewer_stream *viewer_stream;
1021 struct lttng_live_session *session;
1022 int ret;
1023
1024retry:
1025 pos = ctf_pos(stream_pos);
1026 file_stream = container_of(pos, struct ctf_file_stream, pos);
1027 viewer_stream = (struct lttng_live_viewer_stream *) pos->priv;
1028 session = viewer_stream->session;
1029
f1f52630
MD
1030 switch (pos->packet_index->len) {
1031 case 0:
1032 g_array_set_size(pos->packet_index, 1);
1033 cur_index = &g_array_index(pos->packet_index,
1034 struct packet_index, 0);
1035 break;
1036 case 1:
1037 g_array_set_size(pos->packet_index, 2);
1038 prev_index = &g_array_index(pos->packet_index,
1039 struct packet_index, 0);
1040 cur_index = &g_array_index(pos->packet_index,
1041 struct packet_index, 1);
1042 break;
1043 case 2:
1044 g_array_index(pos->packet_index,
1045 struct packet_index, 0) =
1046 g_array_index(pos->packet_index,
1047 struct packet_index, 1);
1048 prev_index = &g_array_index(pos->packet_index,
1049 struct packet_index, 0);
1050 cur_index = &g_array_index(pos->packet_index,
1051 struct packet_index, 1);
1052 break;
1053 default:
1054 abort();
1055 break;
1056 }
4a744367 1057 printf_verbose("get_next_index for stream %" PRIu64 "\n", viewer_stream->id);
f1f52630 1058 ret = get_next_index(session->ctx, viewer_stream, cur_index);
4a744367
JD
1059 if (ret < 0) {
1060 pos->offset = EOF;
1061 fprintf(stderr, "[error] get_next_index failed\n");
1062 return;
1063 }
1064
f1f52630
MD
1065 pos->packet_size = cur_index->packet_size;
1066 pos->content_size = cur_index->content_size;
4a744367 1067 pos->mmap_base_offset = 0;
f1f52630 1068 if (cur_index->offset == EOF) {
4a744367
JD
1069 pos->offset = EOF;
1070 } else {
1071 pos->offset = 0;
1072 }
1073
f1f52630
MD
1074 if (cur_index->content_size == 0) {
1075 file_stream->parent.cycles_timestamp =
1076 cur_index->ts_cycles.timestamp_end;
4a744367 1077 file_stream->parent.real_timestamp = ctf_get_real_timestamp(
f1f52630
MD
1078 &file_stream->parent,
1079 cur_index->ts_cycles.timestamp_end);
4a744367 1080 } else {
f1f52630
MD
1081 /* Convert the timestamps and append to the real_index. */
1082 cur_index->ts_real.timestamp_begin = ctf_get_real_timestamp(
1083 &file_stream->parent,
1084 cur_index->ts_cycles.timestamp_begin);
1085 cur_index->ts_real.timestamp_end = ctf_get_real_timestamp(
1086 &file_stream->parent,
1087 cur_index->ts_cycles.timestamp_end);
1088
1089 ctf_update_current_packet_index(&file_stream->parent,
1090 prev_index, cur_index);
1091
1092 file_stream->parent.cycles_timestamp =
1093 cur_index->ts_cycles.timestamp_begin;
1094 file_stream->parent.real_timestamp =
1095 cur_index->ts_real.timestamp_begin;
4a744367
JD
1096 }
1097
1098 if (pos->packet_size == 0 || pos->offset == EOF) {
1099 goto end;
1100 }
1101
1102 printf_verbose("get_data_packet for stream %" PRIu64 "\n",
1103 viewer_stream->id);
1104 ret = get_data_packet(session->ctx, pos, viewer_stream,
f1f52630
MD
1105 be64toh(cur_index->offset),
1106 cur_index->packet_size / CHAR_BIT);
4a744367
JD
1107 if (ret == -2) {
1108 goto retry;
1109 } else if (ret < 0) {
1110 pos->offset = EOF;
1111 fprintf(stderr, "[error] get_data_packet failed\n");
1112 return;
1113 }
1114
1115 printf_verbose("Index received : packet_size : %" PRIu64
1116 ", offset %" PRIu64 ", content_size %" PRIu64
1117 ", timestamp_end : %" PRIu64 "\n",
f1f52630
MD
1118 cur_index->packet_size, cur_index->offset,
1119 cur_index->content_size,
1120 cur_index->ts_cycles.timestamp_end);
4a744367
JD
1121
1122 /* update trace_packet_header and stream_packet_context */
1123 if (pos->prot != PROT_WRITE && file_stream->parent.trace_packet_header) {
1124 /* Read packet header */
1125 ret = generic_rw(&pos->parent, &file_stream->parent.trace_packet_header->p);
1126 if (ret) {
1127 pos->offset = EOF;
1128 fprintf(stderr, "[error] trace packet header read failed\n");
1129 goto end;
1130 }
1131 }
1132 if (pos->prot != PROT_WRITE && file_stream->parent.stream_packet_context) {
1133 /* Read packet context */
1134 ret = generic_rw(&pos->parent, &file_stream->parent.stream_packet_context->p);
1135 if (ret) {
1136 pos->offset = EOF;
1137 fprintf(stderr, "[error] stream packet context read failed\n");
1138 goto end;
1139 }
1140 }
1141 pos->data_offset = pos->offset;
1142
1143end:
1144 return;
1145}
1146
2acdc547
JD
1147int lttng_live_create_viewer_session(struct lttng_live_ctx *ctx)
1148{
1149 struct lttng_viewer_cmd cmd;
1150 struct lttng_viewer_create_session_response resp;
1151 int ret;
1152 ssize_t ret_len;
1153
1154 cmd.cmd = htobe32(LTTNG_VIEWER_CREATE_SESSION);
1155 cmd.data_size = 0;
1156 cmd.cmd_version = 0;
1157
b7e8a8e1 1158 ret_len = lttng_live_send(ctx->control_sock, &cmd, sizeof(cmd));
2acdc547 1159 if (ret_len < 0) {
79620497 1160 perror("[error] Error sending cmd");
2acdc547
JD
1161 ret = ret_len;
1162 goto error;
1163 }
1164 assert(ret_len == sizeof(cmd));
1165
b7e8a8e1 1166 ret_len = lttng_live_recv(ctx->control_sock, &resp, sizeof(resp));
f8612f31
MD
1167 if (ret_len == 0) {
1168 fprintf(stderr, "[error] Remote side has closed connection\n");
1169 ret = -1;
1170 goto error;
1171 }
2acdc547 1172 if (ret_len < 0) {
79620497 1173 perror("[error] Error receiving create session reply");
2acdc547
JD
1174 ret = ret_len;
1175 goto error;
1176 }
1177 assert(ret_len == sizeof(resp));
1178
1179 if (be32toh(resp.status) != LTTNG_VIEWER_CREATE_SESSION_OK) {
1180 fprintf(stderr, "[error] Error creating viewer session\n");
1181 ret = -1;
1182 goto error;
1183 }
1184 ret = 0;
1185
1186error:
1187 return ret;
1188}
1189
1190static
1191int del_traces(gpointer key, gpointer value, gpointer user_data)
4a744367
JD
1192{
1193 struct bt_context *bt_ctx = user_data;
1194 struct lttng_live_ctf_trace *trace = value;
1195 int ret;
1196
1197 ret = bt_context_remove_trace(bt_ctx, trace->trace_id);
1198 if (ret < 0)
1199 fprintf(stderr, "[error] removing trace from context\n");
1200
1201 /* remove the key/value pair from the HT. */
1202 return 1;
1203}
1204
2acdc547
JD
1205static
1206void add_traces(gpointer key, gpointer value, gpointer user_data)
4a744367 1207{
9abca178 1208 int i, ret;
4a744367
JD
1209 struct bt_context *bt_ctx = user_data;
1210 struct lttng_live_ctf_trace *trace = value;
1211 struct lttng_live_viewer_stream *stream;
1212 struct bt_mmap_stream *new_mmap_stream;
1213 struct bt_mmap_stream_list mmap_list;
1214 struct lttng_live_ctx *ctx = NULL;
150595c4
JD
1215 struct bt_trace_descriptor *td;
1216 struct bt_trace_handle *handle;
4a744367 1217
b5a1fa45
JD
1218 /*
1219 * We don't know how many streams we will receive for a trace, so
1220 * once we are done receiving the traces, we add all the traces
1221 * received to the bt_context.
1222 * We can receive streams during the attach command or the
1223 * get_new_streams, so we have to make sure not to add multiple
1224 * times the same traces.
1225 * If a trace is already in the context, we just skip this function.
1226 */
2acdc547
JD
1227 if (trace->in_use)
1228 return;
1229
4a744367
JD
1230 BT_INIT_LIST_HEAD(&mmap_list.head);
1231
1232 for (i = 0; i < trace->streams->len; i++) {
1233 stream = g_ptr_array_index(trace->streams, i);
1234 ctx = stream->session->ctx;
1235
1236 if (!stream->metadata_flag) {
1237 new_mmap_stream = zmalloc(sizeof(struct bt_mmap_stream));
1238 new_mmap_stream->priv = (void *) stream;
1239 new_mmap_stream->fd = -1;
1240 bt_list_add(&new_mmap_stream->list, &mmap_list.head);
1241 } else {
af701dc4
JD
1242 char *metadata_buf = NULL;
1243
4a744367 1244 /* Get all possible metadata before starting */
af701dc4
JD
1245 ret = get_new_metadata(ctx, stream, &metadata_buf);
1246 if (ret) {
1247 free(metadata_buf);
1248 goto end_free;
1249 }
1250 if (!stream->metadata_len) {
1251 fprintf(stderr, "[error] empty metadata\n");
1252 ret = -1;
1253 free(metadata_buf);
1254 goto end_free;
1255 }
1256
1257 trace->metadata_fp = babeltrace_fmemopen(metadata_buf,
1258 stream->metadata_len, "rb");
1259 if (!trace->metadata_fp) {
1260 perror("Metadata fmemopen");
1261 ret = -1;
1262 free(metadata_buf);
9abca178 1263 goto end_free;
af701dc4 1264 }
4a744367
JD
1265 }
1266 }
1267
1268 if (!trace->metadata_fp) {
1269 fprintf(stderr, "[error] No metadata stream opened\n");
1270 goto end_free;
1271 }
1272
1273 ret = bt_context_add_trace(bt_ctx, NULL, "ctf",
1274 ctf_live_packet_seek, &mmap_list, trace->metadata_fp);
1275 if (ret < 0) {
1276 fprintf(stderr, "[error] Error adding trace\n");
1277 goto end_free;
1278 }
af701dc4
JD
1279 trace->metadata_stream->metadata_len = 0;
1280
150595c4
JD
1281 handle = (struct bt_trace_handle *) g_hash_table_lookup(
1282 bt_ctx->trace_handles,
1283 (gpointer) (unsigned long) ret);
1284 td = handle->td;
1285 trace->handle = handle;
2acdc547 1286 if (bt_ctx->current_iterator) {
2acdc547
JD
1287 bt_iter_add_trace(bt_ctx->current_iterator, td);
1288 }
1289
4a744367 1290 trace->trace_id = ret;
2acdc547 1291 trace->in_use = 1;
4a744367
JD
1292
1293 goto end;
1294
1295end_free:
1296 bt_context_put(bt_ctx);
1297end:
1298 return;
1299}
1300
2acdc547 1301int lttng_live_get_new_streams(struct lttng_live_ctx *ctx, uint64_t id)
6192b988
JD
1302{
1303 struct lttng_viewer_cmd cmd;
2acdc547
JD
1304 struct lttng_viewer_new_streams_request rq;
1305 struct lttng_viewer_new_streams_response rp;
1306 struct lttng_viewer_stream stream;
1307 int ret, i;
6192b988 1308 ssize_t ret_len;
b5a1fa45 1309 uint32_t stream_count;
6192b988 1310
2acdc547
JD
1311 cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEW_STREAMS);
1312 cmd.data_size = sizeof(rq);
6192b988
JD
1313 cmd.cmd_version = 0;
1314
2acdc547
JD
1315 memset(&rq, 0, sizeof(rq));
1316 rq.session_id = htobe64(id);
1317
b7e8a8e1 1318 ret_len = lttng_live_send(ctx->control_sock, &cmd, sizeof(cmd));
6192b988 1319 if (ret_len < 0) {
79620497 1320 perror("[error] Error sending cmd");
6192b988
JD
1321 ret = ret_len;
1322 goto error;
1323 }
1324 assert(ret_len == sizeof(cmd));
1325
b7e8a8e1 1326 ret_len = lttng_live_send(ctx->control_sock, &rq, sizeof(rq));
6192b988 1327 if (ret_len < 0) {
79620497 1328 perror("[error] Error sending get_new_streams request");
6192b988
JD
1329 ret = ret_len;
1330 goto error;
1331 }
2acdc547 1332 assert(ret_len == sizeof(rq));
6192b988 1333
b7e8a8e1 1334 ret_len = lttng_live_recv(ctx->control_sock, &rp, sizeof(rp));
f8612f31
MD
1335 if (ret_len == 0) {
1336 fprintf(stderr, "[error] Remote side has closed connection\n");
1337 ret = -1;
1338 goto error;
1339 }
2acdc547 1340 if (ret_len < 0) {
79620497 1341 perror("[error] Error receiving get_new_streams response");
2acdc547 1342 ret = ret_len;
6192b988
JD
1343 goto error;
1344 }
2acdc547
JD
1345 assert(ret_len == sizeof(rp));
1346
1347 switch(be32toh(rp.status)) {
1348 case LTTNG_VIEWER_NEW_STREAMS_OK:
1349 break;
1350 case LTTNG_VIEWER_NEW_STREAMS_NO_NEW:
1351 ret = 0;
1352 goto end;
1353 case LTTNG_VIEWER_NEW_STREAMS_HUP:
1354 ret = -LTTNG_VIEWER_NEW_STREAMS_HUP;
1355 goto end;
1356 case LTTNG_VIEWER_NEW_STREAMS_ERR:
1357 fprintf(stderr, "[error] get_new_streams error\n");
1358 ret = -1;
1359 goto end;
1360 default:
1361 fprintf(stderr, "[error] Unknown return code %u\n",
1362 be32toh(rp.status));
1363 ret = -1;
1364 goto end;
1365 }
1366
b5a1fa45
JD
1367 stream_count = be32toh(rp.streams_count);
1368 ctx->session->stream_count += stream_count;
2acdc547
JD
1369 /*
1370 * When the session is created but not started, we do an active wait
1371 * until it starts. It allows the viewer to start processing the trace
1372 * as soon as the session starts.
1373 */
1374 if (ctx->session->stream_count == 0) {
1375 ret = 0;
1376 goto end;
1377 }
1378 printf_verbose("Waiting for %" PRIu64 " streams:\n",
1379 ctx->session->stream_count);
1380 ctx->session->streams = g_new0(struct lttng_live_viewer_stream,
1381 ctx->session->stream_count);
b5a1fa45 1382 for (i = 0; i < stream_count; i++) {
b7e8a8e1 1383 ret_len = lttng_live_recv(ctx->control_sock, &stream, sizeof(stream));
f8612f31
MD
1384 if (ret_len == 0) {
1385 fprintf(stderr, "[error] Remote side has closed connection\n");
1386 ret = -1;
1387 goto error;
1388 }
2acdc547 1389 if (ret_len < 0) {
79620497 1390 perror("[error] Error receiving stream");
2acdc547
JD
1391 ret = ret_len;
1392 goto error;
1393 }
1394 assert(ret_len == sizeof(stream));
1395 stream.path_name[LTTNG_VIEWER_PATH_MAX - 1] = '\0';
1396 stream.channel_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0';
1397
1398 printf_verbose(" stream %" PRIu64 " : %s/%s\n",
1399 be64toh(stream.id), stream.path_name,
1400 stream.channel_name);
1401 ctx->session->streams[i].id = be64toh(stream.id);
1402 ctx->session->streams[i].session = ctx->session;
1403
1404 ctx->session->streams[i].first_read = 1;
1405 ctx->session->streams[i].mmap_size = 0;
1406
1407 if (be32toh(stream.metadata_flag)) {
2acdc547 1408 ctx->session->streams[i].metadata_flag = 1;
2acdc547
JD
1409 }
1410 ret = lttng_live_ctf_trace_assign(&ctx->session->streams[i],
1411 be64toh(stream.ctf_trace_id));
1412 if (ret < 0) {
1413 goto error;
1414 }
1415
1416 }
6192b988
JD
1417 ret = 0;
1418
2acdc547 1419end:
6192b988
JD
1420error:
1421 return ret;
1422}
1423
2acdc547 1424void lttng_live_read(struct lttng_live_ctx *ctx)
4a744367 1425{
2acdc547 1426 int ret, i;
4a744367
JD
1427 struct bt_ctf_iter *iter;
1428 const struct bt_ctf_event *event;
1429 struct bt_iter_pos begin_pos;
1430 struct bt_trace_descriptor *td_write;
1431 struct bt_format *fmt_write;
1432 struct ctf_text_stream_pos *sout;
2acdc547 1433 uint64_t id;
4a744367 1434
2acdc547
JD
1435 ctx->bt_ctx = bt_context_create();
1436 if (!ctx->bt_ctx) {
4a744367
JD
1437 fprintf(stderr, "[error] bt_context_create allocation\n");
1438 goto end;
1439 }
1440
1441 fmt_write = bt_lookup_format(g_quark_from_static_string("text"));
1442 if (!fmt_write) {
1443 fprintf(stderr, "[error] ctf-text error\n");
1444 goto end;
1445 }
1446
1447 td_write = fmt_write->open_trace(NULL, O_RDWR, NULL, NULL);
1448 if (!td_write) {
1449 fprintf(stderr, "[error] Error opening output trace\n");
1450 goto end_free;
1451 }
1452
1453 sout = container_of(td_write, struct ctf_text_stream_pos,
1454 trace_descriptor);
1455 if (!sout->parent.event_cb)
1456 goto end_free;
1457
6192b988
JD
1458 ret = lttng_live_create_viewer_session(ctx);
1459 if (ret < 0) {
1460 goto end_free;
1461 }
1462
2acdc547
JD
1463 for (i = 0; i < ctx->session_ids->len; i++) {
1464 id = g_array_index(ctx->session_ids, uint64_t, i);
1465 printf_verbose("Attaching to session %lu\n", id);
1466 ret = lttng_live_attach_session(ctx, id);
1467 printf_verbose("Attaching session returns %d\n", ret);
1468 if (ret < 0) {
1469 if (ret == -LTTNG_VIEWER_ATTACH_UNK) {
1470 fprintf(stderr, "[error] Unknown session ID\n");
1471 }
1472 goto end_free;
1473 }
1474 }
1475
4a744367 1476 /*
2acdc547 1477 * As long as the session is active, we try to get new streams.
4a744367 1478 */
2acdc547 1479 for (;;) {
4a744367
JD
1480 int flags;
1481
2acdc547
JD
1482 while (!ctx->session->stream_count) {
1483 if (ctx->session_ids->len == 0)
4a744367 1484 goto end_free;
2acdc547
JD
1485 ret = ask_new_streams(ctx);
1486 if (ret < 0)
1487 goto end_free;
1488 }
4a744367 1489
2acdc547
JD
1490 g_hash_table_foreach(ctx->session->ctf_traces, add_traces,
1491 ctx->bt_ctx);
4a744367
JD
1492
1493 begin_pos.type = BT_SEEK_BEGIN;
2acdc547 1494 iter = bt_ctf_iter_create(ctx->bt_ctx, &begin_pos, NULL);
4a744367
JD
1495 if (!iter) {
1496 fprintf(stderr, "[error] Iterator creation error\n");
1497 goto end;
1498 }
1499 for (;;) {
1500 event = bt_ctf_iter_read_event_flags(iter, &flags);
1501 if (!(flags & BT_ITER_FLAG_RETRY)) {
1502 if (!event) {
1503 /* End of trace */
1504 break;
1505 }
1506 ret = sout->parent.event_cb(&sout->parent,
1507 event->parent->stream);
1508 if (ret) {
1509 fprintf(stderr, "[error] Writing "
1510 "event failed.\n");
1511 goto end_free;
1512 }
1513 }
1514 ret = bt_iter_next(bt_ctf_get_iter(iter));
1515 if (ret < 0) {
1516 goto end_free;
1517 }
1518 }
1519 bt_ctf_iter_destroy(iter);
2acdc547
JD
1520 g_hash_table_foreach_remove(ctx->session->ctf_traces,
1521 del_traces, ctx->bt_ctx);
1522 ctx->session->stream_count = 0;
1523 }
4a744367
JD
1524
1525end_free:
2acdc547 1526 bt_context_put(ctx->bt_ctx);
4a744367
JD
1527end:
1528 return;
1529}
This page took 0.089199 seconds and 4 git commands to generate.