sessiond: notification: use lttng_payload for communications
[lttng-tools.git] / src / lib / lttng-ctl / channel.c
CommitLineData
a58c490f 1/*
ab5be9fa 2 * Copyright (C) 2017 Jérémie Galarneau <jeremie.galarneau@efficios.com>
a58c490f 3 *
ab5be9fa 4 * SPDX-License-Identifier: LGPL-2.1-only
a58c490f 5 *
a58c490f
JG
6 */
7
8#include <lttng/notification/notification-internal.h>
9#include <lttng/notification/channel-internal.h>
10#include <lttng/condition/condition-internal.h>
11#include <lttng/endpoint.h>
12#include <common/defaults.h>
13#include <common/error.h>
14#include <common/dynamic-buffer.h>
15#include <common/utils.h>
16#include <common/defaults.h>
882093ee
JR
17#include <common/payload.h>
18#include <common/payload-view.h>
19#include <common/unix.h>
a58c490f
JG
20#include <assert.h>
21#include "lttng-ctl-helper.h"
d977a743 22#include <common/compat/poll.h>
a58c490f
JG
23
/*
 * Forward declaration: handshake() is defined further down in this file but
 * is invoked by lttng_notification_channel_create().
 */
static int handshake(struct lttng_notification_channel *channel);
26
27/*
28 * Populates the reception buffer with the next complete message.
1d757b1c 29 * The caller must acquire the channel's lock.
a58c490f
JG
30 */
31static
32int receive_message(struct lttng_notification_channel *channel)
33{
34 ssize_t ret;
35 struct lttng_notification_channel_message msg;
36
882093ee 37 lttng_payload_clear(&channel->reception_payload);
a58c490f
JG
38
39 ret = lttcomm_recv_unix_sock(channel->socket, &msg, sizeof(msg));
40 if (ret <= 0) {
41 ret = -1;
42 goto error;
43 }
44
45 if (msg.size > DEFAULT_MAX_NOTIFICATION_CLIENT_MESSAGE_PAYLOAD_SIZE) {
46 ret = -1;
47 goto error;
48 }
49
50 /* Add message header at buffer's start. */
882093ee 51 ret = lttng_dynamic_buffer_append(&channel->reception_payload.buffer, &msg,
a58c490f
JG
52 sizeof(msg));
53 if (ret) {
54 goto error;
55 }
56
57 /* Reserve space for the payload. */
882093ee
JR
58 ret = lttng_dynamic_buffer_set_size(&channel->reception_payload.buffer,
59 channel->reception_payload.buffer.size + msg.size);
a58c490f
JG
60 if (ret) {
61 goto error;
62 }
63
64 /* Receive message payload. */
65 ret = lttcomm_recv_unix_sock(channel->socket,
882093ee 66 channel->reception_payload.buffer.data + sizeof(msg), msg.size);
a58c490f
JG
67 if (ret < (ssize_t) msg.size) {
68 ret = -1;
69 goto error;
70 }
882093ee
JR
71
72 /* Receive message fds. */
73 if (msg.fds != 0) {
74 ret = lttcomm_recv_payload_fds_unix_sock(channel->socket,
75 msg.fds, &channel->reception_payload);
76 if (ret < sizeof(int) * msg.fds) {
77 ret = -1;
78 goto error;
79 }
80 }
a58c490f
JG
81 ret = 0;
82end:
83 return ret;
84error:
882093ee 85 lttng_payload_clear(&channel->reception_payload);
a58c490f
JG
86 goto end;
87}
88
89static
90enum lttng_notification_channel_message_type get_current_message_type(
91 struct lttng_notification_channel *channel)
92{
93 struct lttng_notification_channel_message *msg;
94
882093ee 95 assert(channel->reception_payload.buffer.size >= sizeof(*msg));
a58c490f
JG
96
97 msg = (struct lttng_notification_channel_message *)
882093ee 98 channel->reception_payload.buffer.data;
a58c490f
JG
99 return (enum lttng_notification_channel_message_type) msg->type;
100}
101
102static
103struct lttng_notification *create_notification_from_current_message(
104 struct lttng_notification_channel *channel)
105{
106 ssize_t ret;
107 struct lttng_notification *notification = NULL;
a58c490f 108
882093ee 109 if (channel->reception_payload.buffer.size <=
a58c490f
JG
110 sizeof(struct lttng_notification_channel_message)) {
111 goto end;
112 }
113
c0a66c84 114 {
882093ee
JR
115 struct lttng_payload_view view = lttng_payload_view_from_payload(
116 &channel->reception_payload,
c0a66c84
JG
117 sizeof(struct lttng_notification_channel_message),
118 -1);
119
120 ret = lttng_notification_create_from_payload(
121 &view, &notification);
122 }
a58c490f 123
882093ee 124 if (ret != channel->reception_payload.buffer.size -
a58c490f
JG
125 sizeof(struct lttng_notification_channel_message)) {
126 lttng_notification_destroy(notification);
127 notification = NULL;
128 goto end;
129 }
130end:
131 return notification;
132}
133
134struct lttng_notification_channel *lttng_notification_channel_create(
135 struct lttng_endpoint *endpoint)
136{
137 int fd, ret;
138 bool is_in_tracing_group = false, is_root = false;
139 char *sock_path = NULL;
140 struct lttng_notification_channel *channel = NULL;
141
142 if (!endpoint ||
143 endpoint != lttng_session_daemon_notification_endpoint) {
144 goto end;
145 }
146
147 sock_path = zmalloc(LTTNG_PATH_MAX);
148 if (!sock_path) {
149 goto end;
150 }
151
152 channel = zmalloc(sizeof(struct lttng_notification_channel));
153 if (!channel) {
154 goto end;
155 }
156 channel->socket = -1;
157 pthread_mutex_init(&channel->lock, NULL);
882093ee 158 lttng_payload_init(&channel->reception_payload);
a58c490f
JG
159 CDS_INIT_LIST_HEAD(&channel->pending_notifications.list);
160
161 is_root = (getuid() == 0);
162 if (!is_root) {
163 is_in_tracing_group = lttng_check_tracing_group();
164 }
165
166 if (is_root || is_in_tracing_group) {
167 lttng_ctl_copy_string(sock_path,
168 DEFAULT_GLOBAL_NOTIFICATION_CHANNEL_UNIX_SOCK,
169 LTTNG_PATH_MAX);
170 ret = lttcomm_connect_unix_sock(sock_path);
171 if (ret >= 0) {
172 fd = ret;
173 goto set_fd;
174 }
175 }
176
177 /* Fallback to local session daemon. */
178 ret = snprintf(sock_path, LTTNG_PATH_MAX,
179 DEFAULT_HOME_NOTIFICATION_CHANNEL_UNIX_SOCK,
180 utils_get_home_dir());
181 if (ret < 0 || ret >= LTTNG_PATH_MAX) {
182 goto error;
183 }
184
185 ret = lttcomm_connect_unix_sock(sock_path);
186 if (ret < 0) {
187 goto error;
188 }
189 fd = ret;
190
191set_fd:
192 channel->socket = fd;
193
194 ret = handshake(channel);
195 if (ret) {
196 goto error;
197 }
198end:
199 free(sock_path);
200 return channel;
201error:
202 lttng_notification_channel_destroy(channel);
203 channel = NULL;
204 goto end;
205}
206
207enum lttng_notification_channel_status
208lttng_notification_channel_get_next_notification(
209 struct lttng_notification_channel *channel,
210 struct lttng_notification **_notification)
211{
212 int ret;
213 struct lttng_notification *notification = NULL;
214 enum lttng_notification_channel_status status =
215 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
d977a743 216 struct lttng_poll_event events;
a58c490f
JG
217
218 if (!channel || !_notification) {
219 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID;
220 goto end;
221 }
222
94a61469
JG
223 pthread_mutex_lock(&channel->lock);
224
a58c490f
JG
225 if (channel->pending_notifications.count) {
226 struct pending_notification *pending_notification;
227
228 assert(!cds_list_empty(&channel->pending_notifications.list));
229
230 /* Deliver one of the pending notifications. */
231 pending_notification = cds_list_first_entry(
232 &channel->pending_notifications.list,
233 struct pending_notification,
234 node);
235 notification = pending_notification->notification;
236 if (!notification) {
237 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_NOTIFICATIONS_DROPPED;
238 }
239 cds_list_del(&pending_notification->node);
240 channel->pending_notifications.count--;
241 free(pending_notification);
94a61469 242 goto end_unlock;
f83bcd90
JG
243 }
244
245 /*
d977a743
MD
246 * Block on interruptible epoll/poll() instead of the message reception
247 * itself as the recvmsg() wrappers always restart on EINTR. We choose
248 * to wait using interruptible epoll/poll() in order to:
f83bcd90
JG
249 * 1) Return if a signal occurs,
250 * 2) Not deal with partially received messages.
251 *
252 * The drawback to this approach is that we assume that messages
253 * are complete/well formed. If a message is shorter than its
254 * announced length, receive_message() will block on recvmsg()
255 * and never return (even if a signal is received).
256 */
d977a743
MD
257 ret = lttng_poll_create(&events, 1, LTTNG_CLOEXEC);
258 if (ret < 0) {
259 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
260 goto end_unlock;
261 }
262 ret = lttng_poll_add(&events, channel->socket, LPOLLIN | LPOLLERR);
263 if (ret < 0) {
264 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
265 goto end_clean_poll;
266 }
267 ret = lttng_poll_wait_interruptible(&events, -1);
268 if (ret <= 0) {
269 status = (ret == -1 && errno == EINTR) ?
f83bcd90
JG
270 LTTNG_NOTIFICATION_CHANNEL_STATUS_INTERRUPTED :
271 LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
d977a743 272 goto end_clean_poll;
a58c490f
JG
273 }
274
a58c490f
JG
275 ret = receive_message(channel);
276 if (ret) {
277 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
d977a743 278 goto end_clean_poll;
a58c490f
JG
279 }
280
281 switch (get_current_message_type(channel)) {
282 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION:
283 notification = create_notification_from_current_message(
284 channel);
285 if (!notification) {
286 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
d977a743 287 goto end_clean_poll;
a58c490f
JG
288 }
289 break;
290 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED:
291 /* No payload to consume. */
292 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_NOTIFICATIONS_DROPPED;
293 break;
294 default:
295 /* Protocol error. */
296 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
d977a743 297 goto end_clean_poll;
a58c490f
JG
298 }
299
d977a743
MD
300end_clean_poll:
301 lttng_poll_clean(&events);
a58c490f
JG
302end_unlock:
303 pthread_mutex_unlock(&channel->lock);
a57a7f22 304 *_notification = notification;
a58c490f 305end:
a58c490f
JG
306 return status;
307}
308
309static
310int enqueue_dropped_notification(
311 struct lttng_notification_channel *channel)
312{
313 int ret = 0;
314 struct pending_notification *pending_notification;
315 struct cds_list_head *last_element =
316 channel->pending_notifications.list.prev;
317
318 pending_notification = caa_container_of(last_element,
319 struct pending_notification, node);
320 if (!pending_notification->notification) {
321 /*
322 * The last enqueued notification indicates dropped
323 * notifications; there is nothing to do as we group
324 * dropped notifications together.
325 */
326 goto end;
327 }
328
329 if (channel->pending_notifications.count >=
330 DEFAULT_CLIENT_MAX_QUEUED_NOTIFICATIONS_COUNT &&
331 pending_notification->notification) {
332 /*
333 * Discard the last enqueued notification to indicate
334 * that notifications were dropped at this point.
335 */
336 lttng_notification_destroy(
337 pending_notification->notification);
338 pending_notification->notification = NULL;
339 goto end;
340 }
341
342 pending_notification = zmalloc(sizeof(*pending_notification));
343 if (!pending_notification) {
344 ret = -1;
345 goto end;
346 }
347 CDS_INIT_LIST_HEAD(&pending_notification->node);
348 cds_list_add(&pending_notification->node,
349 &channel->pending_notifications.list);
350 channel->pending_notifications.count++;
351end:
352 return ret;
353}
354
355static
356int enqueue_notification_from_current_message(
357 struct lttng_notification_channel *channel)
358{
359 int ret = 0;
360 struct lttng_notification *notification;
361 struct pending_notification *pending_notification;
362
363 if (channel->pending_notifications.count >=
364 DEFAULT_CLIENT_MAX_QUEUED_NOTIFICATIONS_COUNT) {
365 /* Drop the notification. */
366 ret = enqueue_dropped_notification(channel);
367 goto end;
368 }
369
370 pending_notification = zmalloc(sizeof(*pending_notification));
371 if (!pending_notification) {
372 ret = -1;
373 goto error;
374 }
375 CDS_INIT_LIST_HEAD(&pending_notification->node);
376
377 notification = create_notification_from_current_message(channel);
378 if (!notification) {
379 ret = -1;
380 goto error;
381 }
382
383 pending_notification->notification = notification;
384 cds_list_add(&pending_notification->node,
385 &channel->pending_notifications.list);
386 channel->pending_notifications.count++;
387end:
388 return ret;
389error:
390 free(pending_notification);
391 goto end;
392}
393
1d757b1c
JG
394enum lttng_notification_channel_status
395lttng_notification_channel_has_pending_notification(
396 struct lttng_notification_channel *channel,
397 bool *_notification_pending)
398{
399 int ret;
400 enum lttng_notification_channel_status status =
401 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
d977a743 402 struct lttng_poll_event events;
1d757b1c
JG
403
404 if (!channel || !_notification_pending) {
405 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID;
406 goto end;
407 }
408
409 pthread_mutex_lock(&channel->lock);
410
411 if (channel->pending_notifications.count) {
412 *_notification_pending = true;
413 goto end_unlock;
414 }
415
416 if (channel->socket < 0) {
417 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_CLOSED;
418 goto end_unlock;
419 }
420
421 /*
422 * Check, without blocking, if data is available on the channel's
423 * socket. If there is data available, it is safe to read (blocking)
424 * on the socket for a message from the session daemon.
425 *
426 * Since all commands wait for the session daemon's reply before
427 * releasing the channel's lock, the protocol only allows for
428 * notifications and "notification dropped" messages to come
429 * through. If we receive a different message type, it is
430 * considered a protocol error.
431 *
432 * Note that this function is not guaranteed not to block. This
433 * will block until our peer (the session daemon) has sent a complete
434 * message if we see data available on the socket. If the peer does
435 * not respect the protocol, this may block indefinitely.
436 */
d977a743
MD
437 ret = lttng_poll_create(&events, 1, LTTNG_CLOEXEC);
438 if (ret < 0) {
439 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
440 goto end_unlock;
441 }
442 ret = lttng_poll_add(&events, channel->socket, LPOLLIN | LPOLLERR);
443 if (ret < 0) {
444 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
445 goto end_clean_poll;
446 }
447 /* timeout = 0: return immediately. */
448 ret = lttng_poll_wait_interruptible(&events, 0);
1d757b1c
JG
449 if (ret == 0) {
450 /* No data available. */
451 *_notification_pending = false;
d977a743 452 goto end_clean_poll;
1d757b1c
JG
453 } else if (ret < 0) {
454 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
d977a743 455 goto end_clean_poll;
1d757b1c
JG
456 }
457
458 /* Data available on socket. */
459 ret = receive_message(channel);
460 if (ret) {
461 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
d977a743 462 goto end_clean_poll;
1d757b1c
JG
463 }
464
465 switch (get_current_message_type(channel)) {
466 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION:
467 ret = enqueue_notification_from_current_message(channel);
468 if (ret) {
d977a743 469 goto end_clean_poll;
1d757b1c
JG
470 }
471 *_notification_pending = true;
472 break;
473 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED:
474 ret = enqueue_dropped_notification(channel);
475 if (ret) {
d977a743 476 goto end_clean_poll;
1d757b1c
JG
477 }
478 *_notification_pending = true;
479 break;
480 default:
481 /* Protocol error. */
482 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
d977a743 483 goto end_clean_poll;
1d757b1c
JG
484 }
485
d977a743
MD
486end_clean_poll:
487 lttng_poll_clean(&events);
1d757b1c
JG
488end_unlock:
489 pthread_mutex_unlock(&channel->lock);
490end:
491 return status;
492}
493
a58c490f
JG
494static
495int receive_command_reply(struct lttng_notification_channel *channel,
496 enum lttng_notification_channel_status *status)
497{
498 int ret;
499 struct lttng_notification_channel_command_reply *reply;
500
501 while (true) {
502 enum lttng_notification_channel_message_type msg_type;
503
504 ret = receive_message(channel);
505 if (ret) {
506 goto end;
507 }
508
509 msg_type = get_current_message_type(channel);
510 switch (msg_type) {
511 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_COMMAND_REPLY:
512 goto exit_loop;
513 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION:
514 ret = enqueue_notification_from_current_message(
515 channel);
516 if (ret) {
517 goto end;
518 }
519 break;
520 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED:
521 ret = enqueue_dropped_notification(channel);
522 if (ret) {
523 goto end;
524 }
525 break;
526 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE:
527 {
528 struct lttng_notification_channel_command_handshake *handshake;
529
530 handshake = (struct lttng_notification_channel_command_handshake *)
882093ee 531 (channel->reception_payload.buffer.data +
a58c490f
JG
532 sizeof(struct lttng_notification_channel_message));
533 channel->version.major = handshake->major;
534 channel->version.minor = handshake->minor;
535 channel->version.set = true;
536 break;
537 }
538 default:
539 ret = -1;
540 goto end;
541 }
542 }
543
544exit_loop:
882093ee 545 if (channel->reception_payload.buffer.size <
a58c490f
JG
546 (sizeof(struct lttng_notification_channel_message) +
547 sizeof(*reply))) {
548 /* Invalid message received. */
549 ret = -1;
550 goto end;
551 }
552
553 reply = (struct lttng_notification_channel_command_reply *)
882093ee 554 (channel->reception_payload.buffer.data +
a58c490f
JG
555 sizeof(struct lttng_notification_channel_message));
556 *status = (enum lttng_notification_channel_status) reply->status;
557end:
558 return ret;
559}
560
561static
562int handshake(struct lttng_notification_channel *channel)
563{
564 ssize_t ret;
565 enum lttng_notification_channel_status status =
566 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
567 struct lttng_notification_channel_command_handshake handshake = {
568 .major = LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR,
569 .minor = LTTNG_NOTIFICATION_CHANNEL_VERSION_MINOR,
570 };
571 struct lttng_notification_channel_message msg_header = {
572 .type = LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE,
573 .size = sizeof(handshake),
574 };
575 char send_buffer[sizeof(msg_header) + sizeof(handshake)];
576
577 memcpy(send_buffer, &msg_header, sizeof(msg_header));
578 memcpy(send_buffer + sizeof(msg_header), &handshake, sizeof(handshake));
579
580 pthread_mutex_lock(&channel->lock);
581
01ea340e 582 ret = lttcomm_send_creds_unix_sock(channel->socket, send_buffer,
a58c490f
JG
583 sizeof(send_buffer));
584 if (ret < 0) {
585 goto end_unlock;
586 }
587
588 /* Receive handshake info from the sessiond. */
589 ret = receive_command_reply(channel, &status);
590 if (ret < 0) {
591 goto end_unlock;
592 }
593
594 if (!channel->version.set) {
595 ret = -1;
596 goto end_unlock;
597 }
598
599 if (channel->version.major != LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR) {
600 ret = -1;
601 goto end_unlock;
602 }
603
604end_unlock:
605 pthread_mutex_unlock(&channel->lock);
606 return ret;
607}
608
609static
610enum lttng_notification_channel_status send_condition_command(
611 struct lttng_notification_channel *channel,
612 enum lttng_notification_channel_message_type type,
613 const struct lttng_condition *condition)
614{
615 int socket;
3647288f 616 ssize_t ret;
a58c490f
JG
617 enum lttng_notification_channel_status status =
618 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
c0a66c84 619 struct lttng_payload payload;
3647288f
JG
620 struct lttng_notification_channel_message cmd_header = {
621 .type = (int8_t) type,
a58c490f
JG
622 };
623
c0a66c84 624 lttng_payload_init(&payload);
3647288f 625
a58c490f
JG
626 if (!channel) {
627 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID;
628 goto end;
629 }
630
631 assert(type == LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE ||
632 type == LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE);
633
634 pthread_mutex_lock(&channel->lock);
635 socket = channel->socket;
882093ee 636
a58c490f
JG
637 if (!lttng_condition_validate(condition)) {
638 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID;
639 goto end_unlock;
640 }
641
c0a66c84 642 ret = lttng_dynamic_buffer_append(&payload.buffer, &cmd_header,
3647288f
JG
643 sizeof(cmd_header));
644 if (ret) {
645 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
a58c490f
JG
646 goto end_unlock;
647 }
648
c0a66c84 649 ret = lttng_condition_serialize(condition, &payload);
3647288f
JG
650 if (ret) {
651 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID;
a58c490f
JG
652 goto end_unlock;
653 }
654
3647288f 655 /* Update payload length. */
c0a66c84
JG
656 ((struct lttng_notification_channel_message *) payload.buffer.data)->size =
657 (uint32_t) (payload.buffer.size - sizeof(cmd_header));
3647288f 658
882093ee
JR
659 {
660 struct lttng_payload_view pv =
661 lttng_payload_view_from_payload(
662 &payload, 0, -1);
663 const int fd_count =
664 lttng_payload_view_get_fd_handle_count(&pv);
665
666 /* Update fd count. */
667 ((struct lttng_notification_channel_message *) payload.buffer.data)->fds =
668 (uint32_t) fd_count;
669
670 ret = lttcomm_send_unix_sock(
671 socket, pv.buffer.data, pv.buffer.size);
672 if (ret < 0) {
673 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
674 goto end_unlock;
675 }
676
677 /* Pass fds if present. */
678 if (fd_count > 0) {
679 ret = lttcomm_send_payload_view_fds_unix_sock(socket,
680 &pv);
681 if (ret < 0) {
682 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
683 goto end_unlock;
684 }
685 }
a58c490f
JG
686 }
687
688 ret = receive_command_reply(channel, &status);
689 if (ret < 0) {
690 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
691 goto end_unlock;
692 }
693end_unlock:
694 pthread_mutex_unlock(&channel->lock);
695end:
c0a66c84 696 lttng_payload_reset(&payload);
a58c490f
JG
697 return status;
698}
699
700enum lttng_notification_channel_status lttng_notification_channel_subscribe(
701 struct lttng_notification_channel *channel,
702 const struct lttng_condition *condition)
703{
704 return send_condition_command(channel,
705 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE,
706 condition);
707}
708
709enum lttng_notification_channel_status lttng_notification_channel_unsubscribe(
710 struct lttng_notification_channel *channel,
711 const struct lttng_condition *condition)
712{
713 return send_condition_command(channel,
714 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE,
715 condition);
716}
717
718void lttng_notification_channel_destroy(
719 struct lttng_notification_channel *channel)
720{
721 if (!channel) {
722 return;
723 }
724
725 if (channel->socket >= 0) {
726 (void) lttcomm_close_unix_sock(channel->socket);
727 }
728 pthread_mutex_destroy(&channel->lock);
882093ee 729 lttng_payload_reset(&channel->reception_payload);
a58c490f
JG
730 free(channel);
731}
This page took 0.070906 seconds and 5 git commands to generate.