SoW-2019-0002: Dynamic Snapshot
[lttng-tools.git] / tests / regression / tools / live / live_test.c
1 /*
2 * Copyright (C) 2013 Julien Desfossez <jdesfossez@efficios.com>
3 *
4 * SPDX-License-Identifier: GPL-2.0-only
5 *
6 */
7
8 #include <assert.h>
9 #include <errno.h>
10 #include <stdio.h>
11 #include <stdlib.h>
12 #include <string.h>
13 #include <unistd.h>
14 #include <common/compat/time.h>
15 #include <sys/types.h>
16 #include <inttypes.h>
17 #include <stdlib.h>
18 #include <sys/socket.h>
19 #include <netinet/in.h>
20 #include <netdb.h>
21 #include <fcntl.h>
22 #include <sys/mman.h>
23 #include <sys/stat.h>
24
25 #include <tap/tap.h>
26 #include <lttng/lttng.h>
27
28 #include <urcu/list.h>
29 #include <common/common.h>
30
31 #include <bin/lttng-relayd/lttng-viewer-abi.h>
32 #include <common/index/ctf-index.h>
33
34 #include <common/compat/endian.h>
35
/* Session name and relay URL; neither is referenced elsewhere in this file. */
#define SESSION1 "test1"
#define RELAYD_URL "net://localhost"
/* NOTE(review): presumably the live timer period in microseconds -- confirm. */
#define LIVE_TIMER 2000000

/* Number of TAP tests in this file */
#define NUM_TESTS 11
/* Size, in bytes, of the anonymous mapping created for each attached stream. */
#define mmap_size 524288

/* Socket used for every viewer command; opened by connect_viewer(). */
static int control_sock;
/*
 * State of the attached session, allocated by attach_session(). Kept
 * file-local: nothing outside this translation unit refers to it.
 */
static struct live_session *session;

/*
 * Description of the first packet reported by get_next_index(): offset and
 * length as announced by the relay, and the index (into session->streams)
 * of the stream it belongs to. The stream id stays -1 until such a packet
 * has been seen.
 */
static int first_packet_offset;
static int first_packet_len;
static int first_packet_stream_id = -1;
50
/*
 * Viewer-side bookkeeping for one stream of the attached session.
 */
struct viewer_stream {
	/* Identifiers taken from the relay's attach response. */
	uint64_t id;
	uint64_t ctf_trace_id;

	/* Anonymous mapping of mmap_size bytes that receives packet payloads. */
	void *mmap_base;

	/* Only ever assigned -1 (on stream hang-up) in this file. */
	int fd;

	/* Non-zero when this is the metadata stream of the trace. */
	int metadata_flag;

	/* Set to 1 when the stream is attached. */
	int first_read;

	/* Not used by the code in this file. */
	char path[PATH_MAX];
};
60
/*
 * Viewer-side view of the session this test is attached to.
 */
struct live_session {
	/* Array of `stream_count` entries, filled in by attach_session(). */
	struct viewer_stream *streams;

	/* Never written by the code in this file. */
	uint64_t live_timer_interval;

	/* Number of streams announced by the relay when attaching. */
	uint64_t stream_count;
};
66
/*
 * Receive exactly `len` bytes from `fd` into `buf`.
 *
 * recv() is called again when it is interrupted by a signal (EINTR) and
 * whenever it returned fewer bytes than still expected.
 *
 * Returns `len` once the buffer has been filled, 0 when the peer performed
 * an orderly shutdown before that point (this is also what a request for
 * 0 bytes yields), and a negative value on error with errno set by recv().
 */
static
ssize_t lttng_live_recv(int fd, void *buf, size_t len)
{
	ssize_t ret;
	size_t copied = 0, to_copy = len;
	/* Arithmetic on `void *` is a GNU extension: index a byte pointer. */
	char *dst = buf;

	do {
		ret = recv(fd, dst + copied, to_copy, 0);
		if (ret > 0) {
			assert((size_t) ret <= to_copy);
			copied += (size_t) ret;
			to_copy -= (size_t) ret;
		}
	} while ((ret > 0 && to_copy > 0)
			|| (ret < 0 && errno == EINTR));
	if (ret > 0) {
		ret = (ssize_t) copied;
	}
	/* ret = 0 means orderly shutdown, ret < 0 is error. */
	return ret;
}
87
/*
 * Send the `len` bytes at `buf` on `fd`.
 *
 * send() is retried when interrupted by a signal (EINTR) and called again
 * for the remainder after a short write, so that a request is never left
 * truncated on the wire. MSG_NOSIGNAL turns a write to a closed peer into
 * an EPIPE error instead of a SIGPIPE.
 *
 * Returns `len` on success, or a negative value on error with errno set
 * by send().
 */
