2 * Copyright (C) 2017 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
4 * This library is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU Lesser General Public License, version 2.1 only,
6 * as published by the Free Software Foundation.
8 * This library is distributed in the hope that it will be useful, but WITHOUT
9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
13 * You should have received a copy of the GNU Lesser General Public License
14 * along with this library; if not, write to the Free Software Foundation,
15 * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
18 #include <lttng/notification/notification-internal.h>
19 #include <lttng/notification/channel-internal.h>
20 #include <lttng/condition/condition-internal.h>
21 #include <lttng/endpoint.h>
22 #include <common/defaults.h>
23 #include <common/error.h>
24 #include <common/dynamic-buffer.h>
25 #include <common/utils.h>
26 #include <common/defaults.h>
28 #include "lttng-ctl-helper.h"
29 #include <sys/select.h>
/*
 * Forward declaration: handshake() is defined further down in this file but
 * is already called from lttng_notification_channel_create().
 */
33 int handshake(struct lttng_notification_channel
*channel
);
36 * Populates the reception buffer with the next complete message.
37 * The caller must acquire the channel's lock.
/*
 * Reads one message from channel->socket into channel->reception_buffer.
 *
 * Visible sequence: the reception buffer is emptied, a fixed-size message
 * header is received, the announced payload size (msg.size) is compared
 * against DEFAULT_MAX_NOTIFICATION_CLIENT_MESSAGE_PAYLOAD_SIZE, the header
 * is appended to the buffer, the buffer is grown by msg.size bytes and the
 * payload is received directly behind the header. A payload read that
 * returns fewer than msg.size bytes is treated specially (the handling
 * itself is not visible in this excerpt).
 *
 * NOTE(review): the second reset of the buffer to size 0 near the end is
 * presumably the error path; confirm against the labels, which are not
 * visible here.
 */
