5487d051b5013fd60b080491ce865966bc6ffeab
[lttng-tools.git] / src / lib / lttng-ctl / channel.c
1 /*
2 * Copyright (C) 2017 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
3 *
4 * This library is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU Lesser General Public License, version 2.1 only,
6 * as published by the Free Software Foundation.
7 *
8 * This library is distributed in the hope that it will be useful, but WITHOUT
9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
11 * for more details.
12 *
13 * You should have received a copy of the GNU Lesser General Public License
14 * along with this library; if not, write to the Free Software Foundation,
15 * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16 */
17
18 #include <lttng/notification/notification-internal.h>
19 #include <lttng/notification/channel-internal.h>
20 #include <lttng/condition/condition-internal.h>
21 #include <lttng/endpoint.h>
22 #include <common/defaults.h>
23 #include <common/error.h>
24 #include <common/dynamic-buffer.h>
25 #include <common/utils.h>
26 #include <common/defaults.h>
27 #include <assert.h>
28 #include "lttng-ctl-helper.h"
29 #include <sys/select.h>
30 #include <sys/time.h>
31
32 static
33 int handshake(struct lttng_notification_channel *channel);
34
35 /*
36 * Populates the reception buffer with the next complete message.
37 * The caller must acquire the channel's lock.
38 */
39 static
40 int receive_message(struct lttng_notification_channel *channel)
41 {
42 ssize_t ret;
43 struct lttng_notification_channel_message msg;
44
45 if (lttng_dynamic_buffer_set_size(&channel->reception_buffer, 0)) {
46 ret = -1;
47 goto end;
48 }
49
50 ret = lttcomm_recv_unix_sock(channel->socket, &msg, sizeof(msg));
51 if (ret <= 0) {
52 ret = -1;
53 goto error;
54 }
55
56 if (msg.size > DEFAULT_MAX_NOTIFICATION_CLIENT_MESSAGE_PAYLOAD_SIZE) {
57 ret = -1;
58 goto error;
59 }
60
61 /* Add message header at buffer's start. */
62 ret = lttng_dynamic_buffer_append(&channel->reception_buffer, &msg,
63 sizeof(msg));
64 if (ret) {
65 goto error;
66 }
67
68 /* Reserve space for the payload. */
69 ret = lttng_dynamic_buffer_set_size(&channel->reception_buffer,
70 channel->reception_buffer.size + msg.size);
71 if (ret) {
72 goto error;
73 }
74
75 /* Receive message payload. */
76 ret = lttcomm_recv_unix_sock(channel->socket,
77 channel->reception_buffer.data + sizeof(msg), msg.size);
78 if (ret < (ssize_t) msg.size) {
79 ret = -1;
80 goto error;
81 }
82 ret = 0;
83 end:
84 return ret;
85 error:
86 if (lttng_dynamic_buffer_set_size(&channel->reception_buffer, 0)) {
87 ret = -1;
88 }
89 goto end;
90 }
91
92 static
93 enum lttng_notification_channel_message_type get_current_message_type(
94 struct lttng_notification_channel *channel)
95 {
96 struct lttng_notification_channel_message *msg;
97
98 assert(channel->reception_buffer.size >= sizeof(*msg));
99
100 msg = (struct lttng_notification_channel_message *)
101 channel->reception_buffer.data;
102 return (enum lttng_notification_channel_message_type) msg->type;
103 }
104
105 static
106 struct lttng_notification *create_notification_from_current_message(
107 struct lttng_notification_channel *channel)
108 {
109 ssize_t ret;
110 struct lttng_notification *notification = NULL;
111 struct lttng_buffer_view view;
112
113 if (channel->reception_buffer.size <=
114 sizeof(struct lttng_notification_channel_message)) {
115 goto end;
116 }
117
118 view = lttng_buffer_view_from_dynamic_buffer(&channel->reception_buffer,
119 sizeof(struct lttng_notification_channel_message), -1);
120
121 ret = lttng_notification_create_from_buffer(&view, &notification);
122 if (ret != channel->reception_buffer.size -
123 sizeof(struct lttng_notification_channel_message)) {
124 lttng_notification_destroy(notification);
125 notification = NULL;
126 goto end;
127 }
128 end:
129 return notification;
130 }
131
132 struct lttng_notification_channel *lttng_notification_channel_create(
133 struct lttng_endpoint *endpoint)
134 {
135 int fd, ret;
136 bool is_in_tracing_group = false, is_root = false;
137 char *sock_path = NULL;
138 struct lttng_notification_channel *channel = NULL;
139
140 if (!endpoint ||
141 endpoint != lttng_session_daemon_notification_endpoint) {
142 goto end;
143 }
144
145 sock_path = zmalloc(LTTNG_PATH_MAX);
146 if (!sock_path) {
147 goto end;
148 }
149
150 channel = zmalloc(sizeof(struct lttng_notification_channel));
151 if (!channel) {
152 goto end;
153 }
154 channel->socket = -1;
155 pthread_mutex_init(&channel->lock, NULL);
156 lttng_dynamic_buffer_init(&channel->reception_buffer);
157 CDS_INIT_LIST_HEAD(&channel->pending_notifications.list);
158
159 is_root = (getuid() == 0);
160 if (!is_root) {
161 is_in_tracing_group = lttng_check_tracing_group();
162 }
163
164 if (is_root || is_in_tracing_group) {
165 lttng_ctl_copy_string(sock_path,
166 DEFAULT_GLOBAL_NOTIFICATION_CHANNEL_UNIX_SOCK,
167 LTTNG_PATH_MAX);
168 ret = lttcomm_connect_unix_sock(sock_path);
169 if (ret >= 0) {
170 fd = ret;
171 goto set_fd;
172 }
173 }
174
175 /* Fallback to local session daemon. */
176 ret = snprintf(sock_path, LTTNG_PATH_MAX,
177 DEFAULT_HOME_NOTIFICATION_CHANNEL_UNIX_SOCK,
178 utils_get_home_dir());
179 if (ret < 0 || ret >= LTTNG_PATH_MAX) {
180 goto error;
181 }
182
183 ret = lttcomm_connect_unix_sock(sock_path);
184 if (ret < 0) {
185 goto error;
186 }
187 fd = ret;
188
189 set_fd:
190 channel->socket = fd;
191
192 ret = handshake(channel);
193 if (ret) {
194 goto error;
195 }
196 end:
197 free(sock_path);
198 return channel;
199 error:
200 lttng_notification_channel_destroy(channel);
201 channel = NULL;
202 goto end;
203 }
204
205 enum lttng_notification_channel_status
206 lttng_notification_channel_get_next_notification(
207 struct lttng_notification_channel *channel,
208 struct lttng_notification **_notification)
209 {
210 int ret;
211 struct lttng_notification *notification = NULL;
212 enum lttng_notification_channel_status status =
213 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
214 fd_set read_fds;
215
216 if (!channel || !_notification) {
217 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID;
218 goto end;
219 }
220
221 pthread_mutex_lock(&channel->lock);
222
223 if (channel->pending_notifications.count) {
224 struct pending_notification *pending_notification;
225
226 assert(!cds_list_empty(&channel->pending_notifications.list));
227
228 /* Deliver one of the pending notifications. */
229 pending_notification = cds_list_first_entry(
230 &channel->pending_notifications.list,
231 struct pending_notification,
232 node);
233 notification = pending_notification->notification;
234 if (!notification) {
235 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_NOTIFICATIONS_DROPPED;
236 }
237 cds_list_del(&pending_notification->node);
238 channel->pending_notifications.count--;
239 free(pending_notification);
240 goto end_unlock;
241 }
242
243 /*
244 * Block on select() instead of the message reception itself as the
245 * recvmsg() wrappers always restard on EINTR. We choose to wait
246 * using select() in order to:
247 * 1) Return if a signal occurs,
248 * 2) Not deal with partially received messages.
249 *
250 * The drawback to this approach is that we assume that messages
251 * are complete/well formed. If a message is shorter than its
252 * announced length, receive_message() will block on recvmsg()
253 * and never return (even if a signal is received).
254 */
255 FD_ZERO(&read_fds);
256 FD_SET(channel->socket, &read_fds);
257 ret = select(channel->socket + 1, &read_fds, NULL, NULL, NULL);
258 if (ret == -1) {
259 status = errno == EINTR ?
260 LTTNG_NOTIFICATION_CHANNEL_STATUS_INTERRUPTED :
261 LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
262 goto end_unlock;
263 }
264
265 ret = receive_message(channel);
266 if (ret) {
267 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
268 goto end_unlock;
269 }
270
271 switch (get_current_message_type(channel)) {
272 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION:
273 notification = create_notification_from_current_message(
274 channel);
275 if (!notification) {
276 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
277 goto end_unlock;
278 }
279 break;
280 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED:
281 /* No payload to consume. */
282 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_NOTIFICATIONS_DROPPED;
283 break;
284 default:
285 /* Protocol error. */
286 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
287 goto end_unlock;
288 }
289
290 end_unlock:
291 pthread_mutex_unlock(&channel->lock);
292 end:
293 if (_notification) {
294 *_notification = notification;
295 }
296 return status;
297 }
298
299 static
300 int enqueue_dropped_notification(
301 struct lttng_notification_channel *channel)
302 {
303 int ret = 0;
304 struct pending_notification *pending_notification;
305 struct cds_list_head *last_element =
306 channel->pending_notifications.list.prev;
307
308 pending_notification = caa_container_of(last_element,
309 struct pending_notification, node);
310 if (!pending_notification->notification) {
311 /*
312 * The last enqueued notification indicates dropped
313 * notifications; there is nothing to do as we group
314 * dropped notifications together.
315 */
316 goto end;
317 }
318
319 if (channel->pending_notifications.count >=
320 DEFAULT_CLIENT_MAX_QUEUED_NOTIFICATIONS_COUNT &&
321 pending_notification->notification) {
322 /*
323 * Discard the last enqueued notification to indicate
324 * that notifications were dropped at this point.
325 */
326 lttng_notification_destroy(
327 pending_notification->notification);
328 pending_notification->notification = NULL;
329 goto end;
330 }
331
332 pending_notification = zmalloc(sizeof(*pending_notification));
333 if (!pending_notification) {
334 ret = -1;
335 goto end;
336 }
337 CDS_INIT_LIST_HEAD(&pending_notification->node);
338 cds_list_add(&pending_notification->node,
339 &channel->pending_notifications.list);
340 channel->pending_notifications.count++;
341 end:
342 return ret;
343 }
344
345 static
346 int enqueue_notification_from_current_message(
347 struct lttng_notification_channel *channel)
348 {
349 int ret = 0;
350 struct lttng_notification *notification;
351 struct pending_notification *pending_notification;
352
353 if (channel->pending_notifications.count >=
354 DEFAULT_CLIENT_MAX_QUEUED_NOTIFICATIONS_COUNT) {
355 /* Drop the notification. */
356 ret = enqueue_dropped_notification(channel);
357 goto end;
358 }
359
360 pending_notification = zmalloc(sizeof(*pending_notification));
361 if (!pending_notification) {
362 ret = -1;
363 goto error;
364 }
365 CDS_INIT_LIST_HEAD(&pending_notification->node);
366
367 notification = create_notification_from_current_message(channel);
368 if (!notification) {
369 ret = -1;
370 goto error;
371 }
372
373 pending_notification->notification = notification;
374 cds_list_add(&pending_notification->node,
375 &channel->pending_notifications.list);
376 channel->pending_notifications.count++;
377 end:
378 return ret;
379 error:
380 free(pending_notification);
381 goto end;
382 }
383
384 enum lttng_notification_channel_status
385 lttng_notification_channel_has_pending_notification(
386 struct lttng_notification_channel *channel,
387 bool *_notification_pending)
388 {
389 int ret;
390 enum lttng_notification_channel_status status =
391 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
392 fd_set read_fds;
393 struct timeval timeout;
394
395 FD_ZERO(&read_fds);
396 memset(&timeout, 0, sizeof(timeout));
397
398 if (!channel || !_notification_pending) {
399 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID;
400 goto end;
401 }
402
403 pthread_mutex_lock(&channel->lock);
404
405 if (channel->pending_notifications.count) {
406 *_notification_pending = true;
407 goto end_unlock;
408 }
409
410 if (channel->socket < 0) {
411 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_CLOSED;
412 goto end_unlock;
413 }
414
415 /*
416 * Check, without blocking, if data is available on the channel's
417 * socket. If there is data available, it is safe to read (blocking)
418 * on the socket for a message from the session daemon.
419 *
420 * Since all commands wait for the session daemon's reply before
421 * releasing the channel's lock, the protocol only allows for
422 * notifications and "notification dropped" messages to come
423 * through. If we receive a different message type, it is
424 * considered a protocol error.
425 *
426 * Note that this function is not guaranteed not to block. This
427 * will block until our peer (the session daemon) has sent a complete
428 * message if we see data available on the socket. If the peer does
429 * not respect the protocol, this may block indefinitely.
430 */
431 FD_SET(channel->socket, &read_fds);
432 do {
433 ret = select(channel->socket + 1, &read_fds, NULL, NULL, &timeout);
434 } while (ret < 0 && errno == EINTR);
435
436 if (ret == 0) {
437 /* No data available. */
438 *_notification_pending = false;
439 goto end_unlock;
440 } else if (ret < 0) {
441 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
442 goto end_unlock;
443 }
444
445 /* Data available on socket. */
446 ret = receive_message(channel);
447 if (ret) {
448 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
449 goto end_unlock;
450 }
451
452 switch (get_current_message_type(channel)) {
453 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION:
454 ret = enqueue_notification_from_current_message(channel);
455 if (ret) {
456 goto end_unlock;
457 }
458 *_notification_pending = true;
459 break;
460 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED:
461 ret = enqueue_dropped_notification(channel);
462 if (ret) {
463 goto end_unlock;
464 }
465 *_notification_pending = true;
466 break;
467 default:
468 /* Protocol error. */
469 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
470 goto end_unlock;
471 }
472
473 end_unlock:
474 pthread_mutex_unlock(&channel->lock);
475 end:
476 return status;
477 }
478
479 static
480 int receive_command_reply(struct lttng_notification_channel *channel,
481 enum lttng_notification_channel_status *status)
482 {
483 int ret;
484 struct lttng_notification_channel_command_reply *reply;
485
486 while (true) {
487 enum lttng_notification_channel_message_type msg_type;
488
489 ret = receive_message(channel);
490 if (ret) {
491 goto end;
492 }
493
494 msg_type = get_current_message_type(channel);
495 switch (msg_type) {
496 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_COMMAND_REPLY:
497 goto exit_loop;
498 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION:
499 ret = enqueue_notification_from_current_message(
500 channel);
501 if (ret) {
502 goto end;
503 }
504 break;
505 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED:
506 ret = enqueue_dropped_notification(channel);
507 if (ret) {
508 goto end;
509 }
510 break;
511 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE:
512 {
513 struct lttng_notification_channel_command_handshake *handshake;
514
515 handshake = (struct lttng_notification_channel_command_handshake *)
516 (channel->reception_buffer.data +
517 sizeof(struct lttng_notification_channel_message));
518 channel->version.major = handshake->major;
519 channel->version.minor = handshake->minor;
520 channel->version.set = true;
521 break;
522 }
523 default:
524 ret = -1;
525 goto end;
526 }
527 }
528
529 exit_loop:
530 if (channel->reception_buffer.size <
531 (sizeof(struct lttng_notification_channel_message) +
532 sizeof(*reply))) {
533 /* Invalid message received. */
534 ret = -1;
535 goto end;
536 }
537
538 reply = (struct lttng_notification_channel_command_reply *)
539 (channel->reception_buffer.data +
540 sizeof(struct lttng_notification_channel_message));
541 *status = (enum lttng_notification_channel_status) reply->status;
542 end:
543 return ret;
544 }
545
546 static
547 int handshake(struct lttng_notification_channel *channel)
548 {
549 ssize_t ret;
550 enum lttng_notification_channel_status status =
551 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
552 struct lttng_notification_channel_command_handshake handshake = {
553 .major = LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR,
554 .minor = LTTNG_NOTIFICATION_CHANNEL_VERSION_MINOR,
555 };
556 struct lttng_notification_channel_message msg_header = {
557 .type = LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE,
558 .size = sizeof(handshake),
559 };
560 char send_buffer[sizeof(msg_header) + sizeof(handshake)];
561
562 memcpy(send_buffer, &msg_header, sizeof(msg_header));
563 memcpy(send_buffer + sizeof(msg_header), &handshake, sizeof(handshake));
564
565 pthread_mutex_lock(&channel->lock);
566
567 ret = lttcomm_send_creds_unix_sock(channel->socket, send_buffer,
568 sizeof(send_buffer));
569 if (ret < 0) {
570 goto end_unlock;
571 }
572
573 /* Receive handshake info from the sessiond. */
574 ret = receive_command_reply(channel, &status);
575 if (ret < 0) {
576 goto end_unlock;
577 }
578
579 if (!channel->version.set) {
580 ret = -1;
581 goto end_unlock;
582 }
583
584 if (channel->version.major != LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR) {
585 ret = -1;
586 goto end_unlock;
587 }
588
589 end_unlock:
590 pthread_mutex_unlock(&channel->lock);
591 return ret;
592 }
593
594 static
595 enum lttng_notification_channel_status send_condition_command(
596 struct lttng_notification_channel *channel,
597 enum lttng_notification_channel_message_type type,
598 const struct lttng_condition *condition)
599 {
600 int socket;
601 ssize_t ret;
602 enum lttng_notification_channel_status status =
603 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
604 struct lttng_dynamic_buffer buffer;
605 struct lttng_notification_channel_message cmd_header = {
606 .type = (int8_t) type,
607 };
608
609 lttng_dynamic_buffer_init(&buffer);
610
611 if (!channel) {
612 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID;
613 goto end;
614 }
615
616 assert(type == LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE ||
617 type == LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE);
618
619 pthread_mutex_lock(&channel->lock);
620 socket = channel->socket;
621 if (!lttng_condition_validate(condition)) {
622 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID;
623 goto end_unlock;
624 }
625
626 ret = lttng_dynamic_buffer_append(&buffer, &cmd_header,
627 sizeof(cmd_header));
628 if (ret) {
629 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
630 goto end_unlock;
631 }
632
633 ret = lttng_condition_serialize(condition, &buffer);
634 if (ret) {
635 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID;
636 goto end_unlock;
637 }
638
639 /* Update payload length. */
640 ((struct lttng_notification_channel_message *) buffer.data)->size =
641 (uint32_t) (buffer.size - sizeof(cmd_header));
642
643 ret = lttcomm_send_unix_sock(socket, buffer.data, buffer.size);
644 if (ret < 0) {
645 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
646 goto end_unlock;
647 }
648
649 ret = receive_command_reply(channel, &status);
650 if (ret < 0) {
651 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
652 goto end_unlock;
653 }
654 end_unlock:
655 pthread_mutex_unlock(&channel->lock);
656 end:
657 lttng_dynamic_buffer_reset(&buffer);
658 return status;
659 }
660
661 enum lttng_notification_channel_status lttng_notification_channel_subscribe(
662 struct lttng_notification_channel *channel,
663 const struct lttng_condition *condition)
664 {
665 return send_condition_command(channel,
666 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE,
667 condition);
668 }
669
670 enum lttng_notification_channel_status lttng_notification_channel_unsubscribe(
671 struct lttng_notification_channel *channel,
672 const struct lttng_condition *condition)
673 {
674 return send_condition_command(channel,
675 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE,
676 condition);
677 }
678
679 void lttng_notification_channel_destroy(
680 struct lttng_notification_channel *channel)
681 {
682 if (!channel) {
683 return;
684 }
685
686 if (channel->socket >= 0) {
687 (void) lttcomm_close_unix_sock(channel->socket);
688 }
689 pthread_mutex_destroy(&channel->lock);
690 lttng_dynamic_buffer_reset(&channel->reception_buffer);
691 free(channel);
692 }
This page took 0.043345 seconds and 4 git commands to generate.