Move to kernel style SPDX license identifiers
[lttng-tools.git] / src / lib / lttng-ctl / channel.c
1 /*
2 * Copyright (C) 2017 Jérémie Galarneau <jeremie.galarneau@efficios.com>
3 *
4 * SPDX-License-Identifier: LGPL-2.1-only
5 *
6 */
7
8 #include <lttng/notification/notification-internal.h>
9 #include <lttng/notification/channel-internal.h>
10 #include <lttng/condition/condition-internal.h>
11 #include <lttng/endpoint.h>
12 #include <common/defaults.h>
13 #include <common/error.h>
14 #include <common/dynamic-buffer.h>
15 #include <common/utils.h>
16 #include <common/defaults.h>
17 #include <assert.h>
18 #include "lttng-ctl-helper.h"
19 #include <common/compat/poll.h>
20
21 static
22 int handshake(struct lttng_notification_channel *channel);
23
24 /*
25 * Populates the reception buffer with the next complete message.
26 * The caller must acquire the channel's lock.
27 */
28 static
29 int receive_message(struct lttng_notification_channel *channel)
30 {
31 ssize_t ret;
32 struct lttng_notification_channel_message msg;
33
34 if (lttng_dynamic_buffer_set_size(&channel->reception_buffer, 0)) {
35 ret = -1;
36 goto end;
37 }
38
39 ret = lttcomm_recv_unix_sock(channel->socket, &msg, sizeof(msg));
40 if (ret <= 0) {
41 ret = -1;
42 goto error;
43 }
44
45 if (msg.size > DEFAULT_MAX_NOTIFICATION_CLIENT_MESSAGE_PAYLOAD_SIZE) {
46 ret = -1;
47 goto error;
48 }
49
50 /* Add message header at buffer's start. */
51 ret = lttng_dynamic_buffer_append(&channel->reception_buffer, &msg,
52 sizeof(msg));
53 if (ret) {
54 goto error;
55 }
56
57 /* Reserve space for the payload. */
58 ret = lttng_dynamic_buffer_set_size(&channel->reception_buffer,
59 channel->reception_buffer.size + msg.size);
60 if (ret) {
61 goto error;
62 }
63
64 /* Receive message payload. */
65 ret = lttcomm_recv_unix_sock(channel->socket,
66 channel->reception_buffer.data + sizeof(msg), msg.size);
67 if (ret < (ssize_t) msg.size) {
68 ret = -1;
69 goto error;
70 }
71 ret = 0;
72 end:
73 return ret;
74 error:
75 if (lttng_dynamic_buffer_set_size(&channel->reception_buffer, 0)) {
76 ret = -1;
77 }
78 goto end;
79 }
80
81 static
82 enum lttng_notification_channel_message_type get_current_message_type(
83 struct lttng_notification_channel *channel)
84 {
85 struct lttng_notification_channel_message *msg;
86
87 assert(channel->reception_buffer.size >= sizeof(*msg));
88
89 msg = (struct lttng_notification_channel_message *)
90 channel->reception_buffer.data;
91 return (enum lttng_notification_channel_message_type) msg->type;
92 }
93
94 static
95 struct lttng_notification *create_notification_from_current_message(
96 struct lttng_notification_channel *channel)
97 {
98 ssize_t ret;
99 struct lttng_notification *notification = NULL;
100 struct lttng_buffer_view view;
101
102 if (channel->reception_buffer.size <=
103 sizeof(struct lttng_notification_channel_message)) {
104 goto end;
105 }
106
107 view = lttng_buffer_view_from_dynamic_buffer(&channel->reception_buffer,
108 sizeof(struct lttng_notification_channel_message), -1);
109
110 ret = lttng_notification_create_from_buffer(&view, &notification);
111 if (ret != channel->reception_buffer.size -
112 sizeof(struct lttng_notification_channel_message)) {
113 lttng_notification_destroy(notification);
114 notification = NULL;
115 goto end;
116 }
117 end:
118 return notification;
119 }
120
121 struct lttng_notification_channel *lttng_notification_channel_create(
122 struct lttng_endpoint *endpoint)
123 {
124 int fd, ret;
125 bool is_in_tracing_group = false, is_root = false;
126 char *sock_path = NULL;
127 struct lttng_notification_channel *channel = NULL;
128
129 if (!endpoint ||
130 endpoint != lttng_session_daemon_notification_endpoint) {
131 goto end;
132 }
133
134 sock_path = zmalloc(LTTNG_PATH_MAX);
135 if (!sock_path) {
136 goto end;
137 }
138
139 channel = zmalloc(sizeof(struct lttng_notification_channel));
140 if (!channel) {
141 goto end;
142 }
143 channel->socket = -1;
144 pthread_mutex_init(&channel->lock, NULL);
145 lttng_dynamic_buffer_init(&channel->reception_buffer);
146 CDS_INIT_LIST_HEAD(&channel->pending_notifications.list);
147
148 is_root = (getuid() == 0);
149 if (!is_root) {
150 is_in_tracing_group = lttng_check_tracing_group();
151 }
152
153 if (is_root || is_in_tracing_group) {
154 lttng_ctl_copy_string(sock_path,
155 DEFAULT_GLOBAL_NOTIFICATION_CHANNEL_UNIX_SOCK,
156 LTTNG_PATH_MAX);
157 ret = lttcomm_connect_unix_sock(sock_path);
158 if (ret >= 0) {
159 fd = ret;
160 goto set_fd;
161 }
162 }
163
164 /* Fallback to local session daemon. */
165 ret = snprintf(sock_path, LTTNG_PATH_MAX,
166 DEFAULT_HOME_NOTIFICATION_CHANNEL_UNIX_SOCK,
167 utils_get_home_dir());
168 if (ret < 0 || ret >= LTTNG_PATH_MAX) {
169 goto error;
170 }
171
172 ret = lttcomm_connect_unix_sock(sock_path);
173 if (ret < 0) {
174 goto error;
175 }
176 fd = ret;
177
178 set_fd:
179 channel->socket = fd;
180
181 ret = handshake(channel);
182 if (ret) {
183 goto error;
184 }
185 end:
186 free(sock_path);
187 return channel;
188 error:
189 lttng_notification_channel_destroy(channel);
190 channel = NULL;
191 goto end;
192 }
193
194 enum lttng_notification_channel_status
195 lttng_notification_channel_get_next_notification(
196 struct lttng_notification_channel *channel,
197 struct lttng_notification **_notification)
198 {
199 int ret;
200 struct lttng_notification *notification = NULL;
201 enum lttng_notification_channel_status status =
202 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
203 struct lttng_poll_event events;
204
205 if (!channel || !_notification) {
206 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID;
207 goto end;
208 }
209
210 pthread_mutex_lock(&channel->lock);
211
212 if (channel->pending_notifications.count) {
213 struct pending_notification *pending_notification;
214
215 assert(!cds_list_empty(&channel->pending_notifications.list));
216
217 /* Deliver one of the pending notifications. */
218 pending_notification = cds_list_first_entry(
219 &channel->pending_notifications.list,
220 struct pending_notification,
221 node);
222 notification = pending_notification->notification;
223 if (!notification) {
224 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_NOTIFICATIONS_DROPPED;
225 }
226 cds_list_del(&pending_notification->node);
227 channel->pending_notifications.count--;
228 free(pending_notification);
229 goto end_unlock;
230 }
231
232 /*
233 * Block on interruptible epoll/poll() instead of the message reception
234 * itself as the recvmsg() wrappers always restart on EINTR. We choose
235 * to wait using interruptible epoll/poll() in order to:
236 * 1) Return if a signal occurs,
237 * 2) Not deal with partially received messages.
238 *
239 * The drawback to this approach is that we assume that messages
240 * are complete/well formed. If a message is shorter than its
241 * announced length, receive_message() will block on recvmsg()
242 * and never return (even if a signal is received).
243 */
244 ret = lttng_poll_create(&events, 1, LTTNG_CLOEXEC);
245 if (ret < 0) {
246 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
247 goto end_unlock;
248 }
249 ret = lttng_poll_add(&events, channel->socket, LPOLLIN | LPOLLERR);
250 if (ret < 0) {
251 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
252 goto end_clean_poll;
253 }
254 ret = lttng_poll_wait_interruptible(&events, -1);
255 if (ret <= 0) {
256 status = (ret == -1 && errno == EINTR) ?
257 LTTNG_NOTIFICATION_CHANNEL_STATUS_INTERRUPTED :
258 LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
259 goto end_clean_poll;
260 }
261
262 ret = receive_message(channel);
263 if (ret) {
264 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
265 goto end_clean_poll;
266 }
267
268 switch (get_current_message_type(channel)) {
269 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION:
270 notification = create_notification_from_current_message(
271 channel);
272 if (!notification) {
273 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
274 goto end_clean_poll;
275 }
276 break;
277 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED:
278 /* No payload to consume. */
279 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_NOTIFICATIONS_DROPPED;
280 break;
281 default:
282 /* Protocol error. */
283 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
284 goto end_clean_poll;
285 }
286
287 end_clean_poll:
288 lttng_poll_clean(&events);
289 end_unlock:
290 pthread_mutex_unlock(&channel->lock);
291 *_notification = notification;
292 end:
293 return status;
294 }
295
296 static
297 int enqueue_dropped_notification(
298 struct lttng_notification_channel *channel)
299 {
300 int ret = 0;
301 struct pending_notification *pending_notification;
302 struct cds_list_head *last_element =
303 channel->pending_notifications.list.prev;
304
305 pending_notification = caa_container_of(last_element,
306 struct pending_notification, node);
307 if (!pending_notification->notification) {
308 /*
309 * The last enqueued notification indicates dropped
310 * notifications; there is nothing to do as we group
311 * dropped notifications together.
312 */
313 goto end;
314 }
315
316 if (channel->pending_notifications.count >=
317 DEFAULT_CLIENT_MAX_QUEUED_NOTIFICATIONS_COUNT &&
318 pending_notification->notification) {
319 /*
320 * Discard the last enqueued notification to indicate
321 * that notifications were dropped at this point.
322 */
323 lttng_notification_destroy(
324 pending_notification->notification);
325 pending_notification->notification = NULL;
326 goto end;
327 }
328
329 pending_notification = zmalloc(sizeof(*pending_notification));
330 if (!pending_notification) {
331 ret = -1;
332 goto end;
333 }
334 CDS_INIT_LIST_HEAD(&pending_notification->node);
335 cds_list_add(&pending_notification->node,
336 &channel->pending_notifications.list);
337 channel->pending_notifications.count++;
338 end:
339 return ret;
340 }
341
342 static
343 int enqueue_notification_from_current_message(
344 struct lttng_notification_channel *channel)
345 {
346 int ret = 0;
347 struct lttng_notification *notification;
348 struct pending_notification *pending_notification;
349
350 if (channel->pending_notifications.count >=
351 DEFAULT_CLIENT_MAX_QUEUED_NOTIFICATIONS_COUNT) {
352 /* Drop the notification. */
353 ret = enqueue_dropped_notification(channel);
354 goto end;
355 }
356
357 pending_notification = zmalloc(sizeof(*pending_notification));
358 if (!pending_notification) {
359 ret = -1;
360 goto error;
361 }
362 CDS_INIT_LIST_HEAD(&pending_notification->node);
363
364 notification = create_notification_from_current_message(channel);
365 if (!notification) {
366 ret = -1;
367 goto error;
368 }
369
370 pending_notification->notification = notification;
371 cds_list_add(&pending_notification->node,
372 &channel->pending_notifications.list);
373 channel->pending_notifications.count++;
374 end:
375 return ret;
376 error:
377 free(pending_notification);
378 goto end;
379 }
380
381 enum lttng_notification_channel_status
382 lttng_notification_channel_has_pending_notification(
383 struct lttng_notification_channel *channel,
384 bool *_notification_pending)
385 {
386 int ret;
387 enum lttng_notification_channel_status status =
388 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
389 struct lttng_poll_event events;
390
391 if (!channel || !_notification_pending) {
392 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID;
393 goto end;
394 }
395
396 pthread_mutex_lock(&channel->lock);
397
398 if (channel->pending_notifications.count) {
399 *_notification_pending = true;
400 goto end_unlock;
401 }
402
403 if (channel->socket < 0) {
404 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_CLOSED;
405 goto end_unlock;
406 }
407
408 /*
409 * Check, without blocking, if data is available on the channel's
410 * socket. If there is data available, it is safe to read (blocking)
411 * on the socket for a message from the session daemon.
412 *
413 * Since all commands wait for the session daemon's reply before
414 * releasing the channel's lock, the protocol only allows for
415 * notifications and "notification dropped" messages to come
416 * through. If we receive a different message type, it is
417 * considered a protocol error.
418 *
419 * Note that this function is not guaranteed not to block. This
420 * will block until our peer (the session daemon) has sent a complete
421 * message if we see data available on the socket. If the peer does
422 * not respect the protocol, this may block indefinitely.
423 */
424 ret = lttng_poll_create(&events, 1, LTTNG_CLOEXEC);
425 if (ret < 0) {
426 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
427 goto end_unlock;
428 }
429 ret = lttng_poll_add(&events, channel->socket, LPOLLIN | LPOLLERR);
430 if (ret < 0) {
431 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
432 goto end_clean_poll;
433 }
434 /* timeout = 0: return immediately. */
435 ret = lttng_poll_wait_interruptible(&events, 0);
436 if (ret == 0) {
437 /* No data available. */
438 *_notification_pending = false;
439 goto end_clean_poll;
440 } else if (ret < 0) {
441 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
442 goto end_clean_poll;
443 }
444
445 /* Data available on socket. */
446 ret = receive_message(channel);
447 if (ret) {
448 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
449 goto end_clean_poll;
450 }
451
452 switch (get_current_message_type(channel)) {
453 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION:
454 ret = enqueue_notification_from_current_message(channel);
455 if (ret) {
456 goto end_clean_poll;
457 }
458 *_notification_pending = true;
459 break;
460 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED:
461 ret = enqueue_dropped_notification(channel);
462 if (ret) {
463 goto end_clean_poll;
464 }
465 *_notification_pending = true;
466 break;
467 default:
468 /* Protocol error. */
469 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
470 goto end_clean_poll;
471 }
472
473 end_clean_poll:
474 lttng_poll_clean(&events);
475 end_unlock:
476 pthread_mutex_unlock(&channel->lock);
477 end:
478 return status;
479 }
480
481 static
482 int receive_command_reply(struct lttng_notification_channel *channel,
483 enum lttng_notification_channel_status *status)
484 {
485 int ret;
486 struct lttng_notification_channel_command_reply *reply;
487
488 while (true) {
489 enum lttng_notification_channel_message_type msg_type;
490
491 ret = receive_message(channel);
492 if (ret) {
493 goto end;
494 }
495
496 msg_type = get_current_message_type(channel);
497 switch (msg_type) {
498 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_COMMAND_REPLY:
499 goto exit_loop;
500 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION:
501 ret = enqueue_notification_from_current_message(
502 channel);
503 if (ret) {
504 goto end;
505 }
506 break;
507 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED:
508 ret = enqueue_dropped_notification(channel);
509 if (ret) {
510 goto end;
511 }
512 break;
513 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE:
514 {
515 struct lttng_notification_channel_command_handshake *handshake;
516
517 handshake = (struct lttng_notification_channel_command_handshake *)
518 (channel->reception_buffer.data +
519 sizeof(struct lttng_notification_channel_message));
520 channel->version.major = handshake->major;
521 channel->version.minor = handshake->minor;
522 channel->version.set = true;
523 break;
524 }
525 default:
526 ret = -1;
527 goto end;
528 }
529 }
530
531 exit_loop:
532 if (channel->reception_buffer.size <
533 (sizeof(struct lttng_notification_channel_message) +
534 sizeof(*reply))) {
535 /* Invalid message received. */
536 ret = -1;
537 goto end;
538 }
539
540 reply = (struct lttng_notification_channel_command_reply *)
541 (channel->reception_buffer.data +
542 sizeof(struct lttng_notification_channel_message));
543 *status = (enum lttng_notification_channel_status) reply->status;
544 end:
545 return ret;
546 }
547
548 static
549 int handshake(struct lttng_notification_channel *channel)
550 {
551 ssize_t ret;
552 enum lttng_notification_channel_status status =
553 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
554 struct lttng_notification_channel_command_handshake handshake = {
555 .major = LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR,
556 .minor = LTTNG_NOTIFICATION_CHANNEL_VERSION_MINOR,
557 };
558 struct lttng_notification_channel_message msg_header = {
559 .type = LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE,
560 .size = sizeof(handshake),
561 };
562 char send_buffer[sizeof(msg_header) + sizeof(handshake)];
563
564 memcpy(send_buffer, &msg_header, sizeof(msg_header));
565 memcpy(send_buffer + sizeof(msg_header), &handshake, sizeof(handshake));
566
567 pthread_mutex_lock(&channel->lock);
568
569 ret = lttcomm_send_creds_unix_sock(channel->socket, send_buffer,
570 sizeof(send_buffer));
571 if (ret < 0) {
572 goto end_unlock;
573 }
574
575 /* Receive handshake info from the sessiond. */
576 ret = receive_command_reply(channel, &status);
577 if (ret < 0) {
578 goto end_unlock;
579 }
580
581 if (!channel->version.set) {
582 ret = -1;
583 goto end_unlock;
584 }
585
586 if (channel->version.major != LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR) {
587 ret = -1;
588 goto end_unlock;
589 }
590
591 end_unlock:
592 pthread_mutex_unlock(&channel->lock);
593 return ret;
594 }
595
596 static
597 enum lttng_notification_channel_status send_condition_command(
598 struct lttng_notification_channel *channel,
599 enum lttng_notification_channel_message_type type,
600 const struct lttng_condition *condition)
601 {
602 int socket;
603 ssize_t ret;
604 enum lttng_notification_channel_status status =
605 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
606 struct lttng_dynamic_buffer buffer;
607 struct lttng_notification_channel_message cmd_header = {
608 .type = (int8_t) type,
609 };
610
611 lttng_dynamic_buffer_init(&buffer);
612
613 if (!channel) {
614 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID;
615 goto end;
616 }
617
618 assert(type == LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE ||
619 type == LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE);
620
621 pthread_mutex_lock(&channel->lock);
622 socket = channel->socket;
623 if (!lttng_condition_validate(condition)) {
624 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID;
625 goto end_unlock;
626 }
627
628 ret = lttng_dynamic_buffer_append(&buffer, &cmd_header,
629 sizeof(cmd_header));
630 if (ret) {
631 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
632 goto end_unlock;
633 }
634
635 ret = lttng_condition_serialize(condition, &buffer);
636 if (ret) {
637 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID;
638 goto end_unlock;
639 }
640
641 /* Update payload length. */
642 ((struct lttng_notification_channel_message *) buffer.data)->size =
643 (uint32_t) (buffer.size - sizeof(cmd_header));
644
645 ret = lttcomm_send_unix_sock(socket, buffer.data, buffer.size);
646 if (ret < 0) {
647 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
648 goto end_unlock;
649 }
650
651 ret = receive_command_reply(channel, &status);
652 if (ret < 0) {
653 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
654 goto end_unlock;
655 }
656 end_unlock:
657 pthread_mutex_unlock(&channel->lock);
658 end:
659 lttng_dynamic_buffer_reset(&buffer);
660 return status;
661 }
662
663 enum lttng_notification_channel_status lttng_notification_channel_subscribe(
664 struct lttng_notification_channel *channel,
665 const struct lttng_condition *condition)
666 {
667 return send_condition_command(channel,
668 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE,
669 condition);
670 }
671
672 enum lttng_notification_channel_status lttng_notification_channel_unsubscribe(
673 struct lttng_notification_channel *channel,
674 const struct lttng_condition *condition)
675 {
676 return send_condition_command(channel,
677 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE,
678 condition);
679 }
680
681 void lttng_notification_channel_destroy(
682 struct lttng_notification_channel *channel)
683 {
684 if (!channel) {
685 return;
686 }
687
688 if (channel->socket >= 0) {
689 (void) lttcomm_close_unix_sock(channel->socket);
690 }
691 pthread_mutex_destroy(&channel->lock);
692 lttng_dynamic_buffer_reset(&channel->reception_buffer);
693 free(channel);
694 }
This page took 0.043783 seconds and 5 git commands to generate.