static
ssize_t lttng_live_send(int fd, const void *buf, size_t len)
{
	const char *src = buf;
	size_t sent = 0;

	do {
		ssize_t ret = send(fd, src + sent, len - sent, MSG_NOSIGNAL);

		if (ret < 0) {
			if (errno == EINTR) {
				continue;
			}
			return ret;
		}
		sent += (size_t) ret;
	} while (sent < len);

	return (ssize_t) sent;
}
97 }
98
99 static
100 int connect_viewer(const char *hostname)
101 {
102 struct hostent *host;
103 struct sockaddr_in server_addr;
104 int ret;
105
106 host = gethostbyname(hostname);
107 if (!host) {
108 ret = -1;
109 goto end;
110 }
111
112 if ((control_sock = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
113 PERROR("Socket");
114 ret = -1;
115 goto end;
116 }
117
118 server_addr.sin_family = AF_INET;
119 server_addr.sin_port = htons(5344);
120 server_addr.sin_addr = *((struct in_addr *) host->h_addr);
121 bzero(&(server_addr.sin_zero), 8);
122
123 if (connect(control_sock, (struct sockaddr *) &server_addr,
124 sizeof(struct sockaddr)) == -1) {
125 PERROR("Connect");
126 ret = -1;
127 goto end;
128 }
129
130 server_addr.sin_family = AF_INET;
131 server_addr.sin_port = htons(5345);
132 server_addr.sin_addr = *((struct in_addr *) host->h_addr);
133 bzero(&(server_addr.sin_zero), 8);
134
135 ret = 0;
136
137 end:
138 return ret;
139 }
140
141 static int establish_connection(void)
142 {
143 struct lttng_viewer_cmd cmd;
144 struct lttng_viewer_connect connect;
145 ssize_t ret_len;
146
147 cmd.cmd = htobe32(LTTNG_VIEWER_CONNECT);
148 cmd.data_size = htobe64(sizeof(connect));
149 cmd.cmd_version = htobe32(0);
150
151 memset(&connect, 0, sizeof(connect));
152 connect.major = htobe32(VERSION_MAJOR);
153 connect.minor = htobe32(VERSION_MINOR);
154 connect.type = htobe32(LTTNG_VIEWER_CLIENT_COMMAND);
155
156 ret_len = lttng_live_send(control_sock, &cmd, sizeof(cmd));
157 if (ret_len < 0) {
158 diag("Error sending cmd");
159 goto error;
160 }
161 ret_len = lttng_live_send(control_sock, &connect, sizeof(connect));
162 if (ret_len < 0) {
163 diag("Error sending version");
164 goto error;
165 }
166
167 ret_len = lttng_live_recv(control_sock, &connect, sizeof(connect));
168 if (ret_len == 0) {
169 diag("[error] Remote side has closed connection");
170 goto error;
171 }
172 if (ret_len < 0) {
173 diag("Error receiving version");
174 goto error;
175 }
176 return 0;
177
178 error:
179 return -1;
180 }
181
182 /*
183 * Returns the number of sessions, should be 1 during the unit test.
184 */
185 static int list_sessions(uint64_t *session_id)
186 {
187 struct lttng_viewer_cmd cmd;
188 struct lttng_viewer_list_sessions list;
189 struct lttng_viewer_session lsession;
190 int i;
191 ssize_t ret_len;
192 int first_session = 0;
193
194 cmd.cmd = htobe32(LTTNG_VIEWER_LIST_SESSIONS);
195 cmd.data_size = htobe64(0);
196 cmd.cmd_version = htobe32(0);
197
198 ret_len = lttng_live_send(control_sock, &cmd, sizeof(cmd));
199 if (ret_len < 0) {
200 diag("Error sending cmd");
201 goto error;
202 }
203
204 ret_len = lttng_live_recv(control_sock, &list, sizeof(list));
205 if (ret_len == 0) {
206 diag("[error] Remote side has closed connection");
207 goto error;
208 }
209 if (ret_len < 0) {
210 diag("Error receiving session list");
211 goto error;
212 }
213
214 for (i = 0; i < be32toh(list.sessions_count); i++) {
215 ret_len = lttng_live_recv(control_sock, &lsession, sizeof(lsession));
216 if (ret_len < 0) {
217 diag("Error receiving session");
218 goto error;
219 }
220 if (lsession.streams > 0 && first_session <= 0) {
221 first_session = be64toh(lsession.id);
222 *session_id = first_session;
223 }
224 }
225
226 return be32toh(list.sessions_count);
227
228 error:
229 return -1;
230 }
231
232 static int create_viewer_session(void)
233 {
234 struct lttng_viewer_cmd cmd;
235 struct lttng_viewer_create_session_response resp;
236 ssize_t ret_len;
237
238 cmd.cmd = htobe32(LTTNG_VIEWER_CREATE_SESSION);
239 cmd.data_size = htobe64(0);
240 cmd.cmd_version = htobe32(0);
241
242 ret_len = lttng_live_send(control_sock, &cmd, sizeof(cmd));
243 if (ret_len < 0) {
244 diag("[error] Error sending cmd");
245 goto error;
246 }
247 assert(ret_len == sizeof(cmd));
248
249 ret_len = lttng_live_recv(control_sock, &resp, sizeof(resp));
250 if (ret_len == 0) {
251 diag("[error] Remote side has closed connection");
252 goto error;
253 }
254 if (ret_len < 0) {
255 diag("[error] Error receiving create session reply");
256 goto error;
257 }
258 assert(ret_len == sizeof(resp));
259
260 if (be32toh(resp.status) != LTTNG_VIEWER_CREATE_SESSION_OK) {
261 diag("[error] Error creating viewer session");
262 goto error;
263 }
264 return 0;
265
266 error:
267 return -1;
268 }
269
270 static int attach_session(uint64_t id)
271 {
272 struct lttng_viewer_cmd cmd;
273 struct lttng_viewer_attach_session_request rq;
274 struct lttng_viewer_attach_session_response rp;
275 struct lttng_viewer_stream stream;
276 int i;
277 ssize_t ret_len;
278
279 session = zmalloc(sizeof(struct live_session));
280 if (!session) {
281 goto error;
282 }
283
284 cmd.cmd = htobe32(LTTNG_VIEWER_ATTACH_SESSION);
285 cmd.data_size = htobe64(sizeof(rq));
286 cmd.cmd_version = htobe32(0);
287
288 memset(&rq, 0, sizeof(rq));
289 rq.session_id = htobe64(id);
290 rq.seek = htobe32(LTTNG_VIEWER_SEEK_BEGINNING);
291
292 ret_len = lttng_live_send(control_sock, &cmd, sizeof(cmd));
293 if (ret_len < 0) {
294 diag("Error sending cmd LTTNG_VIEWER_ATTACH_SESSION");
295 goto error;
296 }
297 ret_len = lttng_live_send(control_sock, &rq, sizeof(rq));
298 if (ret_len < 0) {
299 diag("Error sending attach request");
300 goto error;
301 }
302
303 ret_len = lttng_live_recv(control_sock, &rp, sizeof(rp));
304 if (ret_len == 0) {
305 diag("[error] Remote side has closed connection");
306 goto error;
307 }
308 if (ret_len < 0) {
309 diag("Error receiving attach response");
310 goto error;
311 }
312 if (be32toh(rp.status) != LTTNG_VIEWER_ATTACH_OK) {
313 goto error;
314 }
315
316 session->stream_count = be32toh(rp.streams_count);
317 if (session->stream_count == 0) {
318 diag("Got session stream count == 0");
319 goto error;
320 }
321 session->streams = zmalloc(session->stream_count *
322 sizeof(struct viewer_stream));
323 if (!session->streams) {
324 goto error;
325 }
326
327 for (i = 0; i < be32toh(rp.streams_count); i++) {
328 ret_len = lttng_live_recv(control_sock, &stream, sizeof(stream));
329 if (ret_len == 0) {
330 diag("[error] Remote side has closed connection");
331 goto error;
332 }
333 if (ret_len < 0) {
334 diag("Error receiving stream");
335 goto error;
336 }
337 session->streams[i].id = be64toh(stream.id);
338
339 session->streams[i].ctf_trace_id = be64toh(stream.ctf_trace_id);
340 session->streams[i].first_read = 1;
341 session->streams[i].mmap_base = mmap(NULL, mmap_size,
342 PROT_READ | PROT_WRITE,
343 MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
344 if (session->streams[i].mmap_base == MAP_FAILED) {
345 diag("mmap error");
346 goto error;
347 }
348
349 if (be32toh(stream.metadata_flag)) {
350 session->streams[i].metadata_flag = 1;
351 }
352 }
353 return session->stream_count;
354
355 error:
356 return -1;
357 }
358
359 static int get_metadata(void)
360 {
361 struct lttng_viewer_cmd cmd;
362 struct lttng_viewer_get_metadata rq;
363 struct lttng_viewer_metadata_packet rp;
364 ssize_t ret_len;
365 int ret;
366 uint64_t i;
367 char *data = NULL;
368 uint64_t len = 0;
369 int metadata_stream_id = -1;
370
371 cmd.cmd = htobe32(LTTNG_VIEWER_GET_METADATA);
372 cmd.data_size = htobe64(sizeof(rq));
373 cmd.cmd_version = htobe32(0);
374
375 for (i = 0; i < session->stream_count; i++) {
376 if (session->streams[i].metadata_flag) {
377 metadata_stream_id = i;
378 break;
379 }
380 }
381
382 if (metadata_stream_id < 0) {
383 diag("No metadata stream found");
384 goto error;
385 }
386
387 rq.stream_id = htobe64(session->streams[metadata_stream_id].id);
388
389 retry:
390 ret_len = lttng_live_send(control_sock, &cmd, sizeof(cmd));
391 if (ret_len < 0) {
392 diag("Error sending cmd");
393 goto error;
394 }
395 ret_len = lttng_live_send(control_sock, &rq, sizeof(rq));
396 if (ret_len < 0) {
397 diag("Error sending get_metadata request");
398 goto error;
399 }
400 ret_len = lttng_live_recv(control_sock, &rp, sizeof(rp));
401 if (ret_len == 0) {
402 diag("[error] Remote side has closed connection");
403 goto error;
404 }
405 if (ret_len < 0) {
406 diag("Error receiving metadata response");
407 goto error;
408 }
409 switch (be32toh(rp.status)) {
410 case LTTNG_VIEWER_METADATA_OK:
411 break;
412 case LTTNG_VIEWER_NO_NEW_METADATA:
413 diag("Got LTTNG_VIEWER_NO_NEW_METADATA:");
414 usleep(50);
415 goto retry;
416 case LTTNG_VIEWER_METADATA_ERR:
417 diag("Got LTTNG_VIEWER_METADATA_ERR:");
418 goto error;
419 default:
420 diag("Got unknown status during LTTNG_VIEWER_GET_METADATA");
421 goto error;
422 }
423
424 len = be64toh(rp.len);
425 if (len <= 0) {
426 goto error;
427 }
428
429 data = zmalloc(len);
430 if (!data) {
431 PERROR("relay data zmalloc");
432 goto error;
433 }
434 ret_len = lttng_live_recv(control_sock, data, len);
435 if (ret_len == 0) {
436 diag("[error] Remote side has closed connection");
437 goto error_free_data;
438 }
439 if (ret_len < 0) {
440 diag("Error receiving trace packet");
441 goto error_free_data;
442 }
443 free(data);
444 ret = len;
445
446 return ret;
447
448 error_free_data:
449 free(data);
450 error:
451 return -1;
452 }
453
454 static int get_next_index(void)
455 {
456 struct lttng_viewer_cmd cmd;
457 struct lttng_viewer_get_next_index rq;
458 struct lttng_viewer_index rp;
459 ssize_t ret_len;
460 int id;
461
462 cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEXT_INDEX);
463 cmd.data_size = htobe64(sizeof(rq));
464 cmd.cmd_version = htobe32(0);
465
466 for (id = 0; id < session->stream_count; id++) {
467 if (session->streams[id].metadata_flag) {
468 continue;
469 }
470 memset(&rq, 0, sizeof(rq));
471 rq.stream_id = htobe64(session->streams[id].id);
472
473 retry:
474 ret_len = lttng_live_send(control_sock, &cmd, sizeof(cmd));
475 if (ret_len < 0) {
476 diag("Error sending cmd");
477 goto error;
478 }
479 ret_len = lttng_live_send(control_sock, &rq, sizeof(rq));
480 if (ret_len < 0) {
481 diag("Error sending get_next_index request");
482 goto error;
483 }
484 ret_len = lttng_live_recv(control_sock, &rp, sizeof(rp));
485 if (ret_len == 0) {
486 diag("[error] Remote side has closed connection");
487 goto error;
488 }
489 if (ret_len < 0) {
490 diag("Error receiving index response");
491 goto error;
492 }
493
494 rp.flags = be32toh(rp.flags);
495
496 switch (be32toh(rp.status)) {
497 case LTTNG_VIEWER_INDEX_INACTIVE:
498 /* Skip this stream. */
499 diag("Got LTTNG_VIEWER_INDEX_INACTIVE");
500 continue;
501 case LTTNG_VIEWER_INDEX_OK:
502 break;
503 case LTTNG_VIEWER_INDEX_RETRY:
504 sleep(1);
505 goto retry;
506 case LTTNG_VIEWER_INDEX_HUP:
507 diag("Got LTTNG_VIEWER_INDEX_HUP");
508 session->streams[id].id = -1ULL;
509 session->streams[id].fd = -1;
510 goto error;
511 case LTTNG_VIEWER_INDEX_ERR:
512 diag("Got LTTNG_VIEWER_INDEX_ERR");
513 goto error;
514 default:
515 diag("Unknown reply status during LTTNG_VIEWER_GET_NEXT_INDEX (%d)", be32toh(rp.status));
516 goto error;
517 }
518 if (first_packet_stream_id < 0) {
519 /*
520 * Initialize the first packet stream id. That is,
521 * the first active stream encoutered.
522 */
523 first_packet_offset = be64toh(rp.offset);
524 first_packet_len = be64toh(rp.packet_size) / CHAR_BIT;
525 first_packet_stream_id = id;
526 diag("Got first packet index with offset %d and len %d",
527 first_packet_offset, first_packet_len);
528 }
529 }
530 return 0;
531
532 error:
533 return -1;
534 }
535
536 static
537 int get_data_packet(int id, uint64_t offset,
538 uint64_t len)
539 {
540 struct lttng_viewer_cmd cmd;
541 struct lttng_viewer_get_packet rq;
542 struct lttng_viewer_trace_packet rp;
543 ssize_t ret_len;
544
545 cmd.cmd = htobe32(LTTNG_VIEWER_GET_PACKET);
546 cmd.data_size = htobe64(sizeof(rq));
547 cmd.cmd_version = htobe32(0);
548
549 memset(&rq, 0, sizeof(rq));
550 rq.stream_id = htobe64(session->streams[id].id);
551 /* Already in big endian. */
552 rq.offset = offset;
553 rq.len = htobe32(len);
554
555 ret_len = lttng_live_send(control_sock, &cmd, sizeof(cmd));
556 if (ret_len < 0) {
557 diag("Error sending cmd");
558 goto error;
559 }
560 ret_len = lttng_live_send(control_sock, &rq, sizeof(rq));
561 if (ret_len < 0) {
562 diag("Error sending get_data_packet request");
563 goto error;
564 }
565 ret_len = lttng_live_recv(control_sock, &rp, sizeof(rp));
566 if (ret_len == 0) {
567 diag("[error] Remote side has closed connection");
568 goto error;
569 }
570 if (ret_len < 0) {
571 diag("Error receiving data response");
572 goto error;
573 }
574 rp.flags = be32toh(rp.flags);
575
576 switch (be32toh(rp.status)) {
577 case LTTNG_VIEWER_GET_PACKET_OK:
578 len = be32toh(rp.len);
579 if (len == 0) {
580 diag("Got LTTNG_VIEWER_GET_PACKET_OK, but len == 0");
581 goto error;
582 }
583 break;
584 case LTTNG_VIEWER_GET_PACKET_RETRY:
585 diag("Got LTTNG_VIEWER_GET_PACKET_RETRY:");
586 goto error;
587 case LTTNG_VIEWER_GET_PACKET_ERR:
588 if (rp.flags & LTTNG_VIEWER_FLAG_NEW_METADATA) {
589 diag("Got LTTNG_VIEWER_GET_PACKET_ERR with NEW_METADATA flag");
590 goto end;
591 }
592 diag("Got LTTNG_VIEWER_GET_PACKET_ERR:");
593 goto error;
594 default:
595 diag("Got unknown status code during LTTNG_VIEWER_GET_PACKET");
596 goto error;
597 }
598
599 if (len > mmap_size) {
600 diag("mmap_size not big enough");
601 goto error;
602 }
603
604 ret_len = lttng_live_recv(control_sock, session->streams[id].mmap_base, len);
605 if (ret_len == 0) {
606 diag("[error] Remote side has closed connection");
607 goto error;
608 }
609 if (ret_len < 0) {
610 diag("Error receiving trace packet");
611 goto error;
612 }
613 end:
614 return 0;
615 error:
616 return -1;
617 }
618
619 static int detach_viewer_session(uint64_t id)
620 {
621 struct lttng_viewer_cmd cmd;
622 struct lttng_viewer_detach_session_response resp;
623 struct lttng_viewer_detach_session_request rq;
624 int ret;
625 ssize_t ret_len;
626
627 cmd.cmd = htobe32(LTTNG_VIEWER_DETACH_SESSION);
628 cmd.data_size = htobe64(sizeof(rq));
629 cmd.cmd_version = htobe32(0);
630
631 memset(&rq, 0, sizeof(rq));
632 rq.session_id = htobe64(id);
633
634 ret_len = lttng_live_send(control_sock, &cmd, sizeof(cmd));
635 if (ret_len < 0) {
636 fprintf(stderr, "[error] Error sending cmd\n");
637 ret = ret_len;
638 goto error;
639 }
640
641 ret_len = lttng_live_send(control_sock, &rq, sizeof(rq));
642 if (ret_len < 0) {
643 fprintf(stderr, "Error sending attach request\n");
644 ret = ret_len;
645 goto error;
646 }
647
648 ret_len = lttng_live_recv(control_sock, &resp, sizeof(resp));
649 if (ret_len < 0) {
650 fprintf(stderr, "[error] Error receiving detach session reply\n");
651 ret = ret_len;
652 goto error;
653 }
654
655 if (be32toh(resp.status) != LTTNG_VIEWER_DETACH_SESSION_OK) {
656 fprintf(stderr, "[error] Error detaching viewer session\n");
657 ret = -1;
658 goto error;
659 }
660 ret = 0;
661
662 error:
663 return ret;
664 }
665
666 int main(int argc, char **argv)
667 {
668 int ret;
669 uint64_t session_id;
670
671 plan_tests(NUM_TESTS);
672
673 diag("Live unit tests");
674
675 ret = connect_viewer("localhost");
676 ok(ret == 0, "Connect viewer to relayd");
677
678 ret = establish_connection();
679 ok(ret == 0, "Established connection and version check with %d.%d",
680 VERSION_MAJOR, VERSION_MINOR);
681
682 ret = list_sessions(&session_id);
683 ok(ret > 0, "List sessions : %d session(s)", ret);
684 if (ret < 0) {
685 goto end;
686 }
687
688 ret = create_viewer_session();
689 ok(ret == 0, "Create viewer session");
690
691 ret = attach_session(session_id);
692 ok(ret > 0, "Attach to session, %d stream(s) received", ret);
693
694 ret = get_metadata();
695 ok(ret > 0, "Get metadata, received %d bytes", ret);
696
697 ret = get_next_index();
698 ok(ret == 0, "Get one index per stream");
699
700 ret = get_data_packet(first_packet_stream_id, first_packet_offset,
701 first_packet_len);
702 ok(ret == 0,
703 "Get one data packet for stream %d, offset %d, len %d",
704 first_packet_stream_id, first_packet_offset,
705 first_packet_len);
706
707 ret = detach_viewer_session(session_id);
708 ok(ret == 0, "Detach viewer session");
709
710 ret = list_sessions(&session_id);
711 ok(ret > 0, "List sessions : %d session(s)", ret);
712
713 ret = attach_session(session_id);
714 ok(ret > 0, "Attach to session, %d streams received", ret);
715 end:
716 return exit_status();
717 }
This page took 0.045975 seconds and 5 git commands to generate.