Use the dynamic buffer to serialize notification objects
[lttng-tools.git] / src / lib / lttng-ctl / channel.c
1 /*
2 * Copyright (C) 2017 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
3 *
4 * This library is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU Lesser General Public License, version 2.1 only,
6 * as published by the Free Software Foundation.
7 *
8 * This library is distributed in the hope that it will be useful, but WITHOUT
9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
11 * for more details.
12 *
13 * You should have received a copy of the GNU Lesser General Public License
14 * along with this library; if not, write to the Free Software Foundation,
15 * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16 */
17
18 #include <lttng/notification/notification-internal.h>
19 #include <lttng/notification/channel-internal.h>
20 #include <lttng/condition/condition-internal.h>
21 #include <lttng/endpoint.h>
22 #include <common/defaults.h>
23 #include <common/error.h>
24 #include <common/dynamic-buffer.h>
25 #include <common/utils.h>
26 #include <common/defaults.h>
27 #include <assert.h>
28 #include "lttng-ctl-helper.h"
29 #include <sys/select.h>
30 #include <sys/time.h>
31
/* Forward declaration: the handshake routine is defined further below. */
static int handshake(struct lttng_notification_channel *channel);
34
35 /*
36 * Populates the reception buffer with the next complete message.
37 * The caller must acquire the channel's lock.
38 */
39 static
40 int receive_message(struct lttng_notification_channel *channel)
41 {
42 ssize_t ret;
43 struct lttng_notification_channel_message msg;
44
45 if (lttng_dynamic_buffer_set_size(&channel->reception_buffer, 0)) {
46 ret = -1;
47 goto end;
48 }
49
50 ret = lttcomm_recv_unix_sock(channel->socket, &msg, sizeof(msg));
51 if (ret <= 0) {
52 ret = -1;
53 goto error;
54 }
55
56 if (msg.size > DEFAULT_MAX_NOTIFICATION_CLIENT_MESSAGE_PAYLOAD_SIZE) {
57 ret = -1;
58 goto error;
59 }
60
61 /* Add message header at buffer's start. */
62 ret = lttng_dynamic_buffer_append(&channel->reception_buffer, &msg,
63 sizeof(msg));
64 if (ret) {
65 goto error;
66 }
67
68 /* Reserve space for the payload. */
69 ret = lttng_dynamic_buffer_set_size(&channel->reception_buffer,
70 channel->reception_buffer.size + msg.size);
71 if (ret) {
72 goto error;
73 }
74
75 /* Receive message payload. */
76 ret = lttcomm_recv_unix_sock(channel->socket,
77 channel->reception_buffer.data + sizeof(msg), msg.size);
78 if (ret < (ssize_t) msg.size) {
79 ret = -1;
80 goto error;
81 }
82 ret = 0;
83 end:
84 return ret;
85 error:
86 if (lttng_dynamic_buffer_set_size(&channel->reception_buffer, 0)) {
87 ret = -1;
88 }
89 goto end;
90 }
91
92 static
93 enum lttng_notification_channel_message_type get_current_message_type(
94 struct lttng_notification_channel *channel)
95 {
96 struct lttng_notification_channel_message *msg;
97
98 assert(channel->reception_buffer.size >= sizeof(*msg));
99
100 msg = (struct lttng_notification_channel_message *)
101 channel->reception_buffer.data;
102 return (enum lttng_notification_channel_message_type) msg->type;
103 }
104
105 static
106 struct lttng_notification *create_notification_from_current_message(
107 struct lttng_notification_channel *channel)
108 {
109 ssize_t ret;
110 struct lttng_notification *notification = NULL;
111 struct lttng_buffer_view view;
112
113 if (channel->reception_buffer.size <=
114 sizeof(struct lttng_notification_channel_message)) {
115 goto end;
116 }
117
118 view = lttng_buffer_view_from_dynamic_buffer(&channel->reception_buffer,
119 sizeof(struct lttng_notification_channel_message), -1);
120
121 ret = lttng_notification_create_from_buffer(&view, &notification);
122 if (ret != channel->reception_buffer.size -
123 sizeof(struct lttng_notification_channel_message)) {
124 lttng_notification_destroy(notification);
125 notification = NULL;
126 goto end;
127 }
128 end:
129 return notification;
130 }
131
132 struct lttng_notification_channel *lttng_notification_channel_create(
133 struct lttng_endpoint *endpoint)
134 {
135 int fd, ret;
136 bool is_in_tracing_group = false, is_root = false;
137 char *sock_path = NULL;
138 struct lttng_notification_channel *channel = NULL;
139
140 if (!endpoint ||
141 endpoint != lttng_session_daemon_notification_endpoint) {
142 goto end;
143 }
144
145 sock_path = zmalloc(LTTNG_PATH_MAX);
146 if (!sock_path) {
147 goto end;
148 }
149
150 channel = zmalloc(sizeof(struct lttng_notification_channel));
151 if (!channel) {
152 goto end;
153 }
154 channel->socket = -1;
155 pthread_mutex_init(&channel->lock, NULL);
156 lttng_dynamic_buffer_init(&channel->reception_buffer);
157 CDS_INIT_LIST_HEAD(&channel->pending_notifications.list);
158
159 is_root = (getuid() == 0);
160 if (!is_root) {
161 is_in_tracing_group = lttng_check_tracing_group();
162 }
163
164 if (is_root || is_in_tracing_group) {
165 lttng_ctl_copy_string(sock_path,
166 DEFAULT_GLOBAL_NOTIFICATION_CHANNEL_UNIX_SOCK,
167 LTTNG_PATH_MAX);
168 ret = lttcomm_connect_unix_sock(sock_path);
169 if (ret >= 0) {
170 fd = ret;
171 goto set_fd;
172 }
173 }
174
175 /* Fallback to local session daemon. */
176 ret = snprintf(sock_path, LTTNG_PATH_MAX,
177 DEFAULT_HOME_NOTIFICATION_CHANNEL_UNIX_SOCK,
178 utils_get_home_dir());
179 if (ret < 0 || ret >= LTTNG_PATH_MAX) {
180 goto error;
181 }
182
183 ret = lttcomm_connect_unix_sock(sock_path);
184 if (ret < 0) {
185 goto error;
186 }
187 fd = ret;
188
189 set_fd:
190 channel->socket = fd;
191
192 ret = handshake(channel);
193 if (ret) {
194 goto error;
195 }
196 end:
197 free(sock_path);
198 return channel;
199 error:
200 lttng_notification_channel_destroy(channel);
201 channel = NULL;
202 goto end;
203 }
204
205 enum lttng_notification_channel_status
206 lttng_notification_channel_get_next_notification(
207 struct lttng_notification_channel *channel,
208 struct lttng_notification **_notification)
209 {
210 int ret;
211 struct lttng_notification *notification = NULL;
212 enum lttng_notification_channel_status status =
213 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
214
215 if (!channel || !_notification) {
216 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID;
217 goto end;
218 }
219
220 pthread_mutex_lock(&channel->lock);
221
222 if (channel->pending_notifications.count) {
223 struct pending_notification *pending_notification;
224
225 assert(!cds_list_empty(&channel->pending_notifications.list));
226
227 /* Deliver one of the pending notifications. */
228 pending_notification = cds_list_first_entry(
229 &channel->pending_notifications.list,
230 struct pending_notification,
231 node);
232 notification = pending_notification->notification;
233 if (!notification) {
234 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_NOTIFICATIONS_DROPPED;
235 }
236 cds_list_del(&pending_notification->node);
237 channel->pending_notifications.count--;
238 free(pending_notification);
239 goto end_unlock;
240 }
241
242 ret = receive_message(channel);
243 if (ret) {
244 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
245 goto end_unlock;
246 }
247
248 switch (get_current_message_type(channel)) {
249 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION:
250 notification = create_notification_from_current_message(
251 channel);
252 if (!notification) {
253 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
254 goto end_unlock;
255 }
256 break;
257 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED:
258 /* No payload to consume. */
259 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_NOTIFICATIONS_DROPPED;
260 break;
261 default:
262 /* Protocol error. */
263 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
264 goto end_unlock;
265 }
266
267 end_unlock:
268 pthread_mutex_unlock(&channel->lock);
269 end:
270 if (_notification) {
271 *_notification = notification;
272 }
273 return status;
274 }
275
276 static
277 int enqueue_dropped_notification(
278 struct lttng_notification_channel *channel)
279 {
280 int ret = 0;
281 struct pending_notification *pending_notification;
282 struct cds_list_head *last_element =
283 channel->pending_notifications.list.prev;
284
285 pending_notification = caa_container_of(last_element,
286 struct pending_notification, node);
287 if (!pending_notification->notification) {
288 /*
289 * The last enqueued notification indicates dropped
290 * notifications; there is nothing to do as we group
291 * dropped notifications together.
292 */
293 goto end;
294 }
295
296 if (channel->pending_notifications.count >=
297 DEFAULT_CLIENT_MAX_QUEUED_NOTIFICATIONS_COUNT &&
298 pending_notification->notification) {
299 /*
300 * Discard the last enqueued notification to indicate
301 * that notifications were dropped at this point.
302 */
303 lttng_notification_destroy(
304 pending_notification->notification);
305 pending_notification->notification = NULL;
306 goto end;
307 }
308
309 pending_notification = zmalloc(sizeof(*pending_notification));
310 if (!pending_notification) {
311 ret = -1;
312 goto end;
313 }
314 CDS_INIT_LIST_HEAD(&pending_notification->node);
315 cds_list_add(&pending_notification->node,
316 &channel->pending_notifications.list);
317 channel->pending_notifications.count++;
318 end:
319 return ret;
320 }
321
322 static
323 int enqueue_notification_from_current_message(
324 struct lttng_notification_channel *channel)
325 {
326 int ret = 0;
327 struct lttng_notification *notification;
328 struct pending_notification *pending_notification;
329
330 if (channel->pending_notifications.count >=
331 DEFAULT_CLIENT_MAX_QUEUED_NOTIFICATIONS_COUNT) {
332 /* Drop the notification. */
333 ret = enqueue_dropped_notification(channel);
334 goto end;
335 }
336
337 pending_notification = zmalloc(sizeof(*pending_notification));
338 if (!pending_notification) {
339 ret = -1;
340 goto error;
341 }
342 CDS_INIT_LIST_HEAD(&pending_notification->node);
343
344 notification = create_notification_from_current_message(channel);
345 if (!notification) {
346 ret = -1;
347 goto error;
348 }
349
350 pending_notification->notification = notification;
351 cds_list_add(&pending_notification->node,
352 &channel->pending_notifications.list);
353 channel->pending_notifications.count++;
354 end:
355 return ret;
356 error:
357 free(pending_notification);
358 goto end;
359 }
360
361 enum lttng_notification_channel_status
362 lttng_notification_channel_has_pending_notification(
363 struct lttng_notification_channel *channel,
364 bool *_notification_pending)
365 {
366 int ret;
367 enum lttng_notification_channel_status status =
368 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
369 fd_set read_fds;
370 struct timeval timeout;
371
372 FD_ZERO(&read_fds);
373 memset(&timeout, 0, sizeof(timeout));
374
375 if (!channel || !_notification_pending) {
376 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID;
377 goto end;
378 }
379
380 pthread_mutex_lock(&channel->lock);
381
382 if (channel->pending_notifications.count) {
383 *_notification_pending = true;
384 goto end_unlock;
385 }
386
387 if (channel->socket < 0) {
388 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_CLOSED;
389 goto end_unlock;
390 }
391
392 /*
393 * Check, without blocking, if data is available on the channel's
394 * socket. If there is data available, it is safe to read (blocking)
395 * on the socket for a message from the session daemon.
396 *
397 * Since all commands wait for the session daemon's reply before
398 * releasing the channel's lock, the protocol only allows for
399 * notifications and "notification dropped" messages to come
400 * through. If we receive a different message type, it is
401 * considered a protocol error.
402 *
403 * Note that this function is not guaranteed not to block. This
404 * will block until our peer (the session daemon) has sent a complete
405 * message if we see data available on the socket. If the peer does
406 * not respect the protocol, this may block indefinitely.
407 */
408 FD_SET(channel->socket, &read_fds);
409 do {
410 ret = select(channel->socket + 1, &read_fds, NULL, NULL, &timeout);
411 } while (ret < 0 && errno == EINTR);
412
413 if (ret == 0) {
414 /* No data available. */
415 *_notification_pending = false;
416 goto end_unlock;
417 } else if (ret < 0) {
418 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
419 goto end_unlock;
420 }
421
422 /* Data available on socket. */
423 ret = receive_message(channel);
424 if (ret) {
425 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
426 goto end_unlock;
427 }
428
429 switch (get_current_message_type(channel)) {
430 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION:
431 ret = enqueue_notification_from_current_message(channel);
432 if (ret) {
433 goto end;
434 }
435 *_notification_pending = true;
436 break;
437 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED:
438 ret = enqueue_dropped_notification(channel);
439 if (ret) {
440 goto end;
441 }
442 *_notification_pending = true;
443 break;
444 default:
445 /* Protocol error. */
446 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
447 goto end_unlock;
448 }
449
450 end_unlock:
451 pthread_mutex_unlock(&channel->lock);
452 end:
453 return status;
454 }
455
456 static
457 int receive_command_reply(struct lttng_notification_channel *channel,
458 enum lttng_notification_channel_status *status)
459 {
460 int ret;
461 struct lttng_notification_channel_command_reply *reply;
462
463 while (true) {
464 enum lttng_notification_channel_message_type msg_type;
465
466 ret = receive_message(channel);
467 if (ret) {
468 goto end;
469 }
470
471 msg_type = get_current_message_type(channel);
472 switch (msg_type) {
473 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_COMMAND_REPLY:
474 goto exit_loop;
475 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION:
476 ret = enqueue_notification_from_current_message(
477 channel);
478 if (ret) {
479 goto end;
480 }
481 break;
482 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED:
483 ret = enqueue_dropped_notification(channel);
484 if (ret) {
485 goto end;
486 }
487 break;
488 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE:
489 {
490 struct lttng_notification_channel_command_handshake *handshake;
491
492 handshake = (struct lttng_notification_channel_command_handshake *)
493 (channel->reception_buffer.data +
494 sizeof(struct lttng_notification_channel_message));
495 channel->version.major = handshake->major;
496 channel->version.minor = handshake->minor;
497 channel->version.set = true;
498 break;
499 }
500 default:
501 ret = -1;
502 goto end;
503 }
504 }
505
506 exit_loop:
507 if (channel->reception_buffer.size <
508 (sizeof(struct lttng_notification_channel_message) +
509 sizeof(*reply))) {
510 /* Invalid message received. */
511 ret = -1;
512 goto end;
513 }
514
515 reply = (struct lttng_notification_channel_command_reply *)
516 (channel->reception_buffer.data +
517 sizeof(struct lttng_notification_channel_message));
518 *status = (enum lttng_notification_channel_status) reply->status;
519 end:
520 return ret;
521 }
522
523 static
524 int handshake(struct lttng_notification_channel *channel)
525 {
526 ssize_t ret;
527 enum lttng_notification_channel_status status =
528 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
529 struct lttng_notification_channel_command_handshake handshake = {
530 .major = LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR,
531 .minor = LTTNG_NOTIFICATION_CHANNEL_VERSION_MINOR,
532 };
533 struct lttng_notification_channel_message msg_header = {
534 .type = LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE,
535 .size = sizeof(handshake),
536 };
537 char send_buffer[sizeof(msg_header) + sizeof(handshake)];
538
539 memcpy(send_buffer, &msg_header, sizeof(msg_header));
540 memcpy(send_buffer + sizeof(msg_header), &handshake, sizeof(handshake));
541
542 pthread_mutex_lock(&channel->lock);
543
544 ret = lttcomm_send_creds_unix_sock(channel->socket, send_buffer,
545 sizeof(send_buffer));
546 if (ret < 0) {
547 goto end_unlock;
548 }
549
550 /* Receive handshake info from the sessiond. */
551 ret = receive_command_reply(channel, &status);
552 if (ret < 0) {
553 goto end_unlock;
554 }
555
556 if (!channel->version.set) {
557 ret = -1;
558 goto end_unlock;
559 }
560
561 if (channel->version.major != LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR) {
562 ret = -1;
563 goto end_unlock;
564 }
565
566 end_unlock:
567 pthread_mutex_unlock(&channel->lock);
568 return ret;
569 }
570
571 static
572 enum lttng_notification_channel_status send_condition_command(
573 struct lttng_notification_channel *channel,
574 enum lttng_notification_channel_message_type type,
575 const struct lttng_condition *condition)
576 {
577 int socket;
578 ssize_t ret;
579 enum lttng_notification_channel_status status =
580 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
581 struct lttng_dynamic_buffer buffer;
582 struct lttng_notification_channel_message cmd_header = {
583 .type = (int8_t) type,
584 };
585
586 lttng_dynamic_buffer_init(&buffer);
587
588 if (!channel) {
589 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID;
590 goto end;
591 }
592
593 assert(type == LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE ||
594 type == LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE);
595
596 pthread_mutex_lock(&channel->lock);
597 socket = channel->socket;
598 if (!lttng_condition_validate(condition)) {
599 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID;
600 goto end_unlock;
601 }
602
603 ret = lttng_dynamic_buffer_append(&buffer, &cmd_header,
604 sizeof(cmd_header));
605 if (ret) {
606 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
607 goto end_unlock;
608 }
609
610 ret = lttng_condition_serialize(condition, &buffer);
611 if (ret) {
612 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID;
613 goto end_unlock;
614 }
615
616 /* Update payload length. */
617 ((struct lttng_notification_channel_message *) buffer.data)->size =
618 (uint32_t) (buffer.size - sizeof(cmd_header));
619
620 ret = lttcomm_send_unix_sock(socket, buffer.data, buffer.size);
621 if (ret < 0) {
622 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
623 goto end_unlock;
624 }
625
626 ret = receive_command_reply(channel, &status);
627 if (ret < 0) {
628 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
629 goto end_unlock;
630 }
631 end_unlock:
632 pthread_mutex_unlock(&channel->lock);
633 end:
634 lttng_dynamic_buffer_reset(&buffer);
635 return status;
636 }
637
638 enum lttng_notification_channel_status lttng_notification_channel_subscribe(
639 struct lttng_notification_channel *channel,
640 const struct lttng_condition *condition)
641 {
642 return send_condition_command(channel,
643 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE,
644 condition);
645 }
646
647 enum lttng_notification_channel_status lttng_notification_channel_unsubscribe(
648 struct lttng_notification_channel *channel,
649 const struct lttng_condition *condition)
650 {
651 return send_condition_command(channel,
652 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE,
653 condition);
654 }
655
656 void lttng_notification_channel_destroy(
657 struct lttng_notification_channel *channel)
658 {
659 if (!channel) {
660 return;
661 }
662
663 if (channel->socket >= 0) {
664 (void) lttcomm_close_unix_sock(channel->socket);
665 }
666 pthread_mutex_destroy(&channel->lock);
667 lttng_dynamic_buffer_reset(&channel->reception_buffer);
668 free(channel);
669 }
This page took 0.069828 seconds and 5 git commands to generate.