40 int receive_message(struct lttng_notification_channel
*channel
)
43 struct lttng_notification_channel_message msg
;
/* Start from an empty buffer so the header always lands at offset 0. */
45 if (lttng_dynamic_buffer_set_size(&channel
->reception_buffer
, 0)) {
50 ret
= lttcomm_recv_unix_sock(channel
->socket
, &msg
, sizeof(msg
));
/* Bound the announced payload size before any space is reserved for it. */
56 if (msg
.size
> DEFAULT_MAX_NOTIFICATION_CLIENT_MESSAGE_PAYLOAD_SIZE
) {
61 /* Add message header at buffer's start. */
62 ret
= lttng_dynamic_buffer_append(&channel
->reception_buffer
, &msg
,
68 /* Reserve space for the payload. */
69 ret
= lttng_dynamic_buffer_set_size(&channel
->reception_buffer
,
70 channel
->reception_buffer
.size
+ msg
.size
);
75 /* Receive message payload. */
76 ret
= lttcomm_recv_unix_sock(channel
->socket
,
77 channel
->reception_buffer
.data
+ sizeof(msg
), msg
.size
);
78 if (ret
< (ssize_t
) msg
.size
) {
86 if (lttng_dynamic_buffer_set_size(&channel
->reception_buffer
, 0)) {
/*
 * Returns the type field of the message header stored at the start of
 * channel->reception_buffer.
 *
 * The buffer must hold at least one full header; this is enforced with
 * assert() only, so callers are expected to have completed a successful
 * receive beforehand.
 */
93 enum lttng_notification_channel_message_type
get_current_message_type(
94 struct lttng_notification_channel
*channel
)
96 struct lttng_notification_channel_message
*msg
;
98 assert(channel
->reception_buffer
.size
>= sizeof(*msg
));
/* The header is read in place from the start of the buffer. */
100 msg
= (struct lttng_notification_channel_message
*)
101 channel
->reception_buffer
.data
;
102 return (enum lttng_notification_channel_message_type
) msg
->type
;
/*
 * Builds a notification object from the payload of the message currently
 * held in channel->reception_buffer.
 *
 * Visible sequence: a buffer containing no more than the message header is
 * handled as a special case (no payload to decode), a view is taken over
 * the bytes that follow the header, and the notification is created from
 * that view. If the creation call does not report exactly the payload size
 * (buffer size minus header size), the notification is destroyed.
 *
 * NOTE(review): the -1 length passed when creating the view presumably
 * means "up to the end of the buffer"; confirm against the buffer view API.
 */
106 struct lttng_notification
*create_notification_from_current_message(
107 struct lttng_notification_channel
*channel
)
110 struct lttng_notification
*notification
= NULL
;
111 struct lttng_buffer_view view
;
113 if (channel
->reception_buffer
.size
<=
114 sizeof(struct lttng_notification_channel_message
)) {
118 view
= lttng_buffer_view_from_dynamic_buffer(&channel
->reception_buffer
,
119 sizeof(struct lttng_notification_channel_message
), -1);
121 ret
= lttng_notification_create_from_buffer(&view
, ¬ification
);
122 if (ret
!= channel
->reception_buffer
.size
-
123 sizeof(struct lttng_notification_channel_message
)) {
124 lttng_notification_destroy(notification
);
/*
 * Creates a notification channel connected to the session daemon.
 *
 * Visible sequence:
 *   - `endpoint` is compared against
 *     lttng_session_daemon_notification_endpoint;
 *   - a LTTNG_PATH_MAX scratch buffer for the socket path and the channel
 *     object are allocated with zmalloc();
 *   - the channel starts with socket = -1, an initialized lock, an
 *     initialized reception buffer and an empty pending-notification list;
 *   - when the caller is root or belongs to the tracing group, the global
 *     socket path is tried first;
 *   - the per-user socket path, built from utils_get_home_dir(), is the
 *     fallback, with a check that snprintf() neither failed nor truncated;
 *   - once connected, the descriptor is stored in the channel and the
 *     protocol handshake is performed.
 *
 * NOTE(review): the trailing lttng_notification_channel_destroy() call is
 * presumably the failure path; the labels and the return statement are not
 * visible in this excerpt.
 */
132 struct lttng_notification_channel
*lttng_notification_channel_create(
133 struct lttng_endpoint
*endpoint
)
136 bool is_in_tracing_group
= false, is_root
= false;
137 char *sock_path
= NULL
;
138 struct lttng_notification_channel
*channel
= NULL
;
141 endpoint
!= lttng_session_daemon_notification_endpoint
) {
145 sock_path
= zmalloc(LTTNG_PATH_MAX
);
150 channel
= zmalloc(sizeof(struct lttng_notification_channel
));
/* -1 marks "no socket yet"; the destroy function only closes sockets >= 0. */
154 channel
->socket
= -1;
155 pthread_mutex_init(&channel
->lock
, NULL
);
156 lttng_dynamic_buffer_init(&channel
->reception_buffer
);
157 CDS_INIT_LIST_HEAD(&channel
->pending_notifications
.list
);
159 is_root
= (getuid() == 0);
161 is_in_tracing_group
= lttng_check_tracing_group();
164 if (is_root
|| is_in_tracing_group
) {
165 lttng_ctl_copy_string(sock_path
,
166 DEFAULT_GLOBAL_NOTIFICATION_CHANNEL_UNIX_SOCK
,
168 ret
= lttcomm_connect_unix_sock(sock_path
);
175 /* Fallback to local session daemon. */
176 ret
= snprintf(sock_path
, LTTNG_PATH_MAX
,
177 DEFAULT_HOME_NOTIFICATION_CHANNEL_UNIX_SOCK
,
178 utils_get_home_dir());
179 if (ret
< 0 || ret
>= LTTNG_PATH_MAX
) {
183 ret
= lttcomm_connect_unix_sock(sock_path
);
190 channel
->socket
= fd
;
192 ret
= handshake(channel
);
200 lttng_notification_channel_destroy(channel
);
/*
 * Hands the next notification of `channel` to the caller through
 * `_notification`.
 *
 * Visible flow:
 *   - a NULL `channel` or `_notification` yields STATUS_INVALID;
 *   - the channel lock is held for the rest of the operation;
 *   - if notifications are already queued (pending_notifications.count is
 *     non-zero), the first list entry is unlinked, its notification is
 *     taken, the counter is decremented and the entry is freed;
 *   - otherwise the function waits in select() on the channel socket
 *     without a timeout, reads one message with receive_message() and
 *     dispatches on its type: NOTIFICATION builds a notification from the
 *     reception buffer, NOTIFICATION_DROPPED reports
 *     STATUS_NOTIFICATIONS_DROPPED, and the remaining branch is reported as
 *     a protocol error (STATUS_ERROR);
 *   - a select() failure maps to STATUS_INTERRUPTED when errno is EINTR and
 *     to STATUS_ERROR otherwise.
 *
 * NOTE(review): for a queued entry, STATUS_NOTIFICATIONS_DROPPED is
 * presumably set only when the entry carries no notification; the guarding
 * condition is not visible in this excerpt.
 */
205 enum lttng_notification_channel_status
206 lttng_notification_channel_get_next_notification(
207 struct lttng_notification_channel
*channel
,
208 struct lttng_notification
**_notification
)
211 struct lttng_notification
*notification
= NULL
;
212 enum lttng_notification_channel_status status
=
213 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK
;
216 if (!channel
|| !_notification
) {
217 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID
;
221 pthread_mutex_lock(&channel
->lock
);
223 if (channel
->pending_notifications
.count
) {
224 struct pending_notification
*pending_notification
;
226 assert(!cds_list_empty(&channel
->pending_notifications
.list
));
228 /* Deliver one of the pending notifications. */
229 pending_notification
= cds_list_first_entry(
230 &channel
->pending_notifications
.list
,
231 struct pending_notification
,
233 notification
= pending_notification
->notification
;
235 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_NOTIFICATIONS_DROPPED
;
237 cds_list_del(&pending_notification
->node
);
238 channel
->pending_notifications
.count
--;
239 free(pending_notification
);
244 * Block on select() instead of the message reception itself as the
245 * recvmsg() wrappers always restard on EINTR. We choose to wait
246 * using select() in order to:
247 * 1) Return if a signal occurs,
248 * 2) Not deal with partially received messages.
250 * The drawback to this approach is that we assume that messages
251 * are complete/well formed. If a message is shorter than its
252 * announced length, receive_message() will block on recvmsg()
253 * and never return (even if a signal is received).
256 FD_SET(channel
->socket
, &read_fds
);
257 ret
= select(channel
->socket
+ 1, &read_fds
, NULL
, NULL
, NULL
);
259 status
= errno
== EINTR
?
260 LTTNG_NOTIFICATION_CHANNEL_STATUS_INTERRUPTED
:
261 LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
265 ret
= receive_message(channel
);
267 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
271 switch (get_current_message_type(channel
)) {
272 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION
:
273 notification
= create_notification_from_current_message(
276 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
280 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED
:
281 /* No payload to consume. */
282 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_NOTIFICATIONS_DROPPED
;
285 /* Protocol error. */
286 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
291 pthread_mutex_unlock(&channel
->lock
);
292 *_notification
= notification
;
/*
 * Records in the pending-notification queue that notifications were
 * dropped. A queue entry whose `notification` pointer is NULL acts as the
 * "dropped" marker.
 *
 * Visible flow:
 *   - the entry reached through pending_notifications.list.prev is
 *     inspected; if it is already a marker nothing is added, so consecutive
 *     drops collapse into a single marker;
 *   - if the queue is at DEFAULT_CLIENT_MAX_QUEUED_NOTIFICATIONS_COUNT and
 *     that entry carries a notification, the notification is destroyed and
 *     the entry is turned into a marker in place;
 *   - otherwise a zero-initialized entry (a marker) is allocated, linked
 *     into the list and the counter is incremented.
 *
 * NOTE(review): when the list is empty, list.prev is the list head itself,
 * so caa_container_of() would not yield a real entry; confirm callers
 * cannot reach this function with an empty queue or that a guard exists in
 * the lines not visible here.
 * NOTE(review): cds_list_add() is understood to insert right after the
 * list head, whereas list.prev designates the tail; confirm which end is
 * meant by "last enqueued" and that delivery order matches.
 */
298 int enqueue_dropped_notification(
299 struct lttng_notification_channel
*channel
)
302 struct pending_notification
*pending_notification
;
303 struct cds_list_head
*last_element
=
304 channel
->pending_notifications
.list
.prev
;
306 pending_notification
= caa_container_of(last_element
,
307 struct pending_notification
, node
);
308 if (!pending_notification
->notification
) {
310 * The last enqueued notification indicates dropped
311 * notifications; there is nothing to do as we group
312 * dropped notifications together.
317 if (channel
->pending_notifications
.count
>=
318 DEFAULT_CLIENT_MAX_QUEUED_NOTIFICATIONS_COUNT
&&
319 pending_notification
->notification
) {
321 * Discard the last enqueued notification to indicate
322 * that notifications were dropped at this point.
324 lttng_notification_destroy(
325 pending_notification
->notification
);
326 pending_notification
->notification
= NULL
;
330 pending_notification
= zmalloc(sizeof(*pending_notification
));
331 if (!pending_notification
) {
335 CDS_INIT_LIST_HEAD(&pending_notification
->node
);
336 cds_list_add(&pending_notification
->node
,
337 &channel
->pending_notifications
.list
);
338 channel
->pending_notifications
.count
++;
/*
 * Queues the notification contained in the current reception buffer so it
 * can be delivered later.
 *
 * Visible flow: when the queue already holds
 * DEFAULT_CLIENT_MAX_QUEUED_NOTIFICATIONS_COUNT entries the notification is
 * not stored and enqueue_dropped_notification() is called instead.
 * Otherwise a queue entry is allocated, the notification is decoded with
 * create_notification_from_current_message(), attached to the entry, the
 * entry is linked into the list and the counter is incremented.
 *
 * NOTE(review): the trailing free(pending_notification) is presumably the
 * failure path taken when decoding fails; the labels are not visible here.
 */
344 int enqueue_notification_from_current_message(
345 struct lttng_notification_channel
*channel
)
348 struct lttng_notification
*notification
;
349 struct pending_notification
*pending_notification
;
351 if (channel
->pending_notifications
.count
>=
352 DEFAULT_CLIENT_MAX_QUEUED_NOTIFICATIONS_COUNT
) {
353 /* Drop the notification. */
354 ret
= enqueue_dropped_notification(channel
);
358 pending_notification
= zmalloc(sizeof(*pending_notification
));
359 if (!pending_notification
) {
363 CDS_INIT_LIST_HEAD(&pending_notification
->node
);
365 notification
= create_notification_from_current_message(channel
);
371 pending_notification
->notification
= notification
;
372 cds_list_add(&pending_notification
->node
,
373 &channel
->pending_notifications
.list
);
374 channel
->pending_notifications
.count
++;
378 free(pending_notification
);
/*
 * Reports through `_notification_pending` whether a notification is ready
 * to be retrieved from `channel`.
 *
 * Visible flow:
 *   - a NULL `channel` or `_notification_pending` yields STATUS_INVALID;
 *   - the channel lock is held for the rest of the operation;
 *   - a non-empty pending queue answers true immediately;
 *   - a negative socket descriptor yields STATUS_CLOSED;
 *   - the socket is polled with select() using a zeroed timeout, retrying
 *     while select() fails with EINTR;
 *   - no readable data answers false, a select() failure yields
 *     STATUS_ERROR;
 *   - readable data is consumed with receive_message() and queued according
 *     to its type (NOTIFICATION or NOTIFICATION_DROPPED, both answering
 *     true); the remaining branch is reported as a protocol error.
 */
382 enum lttng_notification_channel_status
383 lttng_notification_channel_has_pending_notification(
384 struct lttng_notification_channel
*channel
,
385 bool *_notification_pending
)
388 enum lttng_notification_channel_status status
=
389 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK
;
391 struct timeval timeout
;
394 memset(&timeout
, 0, sizeof(timeout
));
396 if (!channel
|| !_notification_pending
) {
397 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID
;
401 pthread_mutex_lock(&channel
->lock
);
403 if (channel
->pending_notifications
.count
) {
404 *_notification_pending
= true;
408 if (channel
->socket
< 0) {
409 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_CLOSED
;
414 * Check, without blocking, if data is available on the channel's
415 * socket. If there is data available, it is safe to read (blocking)
416 * on the socket for a message from the session daemon.
418 * Since all commands wait for the session daemon's reply before
419 * releasing the channel's lock, the protocol only allows for
420 * notifications and "notification dropped" messages to come
421 * through. If we receive a different message type, it is
422 * considered a protocol error.
424 * Note that this function is not guaranteed not to block. This
425 * will block until our peer (the session daemon) has sent a complete
426 * message if we see data available on the socket. If the peer does
427 * not respect the protocol, this may block indefinitely.
429 FD_SET(channel
->socket
, &read_fds
);
431 ret
= select(channel
->socket
+ 1, &read_fds
, NULL
, NULL
, &timeout
);
432 } while (ret
< 0 && errno
== EINTR
);
435 /* No data available. */
436 *_notification_pending
= false;
438 } else if (ret
< 0) {
439 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
443 /* Data available on socket. */
444 ret
= receive_message(channel
);
446 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
450 switch (get_current_message_type(channel
)) {
451 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION
:
452 ret
= enqueue_notification_from_current_message(channel
);
456 *_notification_pending
= true;
458 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED
:
459 ret
= enqueue_dropped_notification(channel
);
463 *_notification_pending
= true;
466 /* Protocol error. */
467 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
472 pthread_mutex_unlock(&channel
->lock
);
/*
 * Receives messages until the reply to a previously sent command has been
 * handled, and stores the status carried by that reply in `*status`.
 *
 * Visible dispatch on the received message type:
 *   - COMMAND_REPLY: falls out of the dispatch so the reply can be decoded;
 *   - NOTIFICATION: queued with
 *     enqueue_notification_from_current_message();
 *   - NOTIFICATION_DROPPED: recorded with enqueue_dropped_notification();
 *   - HANDSHAKE: the major/minor version found right after the message
 *     header is copied into channel->version and version.set becomes true.
 *
 * Before the reply is read, the reception buffer size is checked against
 * the header size plus a second term that is not visible in this excerpt;
 * a buffer that is too small is treated as an invalid message.
 *
 * NOTE(review): the loop construct and the handling of unknown types are
 * not visible here; the description of the overall iteration is inferred.
 */
478 int receive_command_reply(struct lttng_notification_channel
*channel
,
479 enum lttng_notification_channel_status
*status
)
482 struct lttng_notification_channel_command_reply
*reply
;
485 enum lttng_notification_channel_message_type msg_type
;
487 ret
= receive_message(channel
);
492 msg_type
= get_current_message_type(channel
);
494 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_COMMAND_REPLY
:
496 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION
:
497 ret
= enqueue_notification_from_current_message(
503 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED
:
504 ret
= enqueue_dropped_notification(channel
);
509 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE
:
511 struct lttng_notification_channel_command_handshake
*handshake
;
513 handshake
= (struct lttng_notification_channel_command_handshake
*)
514 (channel
->reception_buffer
.data
+
515 sizeof(struct lttng_notification_channel_message
));
516 channel
->version
.major
= handshake
->major
;
517 channel
->version
.minor
= handshake
->minor
;
518 channel
->version
.set
= true;
528 if (channel
->reception_buffer
.size
<
529 (sizeof(struct lttng_notification_channel_message
) +
531 /* Invalid message received. */
536 reply
= (struct lttng_notification_channel_command_reply
*)
537 (channel
->reception_buffer
.data
+
538 sizeof(struct lttng_notification_channel_message
));
539 *status
= (enum lttng_notification_channel_status
) reply
->status
;
/*
 * Performs the protocol handshake with the session daemon on a freshly
 * connected channel.
 *
 * Visible sequence: a HANDSHAKE message carrying this client's protocol
 * major/minor version is assembled in a stack buffer (header followed by
 * the handshake payload), sent with the credential-passing send helper
 * while the channel lock is held, and the reply is then awaited with
 * receive_command_reply(). Afterwards the function checks that the peer's
 * version was recorded (channel->version.set) and that its major number
 * equals LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR.
 *
 * NOTE(review): the return values and what happens on each failed check are
 * not visible in this excerpt.
 */
545 int handshake(struct lttng_notification_channel
*channel
)
548 enum lttng_notification_channel_status status
=
549 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK
;
550 struct lttng_notification_channel_command_handshake handshake
= {
551 .major
= LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR
,
552 .minor
= LTTNG_NOTIFICATION_CHANNEL_VERSION_MINOR
,
554 struct lttng_notification_channel_message msg_header
= {
555 .type
= LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE
,
556 .size
= sizeof(handshake
),
558 char send_buffer
[sizeof(msg_header
) + sizeof(handshake
)];
/* Header and payload are packed back to back so they go out in one send. */
560 memcpy(send_buffer
, &msg_header
, sizeof(msg_header
));
561 memcpy(send_buffer
+ sizeof(msg_header
), &handshake
, sizeof(handshake
));
563 pthread_mutex_lock(&channel
->lock
);
565 ret
= lttcomm_send_creds_unix_sock(channel
->socket
, send_buffer
,
566 sizeof(send_buffer
));
571 /* Receive handshake info from the sessiond. */
572 ret
= receive_command_reply(channel
, &status
);
577 if (!channel
->version
.set
) {
582 if (channel
->version
.major
!= LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR
) {
588 pthread_mutex_unlock(&channel
->lock
);
/*
 * Sends a condition-based command (`type` must be SUBSCRIBE or UNSUBSCRIBE,
 * enforced by assert()) to the session daemon and returns the resulting
 * status.
 *
 * Visible sequence while the channel lock is held:
 *   - the condition is validated; a failure yields STATUS_INVALID;
 *   - the command header is appended to a local dynamic buffer, followed by
 *     the serialized condition;
 *   - the header's size field is patched in place with the payload length
 *     (buffer size minus header size);
 *   - the buffer is sent on the channel socket and the reply is awaited with
 *     receive_command_reply(), which provides the final status.
 * Failures of the append, send and receive steps are reported as
 * STATUS_ERROR; a serialization failure is reported as STATUS_INVALID. The
 * local buffer is released before returning.
 *
 * NOTE(review): the first STATUS_INVALID assignment presumably guards
 * against a NULL argument; its condition is not visible in this excerpt.
 */
593 enum lttng_notification_channel_status
send_condition_command(
594 struct lttng_notification_channel
*channel
,
595 enum lttng_notification_channel_message_type type
,
596 const struct lttng_condition
*condition
)
600 enum lttng_notification_channel_status status
=
601 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK
;
602 struct lttng_dynamic_buffer buffer
;
603 struct lttng_notification_channel_message cmd_header
= {
604 .type
= (int8_t) type
,
607 lttng_dynamic_buffer_init(&buffer
);
610 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID
;
614 assert(type
== LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE
||
615 type
== LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE
);
617 pthread_mutex_lock(&channel
->lock
);
618 socket
= channel
->socket
;
619 if (!lttng_condition_validate(condition
)) {
620 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID
;
624 ret
= lttng_dynamic_buffer_append(&buffer
, &cmd_header
,
627 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
631 ret
= lttng_condition_serialize(condition
, &buffer
);
633 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID
;
637 /* Update payload length. */
638 ((struct lttng_notification_channel_message
*) buffer
.data
)->size
=
639 (uint32_t) (buffer
.size
- sizeof(cmd_header
));
641 ret
= lttcomm_send_unix_sock(socket
, buffer
.data
, buffer
.size
);
643 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
647 ret
= receive_command_reply(channel
, &status
);
649 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
653 pthread_mutex_unlock(&channel
->lock
);
655 lttng_dynamic_buffer_reset(&buffer
);
/*
 * Subscribes `channel` to notifications for `condition` by delegating to
 * send_condition_command() with the SUBSCRIBE message type; the status
 * returned by that helper is returned unchanged.
 */
659 enum lttng_notification_channel_status
lttng_notification_channel_subscribe(
660 struct lttng_notification_channel
*channel
,
661 const struct lttng_condition
*condition
)
663 return send_condition_command(channel
,
664 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE
,
/*
 * Cancels a subscription of `channel` for `condition` by delegating to
 * send_condition_command() with the UNSUBSCRIBE message type; the status
 * returned by that helper is returned unchanged.
 */
668 enum lttng_notification_channel_status
lttng_notification_channel_unsubscribe(
669 struct lttng_notification_channel
*channel
,
670 const struct lttng_condition
*condition
)
672 return send_condition_command(channel
,
673 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE
,
/*
 * Releases the resources held by `channel`.
 *
 * Visible steps: the socket is closed when a descriptor is present
 * (socket >= 0; the result of the close is deliberately ignored), the lock
 * is destroyed and the reception buffer is reset.
 *
 * NOTE(review): the handling of a NULL `channel`, of queued notifications
 * and of the channel object itself is not visible in this excerpt.
 */
677 void lttng_notification_channel_destroy(
678 struct lttng_notification_channel
*channel
)
684 if (channel
->socket
>= 0) {
685 (void) lttcomm_close_unix_sock(channel
->socket
);
687 pthread_mutex_destroy(&channel
->lock
);
688 lttng_dynamic_buffer_reset(&channel
->reception_buffer
);